mercurial/worker.py
author Martin von Zweigbergk <martinvonz@google.com>
Tue, 15 Sep 2020 23:19:14 -0700
changeset 45502 aad11a26a054
parent 45409 7d24201b6447
child 45844 8f07f5a9c3de
permissions -rw-r--r--
mergestate: simplify reset(), knowing that `other` and `node` go together

There's only one caller of `reset()` that passes any arguments at all, and that originates from `merge.py:1371`. That code always passes values for both `node` and `other`.

Differential Revision: https://phab.mercurial-scm.org/D9032
18635
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
     1
# worker.py - master-slave parallelism support
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
     2
#
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
     3
# Copyright 2013 Facebook, Inc.
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
     4
#
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
     5
# This software may be used and distributed according to the terms of the
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
     6
# GNU General Public License version 2 or any later version.
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
     7
25992
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
     8
from __future__ import absolute_import
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
     9
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
    10
import errno
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
    11
import os
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
    12
import signal
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
    13
import sys
35433
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
    14
import threading
35436
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
    15
import time
25992
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
    16
38729
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
    17
try:
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
    18
    import selectors
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
    19
38729
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
    20
    selectors.BaseSelector
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
    21
except ImportError:
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
    22
    from .thirdparty import selectors2 as selectors
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
    23
25992
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
    24
from .i18n import _
30406
78a58dcf8853 worker: migrate to util.iterfile
Jun Wu <quark@fb.com>
parents: 28292
diff changeset
    25
from . import (
30640
a150173da1c1 py3: replace os.environ with encoding.environ (part 2 of 5)
Pulkit Goyal <7895pulkit@gmail.com>
parents: 30530
diff changeset
    26
    encoding,
30406
78a58dcf8853 worker: migrate to util.iterfile
Jun Wu <quark@fb.com>
parents: 28292
diff changeset
    27
    error,
30644
d524c88511a7 py3: replace os.name with pycompat.osname (part 1 of 2)
Pulkit Goyal <7895pulkit@gmail.com>
parents: 30640
diff changeset
    28
    pycompat,
30530
86cd09bc13ba worker: use os._exit for posix worker in all cases
Jun Wu <quark@fb.com>
parents: 30434
diff changeset
    29
    scmutil,
30406
78a58dcf8853 worker: migrate to util.iterfile
Jun Wu <quark@fb.com>
parents: 28292
diff changeset
    30
    util,
78a58dcf8853 worker: migrate to util.iterfile
Jun Wu <quark@fb.com>
parents: 28292
diff changeset
    31
)
18635
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
    32
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
    33
18635
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
    34
def countcpus():
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
    35
    '''try to count the number of CPUs on the system'''
26568
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    36
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    37
    # posix
18635
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
    38
    try:
43554
9f70512ae2cf cleanup: remove pointless r-prefixes on single-quoted strings
Augie Fackler <augie@google.com>
parents: 43077
diff changeset
    39
        n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
26568
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    40
        if n > 0:
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    41
            return n
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    42
    except (AttributeError, ValueError):
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    43
        pass
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    44
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    45
    # windows
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    46
    try:
43077
687b865b95ad formatting: byteify all mercurial/ and hgext/ string literals
Augie Fackler <augie@google.com>
parents: 43076
diff changeset
    47
        n = int(encoding.environ[b'NUMBER_OF_PROCESSORS'])
26568
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    48
        if n > 0:
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    49
            return n
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    50
    except (KeyError, ValueError):
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    51
        pass
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    52
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    53
    return 1
18636
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    54
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
    55
18636
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    56
def _numworkers(ui):
43077
687b865b95ad formatting: byteify all mercurial/ and hgext/ string literals
Augie Fackler <augie@google.com>
parents: 43076
diff changeset
    57
    s = ui.config(b'worker', b'numcpus')
18636
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    58
    if s:
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    59
        try:
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    60
            n = int(s)
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    61
            if n >= 1:
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    62
                return n
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    63
        except ValueError:
43077
687b865b95ad formatting: byteify all mercurial/ and hgext/ string literals
Augie Fackler <augie@google.com>
parents: 43076
diff changeset
    64
            raise error.Abort(_(b'number of cpus must be an integer'))
18636
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    65
    return min(max(countcpus(), 4), 32)
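The default computed by `_numworkers()` is clamped to the range 4 to 32, while an explicit `worker.numcpus` value of 1 or more is returned unchanged. A minimal standalone sketch of that selection logic follows. The name `num_workers` and its plain-string `configured` argument are illustrative assumptions, not part of Mercurial, and a non-integer value here raises a bare `ValueError` where the source raises `error.Abort`.

```python
def num_workers(configured, ncpus):
    """Pick a worker count from an optional config string and a CPU count.

    `configured` stands in for the value of worker.numcpus (None or '' when
    unset); `ncpus` stands in for the result of countcpus().
    """
    if configured:
        n = int(configured)  # ValueError here; the source raises error.Abort
        if n >= 1:
            return n  # explicit setting is not clamped
    # Default: at least 4 workers, at most 32.
    return min(max(ncpus, 4), 32)


if __name__ == '__main__':
    for cfg, cpus in [(None, 1), (None, 8), (None, 64), ('2', 64), ('0', 64)]:
        print(cfg, cpus, num_workers(cfg, cpus))
```

Note that a configured value below 1 falls through to the clamped default rather than raising.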
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    66
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
    67
44165
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
    68
if pycompat.ispy3:
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
    69
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
    70
    class _blockingreader(object):
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
    71
        def __init__(self, wrapped):
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
    72
            self._wrapped = wrapped
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
    73
45409
7d24201b6447 worker: don't expose readinto() on _blockingreader since pickle is picky
Martin von Zweigbergk <martinvonz@google.com>
parents: 45396
diff changeset
    74
        # Do NOT implement readinto() by making it delegate to
7d24201b6447 worker: don't expose readinto() on _blockingreader since pickle is picky
Martin von Zweigbergk <martinvonz@google.com>
parents: 45396
diff changeset
    75
        # _wrapped.readinto(), since that is unbuffered. The unpickler is fine
7d24201b6447 worker: don't expose readinto() on _blockingreader since pickle is picky
Martin von Zweigbergk <martinvonz@google.com>
parents: 45396
diff changeset
    76
        # with just read() and readline(), so we don't need to implement it.
7d24201b6447 worker: don't expose readinto() on _blockingreader since pickle is picky
Martin von Zweigbergk <martinvonz@google.com>
parents: 45396
diff changeset
    77
7d24201b6447 worker: don't expose readinto() on _blockingreader since pickle is picky
Martin von Zweigbergk <martinvonz@google.com>
parents: 45396
diff changeset
    78
        def readline(self):
7d24201b6447 worker: don't expose readinto() on _blockingreader since pickle is picky
Martin von Zweigbergk <martinvonz@google.com>
parents: 45396
diff changeset
    79
            return self._wrapped.readline()
44165
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
    80
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
    81
        # issue multiple reads until size is fulfilled
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
    82
        def read(self, size=-1):
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
    83
            if size < 0:
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
    84
                return self._wrapped.readall()
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
    85
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
    86
            buf = bytearray(size)
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
    87
            view = memoryview(buf)
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
    88
            pos = 0
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
    89
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
    90
            while pos < size:
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
    91
                ret = self._wrapped.readinto(view[pos:])
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
    92
                if not ret:
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
    93
                    break
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
    94
                pos += ret
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
    95
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
    96
            del view
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
    97
            del buf[pos:]
45409
7d24201b6447 worker: don't expose readinto() on _blockingreader since pickle is picky
Martin von Zweigbergk <martinvonz@google.com>
parents: 45396
diff changeset
    98
            return bytes(buf)
44165
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
    99
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
   100
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
   101
else:
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
   102
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
   103
    def _blockingreader(wrapped):
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
   104
        return wrapped
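The reason `read()` loops over `readinto()` is that an unbuffered stream may return fewer bytes than requested. The sketch below illustrates this with a hypothetical raw stream, `ChunkyRaw`, that hands out at most three bytes per call. Both `ChunkyRaw` and `read_exactly` are names invented for this illustration, and the loop mirrors the one in `_blockingreader.read()` rather than replacing it.

```python
import io


class ChunkyRaw(io.RawIOBase):
    """Hypothetical raw stream returning at most `chunk` bytes per call."""

    def __init__(self, data, chunk=3):
        self._data = data
        self._pos = 0
        self._chunk = chunk

    def readable(self):
        return True

    def readinto(self, b):
        # Copy as much as fits, capped by the chunk size and remaining data.
        n = min(len(b), self._chunk, len(self._data) - self._pos)
        b[:n] = self._data[self._pos:self._pos + n]
        self._pos += n
        return n


def read_exactly(raw, size):
    """Call readinto() repeatedly until `size` bytes arrive or EOF is hit."""
    buf = bytearray(size)
    view = memoryview(buf)
    pos = 0
    while pos < size:
        ret = raw.readinto(view[pos:])
        if not ret:
            break  # EOF: return what was collected so far
        pos += ret
    del view  # release the buffer export so the bytearray can be resized
    del buf[pos:]
    return bytes(buf)


if __name__ == '__main__':
    print(ChunkyRaw(b'0123456789').read(8))           # a single short read
    print(read_exactly(ChunkyRaw(b'0123456789'), 8))  # looped read
```

A single `read(8)` on the raw stream yields only the first chunk, whereas the looped version keeps reading until the requested size is met or the stream ends.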
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
   105
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
   106
35433
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   107
if pycompat.isposix or pycompat.iswindows:
38730
69ed2cff4277 worker: rename variable to reflect constant
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38729
diff changeset
   108
    _STARTUP_COST = 0.01
38731
ef3838a47503 worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38730
diff changeset
   109
    # The Windows worker is thread based. If tasks are CPU bound, threads
ef3838a47503 worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38730
diff changeset
   110
    # in the presence of the GIL result in excessive context switching and
ef3838a47503 worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38730
diff changeset
   111
    # this overhead can slow down execution.
ef3838a47503 worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38730
diff changeset
   112
    _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
18636
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
   113
else:
38730
69ed2cff4277 worker: rename variable to reflect constant
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38729
diff changeset
   114
    _STARTUP_COST = 1e30
38731
ef3838a47503 worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38730
diff changeset
   115
    _DISALLOW_THREAD_UNSAFE = False
18636
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
   116
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   117
38731
ef3838a47503 worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38730
diff changeset
   118
def worthwhile(ui, costperop, nops, threadsafe=True):
18636
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
   119
    '''try to determine whether the benefit of multiple processes can
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
   120
    outweigh the cost of starting them'''
38731
ef3838a47503 worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38730
diff changeset
   121
ef3838a47503 worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38730
diff changeset
   122
    if not threadsafe and _DISALLOW_THREAD_UNSAFE:
ef3838a47503 worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38730
diff changeset
   123
        return False
ef3838a47503 worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38730
diff changeset
   124
18636
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
   125
    linear = costperop * nops
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
   126
    workers = _numworkers(ui)
38730
69ed2cff4277 worker: rename variable to reflect constant
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38729
diff changeset
   127
    benefit = linear - (_STARTUP_COST * workers + linear / workers)
18636
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
   128
    return benefit >= 0.15
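To make the estimate in `worthwhile()` concrete, here is a small worked sketch of the same arithmetic outside Mercurial. The constants copy the values shown above for the posix/windows branch. The function names and the explicit `workers` argument are assumptions for illustration; the real function obtains the worker count from `_numworkers(ui)`.

```python
STARTUP_COST = 0.01  # per-worker startup cost, as in the posix/windows branch
THRESHOLD = 0.15     # minimum benefit required before going parallel


def estimated_benefit(costperop, nops, workers):
    """Return the serial cost minus the estimated parallel cost."""
    linear = costperop * nops
    return linear - (STARTUP_COST * workers + linear / workers)


def is_worthwhile(costperop, nops, workers):
    return estimated_benefit(costperop, nops, workers) >= THRESHOLD


if __name__ == '__main__':
    # 1000 operations at 0.001 each with 4 workers:
    # linear = 1.0, parallel = 0.04 + 0.25 = 0.29, benefit = 0.71
    print(is_worthwhile(0.001, 1000, 4))
    # 100 operations: linear = 0.1, parallel = 0.04 + 0.025 = 0.065,
    # benefit = 0.035, which is below the threshold
    print(is_worthwhile(0.001, 100, 4))
```

On platforms that are neither posix nor Windows, the startup cost of `1e30` makes the benefit hugely negative, so the serial path is always chosen.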
18637
ac4dbceeb14a worker: partition a list (of tasks) into equal-sized chunks
Bryan O'Sullivan <bryano@fb.com>
parents: 18636
diff changeset
   129
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   130
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   131
def worker(
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   132
    ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   133
):
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   134
    '''run a function, possibly in parallel in multiple worker
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   135
    processes.
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   136
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   137
    returns a progress iterator
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   138
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   139
    costperarg - cost of a single task
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   140
42471
5ca136bbd3f6 worker: support parallelization of functions with return values
Valentin Gatien-Baron <vgatien-baron@janestreet.com>
parents: 40999
diff changeset
   141
    func - function to run. It is expected to return a progress iterator.
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   142
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   143
    staticargs - arguments to pass to every invocation of the function
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   144
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   145
    args - arguments to split into chunks, to pass to individual
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   146
    workers
38731
ef3838a47503 worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38730
diff changeset
   147
42471
5ca136bbd3f6 worker: support parallelization of functions with return values
Valentin Gatien-Baron <vgatien-baron@janestreet.com>
parents: 40999
diff changeset
   148
    hasretval - when True, func and the current function return a progress
42529
d29db0a0c4eb update: fix spurious unclean status bug shown by previous commit
Valentin Gatien-Baron <valentin.gatienbaron@gmail.com>
parents: 42471
diff changeset
   149
    iterator then a dict (encoded as an iterator that yields many (False, ..)
d29db0a0c4eb update: fix spurious unclean status bug shown by previous commit
Valentin Gatien-Baron <valentin.gatienbaron@gmail.com>
parents: 42471
diff changeset
   150
    then a (True, dict)). The dicts are joined in some arbitrary order, so
d29db0a0c4eb update: fix spurious unclean status bug shown by previous commit
Valentin Gatien-Baron <valentin.gatienbaron@gmail.com>
parents: 42471
diff changeset
   151
    overlapping keys are a bad idea.
42471
5ca136bbd3f6 worker: support parallelization of functions with return values
Valentin Gatien-Baron <vgatien-baron@janestreet.com>
parents: 40999
diff changeset
   152
38731
ef3838a47503 worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38730
diff changeset
   153
    threadsafe - whether work items are thread safe and can be executed using
ef3838a47503 worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38730
diff changeset
   154
    a thread-based worker. Should be disabled for CPU heavy tasks that don't
ef3838a47503 worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38730
diff changeset
   155
    release the GIL.
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   156
    '''
43077
687b865b95ad formatting: byteify all mercurial/ and hgext/ string literals
Augie Fackler <augie@google.com>
parents: 43076
diff changeset
   157
    enabled = ui.configbool(b'worker', b'enabled')
38731
ef3838a47503 worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38730
diff changeset
   158
    if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
42471
5ca136bbd3f6 worker: support parallelization of functions with return values
Valentin Gatien-Baron <vgatien-baron@janestreet.com>
parents: 40999
diff changeset
   159
        return _platformworker(ui, func, staticargs, args, hasretval)
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   160
    return func(*staticargs + (args,))
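The `hasretval` convention described in the docstring can be illustrated with a small sketch of a conforming function and a consumer. Here `example_func` and `collect` are hypothetical names, not Mercurial APIs. The call form `func(*staticargs + (args,))` is the serial fallback shown above.

```python
def example_func(prefix, items):
    """Yield (False, progress) per item, then a final (True, dict)."""
    results = {}
    for item in items:
        results[item] = prefix + item
        yield False, item  # progress report
    yield True, results    # final return value


def collect(iterator):
    """Separate progress items from the final dict(s) and merge the dicts."""
    progress = []
    merged = {}
    for done, value in iterator:
        if done:
            # Merge order is arbitrary across workers, so keys should not
            # overlap between chunks.
            merged.update(value)
        else:
            progress.append(value)
    return progress, merged


if __name__ == '__main__':
    staticargs = ('x-',)
    args = ['a', 'b']
    # Same call shape as the serial path: func(*staticargs + (args,))
    print(collect(example_func(*staticargs + (args,))))
```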
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   161
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   162
42471
5ca136bbd3f6 worker: support parallelization of functions with return values
Valentin Gatien-Baron <vgatien-baron@janestreet.com>
parents: 40999
diff changeset
   163
def _posixworker(ui, func, staticargs, args, hasretval):
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   164
    workers = _numworkers(ui)
18708
86524a70c0f6 worker: fix a race in SIGINT handling
Bryan O'Sullivan <bryano@fb.com>
parents: 18707
diff changeset
   165
    oldhandler = signal.getsignal(signal.SIGINT)
86524a70c0f6 worker: fix a race in SIGINT handling
Bryan O'Sullivan <bryano@fb.com>
parents: 18707
diff changeset
   166
    signal.signal(signal.SIGINT, signal.SIG_IGN)
30423
9c25a1a8c685 worker: change "pids" to a set
Jun Wu <quark@fb.com>
parents: 30422
diff changeset
   167
    pids, problem = set(), [0]
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   168
30420
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
   169
    def killworkers():
30432
237b2883cbd8 worker: make sure killworkers() never be interrupted by another SIGCHLD
Yuya Nishihara <yuya@tcha.org>
parents: 30431
diff changeset
   170
        # unregister SIGCHLD handler as all children will be killed. This
237b2883cbd8 worker: make sure killworkers() never be interrupted by another SIGCHLD
Yuya Nishihara <yuya@tcha.org>
parents: 30431
diff changeset
   171
        # function shouldn't be interrupted by another SIGCHLD; otherwise pids
237b2883cbd8 worker: make sure killworkers() never be interrupted by another SIGCHLD
Yuya Nishihara <yuya@tcha.org>
parents: 30431
diff changeset
   172
        # could be updated while iterating, which would cause inconsistency.
237b2883cbd8 worker: make sure killworkers() never be interrupted by another SIGCHLD
Yuya Nishihara <yuya@tcha.org>
parents: 30431
diff changeset
   173
        signal.signal(signal.SIGCHLD, oldchldhandler)
30420
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
   174
        # if one worker bails, there's no good reason to wait for the rest
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
   175
        for p in pids:
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
   176
            try:
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
   177
                os.kill(p, signal.SIGTERM)
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
   178
            except OSError as err:
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
   179
                if err.errno != errno.ESRCH:
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
   180
                    raise
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   181
30422
7bc25549e084 worker: allow waitforworkers to be non-blocking
Jun Wu <quark@fb.com>
parents: 30421
diff changeset
   182
    def waitforworkers(blocking=True):
30424
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
   183
        for pid in pids.copy():
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
   184
            p = st = 0
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
   185
            while True:
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
   186
                try:
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
   187
                    p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
30431
0e6ce6313e47 worker: fix missed break on successful waitpid()
Yuya Nishihara <yuya@tcha.org>
parents: 30426
diff changeset
   188
                    break
30424
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
   189
                except OSError as e:
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
   190
                    if e.errno == errno.EINTR:
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
   191
                        continue
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
   192
                    elif e.errno == errno.ECHILD:
30434
03f7aa2bd0e3 worker: discard waited pid by anyone who noticed it first
Yuya Nishihara <yuya@tcha.org>
parents: 30433
diff changeset
   193
                        # child was already reaped, but pids has not yet been
03f7aa2bd0e3 worker: discard waited pid by anyone who noticed it first
Yuya Nishihara <yuya@tcha.org>
parents: 30433
diff changeset
   194
                        # updated (maybe interrupted just after waitpid)
03f7aa2bd0e3 worker: discard waited pid by anyone who noticed it first
Yuya Nishihara <yuya@tcha.org>
parents: 30433
diff changeset
   195
                        pids.discard(pid)
03f7aa2bd0e3 worker: discard waited pid by anyone who noticed it first
Yuya Nishihara <yuya@tcha.org>
parents: 30433
diff changeset
   196
                        break
30424
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
   197
                    else:
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
   198
                        raise
30878
18fb3cf572b4 worker: ignore meaningless exit status indication returned by os.waitpid()
FUJIWARA Katsunori <foozy@lares.dti.ne.jp>
parents: 30644
diff changeset
   199
            if not p:
18fb3cf572b4 worker: ignore meaningless exit status indication returned by os.waitpid()
FUJIWARA Katsunori <foozy@lares.dti.ne.jp>
parents: 30644
diff changeset
   200
                # skip subsequent steps, because child process should
18fb3cf572b4 worker: ignore meaningless exit status indication returned by os.waitpid()
FUJIWARA Katsunori <foozy@lares.dti.ne.jp>
parents: 30644
diff changeset
   201
                # be still running in this case
18fb3cf572b4 worker: ignore meaningless exit status indication returned by os.waitpid()
FUJIWARA Katsunori <foozy@lares.dti.ne.jp>
parents: 30644
diff changeset
   202
                continue
18fb3cf572b4 worker: ignore meaningless exit status indication returned by os.waitpid()
FUJIWARA Katsunori <foozy@lares.dti.ne.jp>
parents: 30644
diff changeset
   203
            pids.discard(p)
18fb3cf572b4 worker: ignore meaningless exit status indication returned by os.waitpid()
FUJIWARA Katsunori <foozy@lares.dti.ne.jp>
parents: 30644
diff changeset
   204
            st = _exitstatus(st)
30420
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
   205
            if st and not problem[0]:
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
   206
                problem[0] = st
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   207
30425
e8fb03cfbbde worker: add a SIGCHLD handler to collect worker immediately
Jun Wu <quark@fb.com>
parents: 30424
diff changeset
   208
    def sigchldhandler(signum, frame):
e8fb03cfbbde worker: add a SIGCHLD handler to collect worker immediately
Jun Wu <quark@fb.com>
parents: 30424
diff changeset
   209
        waitforworkers(blocking=False)
30433
f2d13eb85198 worker: kill workers after all zombie processes are reaped
Yuya Nishihara <yuya@tcha.org>
parents: 30432
diff changeset
   210
        if problem[0]:
f2d13eb85198 worker: kill workers after all zombie processes are reaped
Yuya Nishihara <yuya@tcha.org>
parents: 30432
diff changeset
   211
            killworkers()
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   212
30425
e8fb03cfbbde worker: add a SIGCHLD handler to collect worker immediately
Jun Wu <quark@fb.com>
parents: 30424
diff changeset
   213
    oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
31701
9d3d56aa1a9f worker: flush ui buffers before running the worker
David Soria Parra <davidsp@fb.com>
parents: 31134
diff changeset
   214
    ui.flush()
32166
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   215
    parentpid = os.getpid()
38729
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
   216
    pipes = []
42529
d29db0a0c4eb update: fix spurious unclean status bug shown by previous commit
Valentin Gatien-Baron <valentin.gatienbaron@gmail.com>
parents: 42471
diff changeset
   217
    retval = {}
45396
26eb62bd0550 posixworker: avoid creating workers that end up getting no work
Martin von Zweigbergk <martinvonz@google.com>
parents: 44165
diff changeset
   218
    for pargs in partition(args, min(workers, len(args))):
38729
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
   219
        # Every worker gets its own pipe to send results on, so we don't have to
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
   220
        # implement atomic writes larger than PIPE_BUF. Each forked process has
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
   221
        # its own pipe's descriptors in the local variables, and the parent
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
   222
        # process has the full list of pipe descriptors (and it doesn't really
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
   223
        # care what order they're in).
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
   224
        rfd, wfd = os.pipe()
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
   225
        pipes.append((rfd, wfd))
32166
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   226
        # make sure we use os._exit in all worker code paths. otherwise the
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   227
        # worker may do some clean-ups which could cause surprises like
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   228
        # deadlock. see sshpeer.cleanup for example.
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   229
        # override error handling *before* fork. this is necessary because
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   230
        # exception (signal) may arrive after fork, before "pid =" assignment
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   231
        # completes, and another exception handler (dispatch.py) can lead to an
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   232
        # unexpected code path without os._exit.
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   233
        ret = -1
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   234
        try:
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   235
            pid = os.fork()
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   236
            if pid == 0:
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   237
                signal.signal(signal.SIGINT, oldhandler)
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   238
                signal.signal(signal.SIGCHLD, oldchldhandler)
30530
86cd09bc13ba worker: use os._exit for posix worker in all cases
Jun Wu <quark@fb.com>
parents: 30434
diff changeset
   239
32166
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   240
                def workerfunc():
38729
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
   241
                    for r, w in pipes[:-1]:
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
   242
                        os.close(r)
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
   243
                        os.close(w)
32166
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   244
                    os.close(rfd)
38536
8c38d2948217 worker: support more return types in posix worker
Danny Hooper <hooper@google.com>
parents: 37890
diff changeset
   245
                    for result in func(*(staticargs + (pargs,))):
42529
d29db0a0c4eb update: fix spurious unclean status bug shown by previous commit
Valentin Gatien-Baron <valentin.gatienbaron@gmail.com>
parents: 42471
diff changeset
   246
                        os.write(wfd, util.pickle.dumps(result))
32166
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   247
                    return 0
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   248
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   249
                ret = scmutil.callcatch(ui, workerfunc)
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   250
        except:  # parent re-raises, child never returns
32166
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   251
            if os.getpid() == parentpid:
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   252
                raise
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   253
            exctype = sys.exc_info()[0]
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   254
            force = not issubclass(exctype, KeyboardInterrupt)
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   255
            ui.traceback(force=force)
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   256
        finally:
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   257
            if os.getpid() != parentpid:
30882
a91c62752d08 worker: flush messages written by child processes before exit
Yuya Nishihara <yuya@tcha.org>
parents: 30878
diff changeset
   258
                try:
a91c62752d08 worker: flush messages written by child processes before exit
Yuya Nishihara <yuya@tcha.org>
parents: 30878
diff changeset
   259
                    ui.flush()
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   260
                except:  # never returns, no re-raises
32166
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   261
                    pass
30530
86cd09bc13ba worker: use os._exit for posix worker in all cases
Jun Wu <quark@fb.com>
parents: 30434
diff changeset
   262
                finally:
32166
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   263
                    os._exit(ret & 255)
30423
9c25a1a8c685 worker: change "pids" to a set
Jun Wu <quark@fb.com>
parents: 30422
diff changeset
   264
        pids.add(pid)
38729
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
   265
    selector = selectors.DefaultSelector()
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
   266
    for rfd, wfd in pipes:
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
   267
        os.close(wfd)
44165
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
   268
        selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   269
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   270
    def cleanup():
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   271
        signal.signal(signal.SIGINT, oldhandler)
30426
c27614f2dec1 worker: stop using a separate thread waiting for children
Jun Wu <quark@fb.com>
parents: 30425
diff changeset
   272
        waitforworkers()
30425
e8fb03cfbbde worker: add a SIGCHLD handler to collect worker immediately
Jun Wu <quark@fb.com>
parents: 30424
diff changeset
   273
        signal.signal(signal.SIGCHLD, oldchldhandler)
38740
c08ea1e219c0 worker: call selector.close() to release polling resources
Yuya Nishihara <yuya@tcha.org>
parents: 38731
diff changeset
   274
        selector.close()
40472
03f7d0822ec1 worker: do not swallow exception occurred in main process
Yuya Nishihara <yuya@tcha.org>
parents: 38740
diff changeset
   275
        return problem[0]
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   276
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   277
    try:
38729
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
   278
        openpipes = len(pipes)
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
   279
        while openpipes > 0:
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
   280
            for key, events in selector.select():
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
   281
                try:
44165
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
   282
                    res = util.pickle.load(_blockingreader(key.fileobj))
42471
5ca136bbd3f6 worker: support parallelization of functions with return values
Valentin Gatien-Baron <vgatien-baron@janestreet.com>
parents: 40999
diff changeset
   283
                    if hasretval and res[0]:
42529
d29db0a0c4eb update: fix spurious unclean status bug shown by previous commit
Valentin Gatien-Baron <valentin.gatienbaron@gmail.com>
parents: 42471
diff changeset
   284
                        retval.update(res[1])
42471
5ca136bbd3f6 worker: support parallelization of functions with return values
Valentin Gatien-Baron <vgatien-baron@janestreet.com>
parents: 40999
diff changeset
   285
                    else:
5ca136bbd3f6 worker: support parallelization of functions with return values
Valentin Gatien-Baron <vgatien-baron@janestreet.com>
parents: 40999
diff changeset
   286
                        yield res
38729
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
   287
                except EOFError:
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
   288
                    selector.unregister(key.fileobj)
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
   289
                    key.fileobj.close()
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
   290
                    openpipes -= 1
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
   291
                except IOError as e:
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
   292
                    if e.errno == errno.EINTR:
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
   293
                        continue
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
   294
                    raise
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   295
    except:  # re-raises
18709
9955fc5ee24b worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents: 18708
diff changeset
   296
        killworkers()
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   297
        cleanup()
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   298
        raise
40472
03f7d0822ec1 worker: do not swallow exception occurred in main process
Yuya Nishihara <yuya@tcha.org>
parents: 38740
diff changeset
   299
    status = cleanup()
03f7d0822ec1 worker: do not swallow exception occurred in main process
Yuya Nishihara <yuya@tcha.org>
parents: 38740
diff changeset
   300
    if status:
03f7d0822ec1 worker: do not swallow exception occurred in main process
Yuya Nishihara <yuya@tcha.org>
parents: 38740
diff changeset
   301
        if status < 0:
03f7d0822ec1 worker: do not swallow exception occurred in main process
Yuya Nishihara <yuya@tcha.org>
parents: 38740
diff changeset
   302
            os.kill(os.getpid(), -status)
03f7d0822ec1 worker: do not swallow exception occurred in main process
Yuya Nishihara <yuya@tcha.org>
parents: 38740
diff changeset
   303
        sys.exit(status)
42471
5ca136bbd3f6 worker: support parallelization of functions with return values
Valentin Gatien-Baron <vgatien-baron@janestreet.com>
parents: 40999
diff changeset
   304
    if hasretval:
42529
d29db0a0c4eb update: fix spurious unclean status bug shown by previous commit
Valentin Gatien-Baron <valentin.gatienbaron@gmail.com>
parents: 42471
diff changeset
   305
        yield True, retval
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   306
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   307
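The pipe-per-worker pattern used by the POSIX worker above can be sketched on its own as follows. This is a simplified illustration, not the Mercurial implementation: `run_forked` is a hypothetical name, the parent closes each write end right after the fork, buffered readers stand in for `_blockingreader`, and signal handling and exit-status propagation are left out.

```python
import os
import pickle
import selectors


def run_forked(func, chunks):
    """Yield results of func(chunk), one forked child per chunk (POSIX only)."""
    readers = []  # parent-side read ends, one per child
    pids = []
    for chunk in chunks:
        rfd, wfd = os.pipe()
        pid = os.fork()
        if pid == 0:
            # Child: leave only through os._exit so that no cleanup code
            # inherited from the parent runs.
            status = 1
            try:
                os.close(rfd)
                for fd in readers:  # read ends inherited from earlier forks
                    os.close(fd)
                with os.fdopen(wfd, 'wb') as out:
                    for result in func(chunk):
                        pickle.dump(result, out)
                status = 0
            finally:
                os._exit(status)
        os.close(wfd)  # the parent only reads
        readers.append(rfd)
        pids.append(pid)

    files = [os.fdopen(fd, 'rb') for fd in readers]
    selector = selectors.DefaultSelector()
    for f in files:
        selector.register(f, selectors.EVENT_READ)
    openpipes = len(files)
    try:
        while openpipes > 0:
            for key, _events in selector.select():
                try:
                    yield pickle.load(key.fileobj)
                except EOFError:
                    # The child closed its write end: this pipe is done.
                    selector.unregister(key.fileobj)
                    key.fileobj.close()
                    openpipes -= 1
    finally:
        selector.close()
        for f in files:
            f.close()  # closing an already closed file is harmless
        for pid in pids:
            os.waitpid(pid, 0)
```

Because every child writes to its own pipe, results from different children cannot interleave inside one stream, so the parent never has to rely on writes being atomic. The order in which results arrive across children is not defined.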
18707
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   308
def _posixexitstatus(code):
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   309
    '''convert a posix exit status into the same form returned by
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   310
    os.spawnv
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   311
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   312
    returns None if the process was stopped instead of exiting'''
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   313
    if os.WIFEXITED(code):
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   314
        return os.WEXITSTATUS(code)
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   315
    elif os.WIFSIGNALED(code):
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   316
        return -(os.WTERMSIG(code))
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   317
18707
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   318
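As a rough illustration of the status conversion done by `_posixexitstatus` above, the sketch below forks a child, waits for it and maps the raw `os.waitpid` status to an exit code or a negated signal number. `exit_status` and `run_child` are hypothetical helper names used only for this example, and it is POSIX-only.

```python
import os
import signal


def exit_status(code):
    """Map a waitpid() status to an exit code, or to -signum if killed."""
    if os.WIFEXITED(code):
        return os.WEXITSTATUS(code)
    if os.WIFSIGNALED(code):
        return -os.WTERMSIG(code)
    return None  # stopped rather than terminated


def run_child(action):
    """Run action() in a forked child and return its converted status."""
    pid = os.fork()
    if pid == 0:
        try:
            action()
        finally:
            # Reached only if action() returns or raises.
            os._exit(0)
    _pid, code = os.waitpid(pid, 0)
    return exit_status(code)


if __name__ == '__main__':
    print(run_child(lambda: os._exit(3)))
    print(run_child(lambda: os.kill(os.getpid(), signal.SIGKILL)))
```

On Python 3.9 and later, `os.waitstatus_to_exitcode` performs a similar conversion for exited and signaled processes.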
42471
5ca136bbd3f6 worker: support parallelization of functions with return values
Valentin Gatien-Baron <vgatien-baron@janestreet.com>
parents: 40999
diff changeset
   319
def _windowsworker(ui, func, staticargs, args, hasretval):
35433
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   320
    class Worker(threading.Thread):
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   321
        def __init__(
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   322
            self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   323
        ):
40508
909c31805f54 py3: roll up threading.Thread constructor args into **kwargs
Matt Harbison <matt_harbison@yahoo.com>
parents: 38740
diff changeset
   324
            threading.Thread.__init__(self, *args, **kwargs)
35433
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   325
            self._taskqueue = taskqueue
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   326
            self._resultqueue = resultqueue
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   327
            self._func = func
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   328
            self._staticargs = staticargs
35434
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
   329
            self._interrupted = False
35436
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
   330
            self.daemon = True
35434
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
   331
            self.exception = None
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
   332
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
   333
        def interrupt(self):
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
   334
            self._interrupted = True
35433
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   335
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   336
        def run(self):
35434
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
   337
            try:
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
   338
                while not self._taskqueue.empty():
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
   339
                    try:
42529
d29db0a0c4eb update: fix spurious unclean status bug shown by previous commit
Valentin Gatien-Baron <valentin.gatienbaron@gmail.com>
parents: 42471
diff changeset
   340
                        args = self._taskqueue.get_nowait()
35434
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
   341
                        for res in self._func(*self._staticargs + (args,)):
42529
d29db0a0c4eb update: fix spurious unclean status bug shown by previous commit
Valentin Gatien-Baron <valentin.gatienbaron@gmail.com>
parents: 42471
diff changeset
   342
                            self._resultqueue.put(res)
35434
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
   343
                            # threading doesn't provide a native way to
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
   344
                            # interrupt execution. handle it manually at every
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
   345
                            # iteration.
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
   346
                            if self._interrupted:
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
   347
                                return
37890
8fb9985382be pycompat: export queue module instead of symbols in module (API)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 36843
diff changeset
   348
                    except pycompat.queue.Empty:
35434
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
   349
                        break
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
   350
            except Exception as e:
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
   351
                # store the exception such that the main thread can resurface
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
   352
                # it as if the func was running without workers.
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
   353
                self.exception = e
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
   354
                raise
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
   355
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
   356
    threads = []
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   357
35436
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
   358
    def trykillworkers():
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
   359
        # Allow up to 1 second to clean up worker threads nicely
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
   360
        cleanupend = time.time() + 1
35434
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
   361
        for t in threads:
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
   362
            t.interrupt()
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
   363
        for t in threads:
35436
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
   364
            remainingtime = cleanupend - time.time()
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
   365
            t.join(remainingtime)
35434
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
   366
            if t.is_alive():
35436
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
   367
                # pass over the workers joining failure. it is more
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
   368
                # important to surface the initial exception than the
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
   369
                # fact that one of the workers may be processing a large
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
   370
                # task and does not get to handle the interruption.
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   371
                ui.warn(
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   372
                    _(
43077
687b865b95ad formatting: byteify all mercurial/ and hgext/ string literals
Augie Fackler <augie@google.com>
parents: 43076
diff changeset
   373
                        b"failed to kill worker threads while "
687b865b95ad formatting: byteify all mercurial/ and hgext/ string literals
Augie Fackler <augie@google.com>
parents: 43076
diff changeset
   374
                        b"handling an exception\n"
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   375
                    )
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   376
                )
35436
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
   377
                return
35433
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   378
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   379
    workers = _numworkers(ui)
37890
8fb9985382be pycompat: export queue module instead of symbols in module (API)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 36843
diff changeset
   380
    resultqueue = pycompat.queue.Queue()
8fb9985382be pycompat: export queue module instead of symbols in module (API)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 36843
diff changeset
   381
    taskqueue = pycompat.queue.Queue()
42529
d29db0a0c4eb update: fix spurious unclean status bug shown by previous commit
Valentin Gatien-Baron <valentin.gatienbaron@gmail.com>
parents: 42471
diff changeset
   382
    retval = {}
35433
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   383
    # partition work into more pieces than workers to minimize the chance
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   384
    # of uneven distribution of large tasks between the workers
42529
d29db0a0c4eb update: fix spurious unclean status bug shown by previous commit
Valentin Gatien-Baron <valentin.gatienbaron@gmail.com>
parents: 42471
diff changeset
   385
    for pargs in partition(args, workers * 20):
35433
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   386
        taskqueue.put(pargs)
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   387
    for _i in range(workers):
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   388
        t = Worker(taskqueue, resultqueue, func, staticargs)
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   389
        threads.append(t)
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   390
        t.start()
35436
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
   391
    try:
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
   392
        while len(threads) > 0:
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
   393
            while not resultqueue.empty():
42529
d29db0a0c4eb update: fix spurious unclean status bug shown by previous commit
Valentin Gatien-Baron <valentin.gatienbaron@gmail.com>
parents: 42471
diff changeset
   394
                res = resultqueue.get()
42471
5ca136bbd3f6 worker: support parallelization of functions with return values
Valentin Gatien-Baron <vgatien-baron@janestreet.com>
parents: 40999
diff changeset
   395
                if hasretval and res[0]:
42529
d29db0a0c4eb update: fix spurious unclean status bug shown by previous commit
Valentin Gatien-Baron <valentin.gatienbaron@gmail.com>
parents: 42471
diff changeset
   396
                    retval.update(res[1])
42471
5ca136bbd3f6 worker: support parallelization of functions with return values
Valentin Gatien-Baron <vgatien-baron@janestreet.com>
parents: 40999
diff changeset
   397
                else:
5ca136bbd3f6 worker: support parallelization of functions with return values
Valentin Gatien-Baron <vgatien-baron@janestreet.com>
parents: 40999
diff changeset
   398
                    yield res
35436
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
   399
            threads[0].join(0.05)
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
   400
            finishedthreads = [_t for _t in threads if not _t.is_alive()]
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
   401
            for t in finishedthreads:
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
   402
                if t.exception is not None:
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
   403
                    raise t.exception
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
   404
                threads.remove(t)
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   405
    except (Exception, KeyboardInterrupt):  # re-raises
35436
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
   406
        trykillworkers()
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
   407
        raise
35433
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   408
    while not resultqueue.empty():
42529
d29db0a0c4eb update: fix spurious unclean status bug shown by previous commit
Valentin Gatien-Baron <valentin.gatienbaron@gmail.com>
parents: 42471
diff changeset
   409
        res = resultqueue.get()
42471
5ca136bbd3f6 worker: support parallelization of functions with return values
Valentin Gatien-Baron <vgatien-baron@janestreet.com>
parents: 40999
diff changeset
   410
        if hasretval and res[0]:
42529
d29db0a0c4eb update: fix spurious unclean status bug shown by previous commit
Valentin Gatien-Baron <valentin.gatienbaron@gmail.com>
parents: 42471
diff changeset
   411
            retval.update(res[1])
42471
5ca136bbd3f6 worker: support parallelization of functions with return values
Valentin Gatien-Baron <vgatien-baron@janestreet.com>
parents: 40999
diff changeset
   412
        else:
5ca136bbd3f6 worker: support parallelization of functions with return values
Valentin Gatien-Baron <vgatien-baron@janestreet.com>
parents: 40999
diff changeset
   413
            yield res
5ca136bbd3f6 worker: support parallelization of functions with return values
Valentin Gatien-Baron <vgatien-baron@janestreet.com>
parents: 40999
diff changeset
   414
    if hasretval:
42529
d29db0a0c4eb update: fix spurious unclean status bug shown by previous commit
Valentin Gatien-Baron <valentin.gatienbaron@gmail.com>
parents: 42471
diff changeset
   415
        yield True, retval
35433
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   416
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   417
35433
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   418
if pycompat.iswindows:
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   419
    _platformworker = _windowsworker
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   420
else:
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   421
    _platformworker = _posixworker
18707
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   422
    _exitstatus = _posixexitstatus
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   423
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   424
18637
ac4dbceeb14a worker: partition a list (of tasks) into equal-sized chunks
Bryan O'Sullivan <bryano@fb.com>
parents: 18636
diff changeset
   425
def partition(lst, nslices):
28181
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
   426
    '''partition a list into N slices of roughly equal size
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
   427
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
   428
    The current strategy takes every Nth element from the input. If
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
   429
    we ever write workers that need to preserve grouping in input
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
   430
    we should consider allowing callers to specify a partition strategy.
28292
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   431
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   432
    mpm is not a fan of this partitioning strategy when files are involved.
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   433
    In his words:
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   434
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   435
        Single-threaded Mercurial makes a point of creating and visiting
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   436
        files in a fixed order (alphabetical). When creating files in order,
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   437
        a typical filesystem is likely to allocate them on nearby regions on
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   438
        disk. Thus, when revisiting in the same order, locality is maximized
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   439
        and various forms of OS and disk-level caching and read-ahead get a
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   440
        chance to work.
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   441
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   442
        This effect can be quite significant on spinning disks. I discovered it
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   443
        circa Mercurial v0.4 when revlogs were named by hashes of filenames.
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   444
        Tarring a repo and copying it to another disk effectively randomized
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   445
        the revlog ordering on disk by sorting the revlogs by hash and suddenly
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   446
        performance of my kernel checkout benchmark dropped by ~10x because the
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   447
        "working set" of sectors visited no longer fit in the drive's cache and
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   448
        the workload switched from streaming to random I/O.
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   449
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   450
        What we should really be doing is have workers read filenames from an
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   451
        ordered queue. This preserves locality and also keeps any worker from
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   452
        getting more than one file out of balance.
28181
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
   453
    '''
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
   454
    for i in range(nslices):
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
   455
        yield lst[i::nslices]
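
The strided slicing in `partition()` is easiest to see on a small input. The sketch below repeats the two-line body from above and, for contrast, adds a `contiguous()` helper. That helper is hypothetical and not part of worker.py; it only illustrates the kind of order-preserving split the docstring's locality concern points toward, and it is not the ordered-queue design suggested there.

```python
def partition(lst, nslices):
    # Same strided slicing as above: slice i takes elements
    # i, i + nslices, i + 2 * nslices, ...
    for i in range(nslices):
        yield lst[i::nslices]


def contiguous(lst, nslices):
    # Hypothetical alternative (not in worker.py): consecutive runs, so
    # neighbouring items of a sorted input stay in the same slice.
    size, extra = divmod(len(lst), nslices)
    start = 0
    for i in range(nslices):
        # The first `extra` slices take one additional element.
        end = start + size + (1 if i < extra else 0)
        yield lst[start:end]
        start = end


names = ["a", "b", "c", "d", "e", "f", "g"]
strided = list(partition(names, 3))
runs = list(contiguous(names, 3))
print(strided)  # [['a', 'd', 'g'], ['b', 'e'], ['c', 'f']]
print(runs)     # [['a', 'b', 'c'], ['d', 'e'], ['f', 'g']]
```

Both splits keep slice sizes within one element of each other. The difference is that the strided form spreads adjacent names across slices, while the contiguous form keeps them together. Note that `partition()` yields empty slices when `nslices` exceeds `len(lst)`.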