mercurial/worker.py
author Gregory Szorc <gregory.szorc@gmail.com>
Tue, 17 Jul 2018 16:57:27 -0700
changeset 38730 69ed2cff4277
parent 38729 9e6afe7fca31
child 38731 ef3838a47503
permissions -rw-r--r--
worker: rename variable to reflect constant Differential Revision: https://phab.mercurial-scm.org/D3961
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
18635
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
     1
# worker.py - master-slave parallelism support
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
     2
#
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
     3
# Copyright 2013 Facebook, Inc.
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
     4
#
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
     5
# This software may be used and distributed according to the terms of the
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
     6
# GNU General Public License version 2 or any later version.
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
     7
25992
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
     8
from __future__ import absolute_import
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
     9
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
    10
import errno
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
    11
import os
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
    12
import signal
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
    13
import sys
35427
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
    14
import threading
35432
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35431
diff changeset
    15
import time
25992
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
    16
38729
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38535
diff changeset
    17
try:
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38535
diff changeset
    18
    import selectors
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38535
diff changeset
    19
    selectors.BaseSelector
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38535
diff changeset
    20
except ImportError:
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38535
diff changeset
    21
    from .thirdparty import selectors2 as selectors
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38535
diff changeset
    22
25992
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
    23
from .i18n import _
30396
78a58dcf8853 worker: migrate to util.iterfile
Jun Wu <quark@fb.com>
parents: 28292
diff changeset
    24
from . import (
30635
a150173da1c1 py3: replace os.environ with encoding.environ (part 2 of 5)
Pulkit Goyal <7895pulkit@gmail.com>
parents: 30521
diff changeset
    25
    encoding,
30396
78a58dcf8853 worker: migrate to util.iterfile
Jun Wu <quark@fb.com>
parents: 28292
diff changeset
    26
    error,
30639
d524c88511a7 py3: replace os.name with pycompat.osname (part 1 of 2)
Pulkit Goyal <7895pulkit@gmail.com>
parents: 30635
diff changeset
    27
    pycompat,
30521
86cd09bc13ba worker: use os._exit for posix worker in all cases
Jun Wu <quark@fb.com>
parents: 30425
diff changeset
    28
    scmutil,
30396
78a58dcf8853 worker: migrate to util.iterfile
Jun Wu <quark@fb.com>
parents: 28292
diff changeset
    29
    util,
78a58dcf8853 worker: migrate to util.iterfile
Jun Wu <quark@fb.com>
parents: 28292
diff changeset
    30
)
18635
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
    31
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
    32
def countcpus():
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
    33
    '''try to count the number of CPUs on the system'''
26568
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    34
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    35
    # posix
18635
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
    36
    try:
32611
954489932c4f py3: pass str in os.sysconf()
Pulkit Goyal <7895pulkit@gmail.com>
parents: 32112
diff changeset
    37
        n = int(os.sysconf(r'SC_NPROCESSORS_ONLN'))
26568
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    38
        if n > 0:
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    39
            return n
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    40
    except (AttributeError, ValueError):
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    41
        pass
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    42
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    43
    # windows
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    44
    try:
30635
a150173da1c1 py3: replace os.environ with encoding.environ (part 2 of 5)
Pulkit Goyal <7895pulkit@gmail.com>
parents: 30521
diff changeset
    45
        n = int(encoding.environ['NUMBER_OF_PROCESSORS'])
26568
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    46
        if n > 0:
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    47
            return n
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    48
    except (KeyError, ValueError):
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    49
        pass
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    50
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    51
    return 1
18636
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    52
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    53
def _numworkers(ui):
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    54
    s = ui.config('worker', 'numcpus')
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    55
    if s:
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    56
        try:
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    57
            n = int(s)
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    58
            if n >= 1:
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    59
                return n
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    60
        except ValueError:
26587
56b2bcea2529 error: get Abort from 'error' instead of 'util'
Pierre-Yves David <pierre-yves.david@fb.com>
parents: 26568
diff changeset
    61
            raise error.Abort(_('number of cpus must be an integer'))
18636
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    62
    return min(max(countcpus(), 4), 32)
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    63
35427
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
    64
if pycompat.isposix or pycompat.iswindows:
38730
69ed2cff4277 worker: rename variable to reflect constant
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38729
diff changeset
    65
    _STARTUP_COST = 0.01
18636
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    66
else:
38730
69ed2cff4277 worker: rename variable to reflect constant
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38729
diff changeset
    67
    _STARTUP_COST = 1e30
18636
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    68
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    69
def worthwhile(ui, costperop, nops):
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    70
    '''try to determine whether the benefit of multiple processes can
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    71
    outweigh the cost of starting them'''
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    72
    linear = costperop * nops
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    73
    workers = _numworkers(ui)
38730
69ed2cff4277 worker: rename variable to reflect constant
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38729
diff changeset
    74
    benefit = linear - (_STARTUP_COST * workers + linear / workers)
18636
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    75
    return benefit >= 0.15
18637
ac4dbceeb14a worker: partition a list (of tasks) into equal-sized chunks
Bryan O'Sullivan <bryano@fb.com>
parents: 18636
diff changeset
    76
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    77
def worker(ui, costperarg, func, staticargs, args):
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    78
    '''run a function, possibly in parallel in multiple worker
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    79
    processes.
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    80
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    81
    returns a progress iterator
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    82
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    83
    costperarg - cost of a single task
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    84
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    85
    func - function to run
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    86
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    87
    staticargs - arguments to pass to every invocation of the function
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    88
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    89
    args - arguments to split into chunks, to pass to individual
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    90
    workers
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    91
    '''
35431
471918fa7f46 workers: add config to enable/diable workers
Wojciech Lis <wlis@fb.com>
parents: 35428
diff changeset
    92
    enabled = ui.configbool('worker', 'enabled')
471918fa7f46 workers: add config to enable/diable workers
Wojciech Lis <wlis@fb.com>
parents: 35428
diff changeset
    93
    if enabled and worthwhile(ui, costperarg, len(args)):
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    94
        return _platformworker(ui, func, staticargs, args)
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    95
    return func(*staticargs + (args,))
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    96
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    97
def _posixworker(ui, func, staticargs, args):
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    98
    workers = _numworkers(ui)
18708
86524a70c0f6 worker: fix a race in SIGINT handling
Bryan O'Sullivan <bryano@fb.com>
parents: 18707
diff changeset
    99
    oldhandler = signal.getsignal(signal.SIGINT)
86524a70c0f6 worker: fix a race in SIGINT handling
Bryan O'Sullivan <bryano@fb.com>
parents: 18707
diff changeset
   100
    signal.signal(signal.SIGINT, signal.SIG_IGN)
30413
9c25a1a8c685 worker: change "pids" to a set
Jun Wu <quark@fb.com>
parents: 30412
diff changeset
   101
    pids, problem = set(), [0]
30410
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30396
diff changeset
   102
    def killworkers():
30423
237b2883cbd8 worker: make sure killworkers() never be interrupted by another SIGCHLD
Yuya Nishihara <yuya@tcha.org>
parents: 30422
diff changeset
   103
        # unregister SIGCHLD handler as all children will be killed. This
237b2883cbd8 worker: make sure killworkers() never be interrupted by another SIGCHLD
Yuya Nishihara <yuya@tcha.org>
parents: 30422
diff changeset
   104
        # function shouldn't be interrupted by another SIGCHLD; otherwise pids
237b2883cbd8 worker: make sure killworkers() never be interrupted by another SIGCHLD
Yuya Nishihara <yuya@tcha.org>
parents: 30422
diff changeset
   105
        # could be updated while iterating, which would cause inconsistency.
237b2883cbd8 worker: make sure killworkers() never be interrupted by another SIGCHLD
Yuya Nishihara <yuya@tcha.org>
parents: 30422
diff changeset
   106
        signal.signal(signal.SIGCHLD, oldchldhandler)
30410
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30396
diff changeset
   107
        # if one worker bails, there's no good reason to wait for the rest
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30396
diff changeset
   108
        for p in pids:
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30396
diff changeset
   109
            try:
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30396
diff changeset
   110
                os.kill(p, signal.SIGTERM)
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30396
diff changeset
   111
            except OSError as err:
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30396
diff changeset
   112
                if err.errno != errno.ESRCH:
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30396
diff changeset
   113
                    raise
30412
7bc25549e084 worker: allow waitforworkers to be non-blocking
Jun Wu <quark@fb.com>
parents: 30411
diff changeset
   114
    def waitforworkers(blocking=True):
30414
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30413
diff changeset
   115
        for pid in pids.copy():
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30413
diff changeset
   116
            p = st = 0
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30413
diff changeset
   117
            while True:
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30413
diff changeset
   118
                try:
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30413
diff changeset
   119
                    p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
30422
0e6ce6313e47 worker: fix missed break on successful waitpid()
Yuya Nishihara <yuya@tcha.org>
parents: 30416
diff changeset
   120
                    break
30414
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30413
diff changeset
   121
                except OSError as e:
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30413
diff changeset
   122
                    if e.errno == errno.EINTR:
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30413
diff changeset
   123
                        continue
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30413
diff changeset
   124
                    elif e.errno == errno.ECHILD:
30425
03f7aa2bd0e3 worker: discard waited pid by anyone who noticed it first
Yuya Nishihara <yuya@tcha.org>
parents: 30424
diff changeset
   125
                        # child would already be reaped, but pids yet been
03f7aa2bd0e3 worker: discard waited pid by anyone who noticed it first
Yuya Nishihara <yuya@tcha.org>
parents: 30424
diff changeset
   126
                        # updated (maybe interrupted just after waitpid)
03f7aa2bd0e3 worker: discard waited pid by anyone who noticed it first
Yuya Nishihara <yuya@tcha.org>
parents: 30424
diff changeset
   127
                        pids.discard(pid)
03f7aa2bd0e3 worker: discard waited pid by anyone who noticed it first
Yuya Nishihara <yuya@tcha.org>
parents: 30424
diff changeset
   128
                        break
30414
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30413
diff changeset
   129
                    else:
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30413
diff changeset
   130
                        raise
31063
18fb3cf572b4 worker: ignore meaningless exit status indication returned by os.waitpid()
FUJIWARA Katsunori <foozy@lares.dti.ne.jp>
parents: 30639
diff changeset
   131
            if not p:
18fb3cf572b4 worker: ignore meaningless exit status indication returned by os.waitpid()
FUJIWARA Katsunori <foozy@lares.dti.ne.jp>
parents: 30639
diff changeset
   132
                # skip subsequent steps, because child process should
18fb3cf572b4 worker: ignore meaningless exit status indication returned by os.waitpid()
FUJIWARA Katsunori <foozy@lares.dti.ne.jp>
parents: 30639
diff changeset
   133
                # be still running in this case
18fb3cf572b4 worker: ignore meaningless exit status indication returned by os.waitpid()
FUJIWARA Katsunori <foozy@lares.dti.ne.jp>
parents: 30639
diff changeset
   134
                continue
18fb3cf572b4 worker: ignore meaningless exit status indication returned by os.waitpid()
FUJIWARA Katsunori <foozy@lares.dti.ne.jp>
parents: 30639
diff changeset
   135
            pids.discard(p)
18fb3cf572b4 worker: ignore meaningless exit status indication returned by os.waitpid()
FUJIWARA Katsunori <foozy@lares.dti.ne.jp>
parents: 30639
diff changeset
   136
            st = _exitstatus(st)
30410
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30396
diff changeset
   137
            if st and not problem[0]:
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30396
diff changeset
   138
                problem[0] = st
30415
e8fb03cfbbde worker: add a SIGCHLD handler to collect worker immediately
Jun Wu <quark@fb.com>
parents: 30414
diff changeset
   139
    def sigchldhandler(signum, frame):
e8fb03cfbbde worker: add a SIGCHLD handler to collect worker immediately
Jun Wu <quark@fb.com>
parents: 30414
diff changeset
   140
        waitforworkers(blocking=False)
30424
f2d13eb85198 worker: kill workers after all zombie processes are reaped
Yuya Nishihara <yuya@tcha.org>
parents: 30423
diff changeset
   141
        if problem[0]:
f2d13eb85198 worker: kill workers after all zombie processes are reaped
Yuya Nishihara <yuya@tcha.org>
parents: 30423
diff changeset
   142
            killworkers()
30415
e8fb03cfbbde worker: add a SIGCHLD handler to collect worker immediately
Jun Wu <quark@fb.com>
parents: 30414
diff changeset
   143
    oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
31696
9d3d56aa1a9f worker: flush ui buffers before running the worker
David Soria Parra <davidsp@fb.com>
parents: 31119
diff changeset
   144
    ui.flush()
32112
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   145
    parentpid = os.getpid()
38729
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38535
diff changeset
   146
    pipes = []
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   147
    for pargs in partition(args, workers):
38729
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38535
diff changeset
   148
        # Every worker gets its own pipe to send results on, so we don't have to
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38535
diff changeset
   149
        # implement atomic writes larger than PIPE_BUF. Each forked process has
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38535
diff changeset
   150
        # its own pipe's descriptors in the local variables, and the parent
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38535
diff changeset
   151
        # process has the full list of pipe descriptors (and it doesn't really
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38535
diff changeset
   152
        # care what order they're in).
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38535
diff changeset
   153
        rfd, wfd = os.pipe()
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38535
diff changeset
   154
        pipes.append((rfd, wfd))
32112
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   155
        # make sure we use os._exit in all worker code paths. otherwise the
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   156
        # worker may do some clean-ups which could cause surprises like
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   157
        # deadlock. see sshpeer.cleanup for example.
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   158
        # override error handling *before* fork. this is necessary because
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   159
        # exception (signal) may arrive after fork, before "pid =" assignment
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   160
        # completes, and other exception handler (dispatch.py) can lead to
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   161
        # unexpected code path without os._exit.
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   162
        ret = -1
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   163
        try:
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   164
            pid = os.fork()
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   165
            if pid == 0:
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   166
                signal.signal(signal.SIGINT, oldhandler)
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   167
                signal.signal(signal.SIGCHLD, oldchldhandler)
30521
86cd09bc13ba worker: use os._exit for posix worker in all cases
Jun Wu <quark@fb.com>
parents: 30425
diff changeset
   168
32112
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   169
                def workerfunc():
38729
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38535
diff changeset
   170
                    for r, w in pipes[:-1]:
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38535
diff changeset
   171
                        os.close(r)
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38535
diff changeset
   172
                        os.close(w)
32112
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   173
                    os.close(rfd)
38535
8c38d2948217 worker: support more return types in posix worker
Danny Hooper <hooper@google.com>
parents: 37844
diff changeset
   174
                    for result in func(*(staticargs + (pargs,))):
8c38d2948217 worker: support more return types in posix worker
Danny Hooper <hooper@google.com>
parents: 37844
diff changeset
   175
                        os.write(wfd, util.pickle.dumps(result))
32112
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   176
                    return 0
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   177
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   178
                ret = scmutil.callcatch(ui, workerfunc)
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   179
        except: # parent re-raises, child never returns
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   180
            if os.getpid() == parentpid:
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   181
                raise
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   182
            exctype = sys.exc_info()[0]
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   183
            force = not issubclass(exctype, KeyboardInterrupt)
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   184
            ui.traceback(force=force)
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   185
        finally:
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   186
            if os.getpid() != parentpid:
31118
a91c62752d08 worker: flush messages written by child processes before exit
Yuya Nishihara <yuya@tcha.org>
parents: 31063
diff changeset
   187
                try:
a91c62752d08 worker: flush messages written by child processes before exit
Yuya Nishihara <yuya@tcha.org>
parents: 31063
diff changeset
   188
                    ui.flush()
32112
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   189
                except: # never returns, no re-raises
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   190
                    pass
30521
86cd09bc13ba worker: use os._exit for posix worker in all cases
Jun Wu <quark@fb.com>
parents: 30425
diff changeset
   191
                finally:
32112
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
   192
                    os._exit(ret & 255)
30413
9c25a1a8c685 worker: change "pids" to a set
Jun Wu <quark@fb.com>
parents: 30412
diff changeset
   193
        pids.add(pid)
38729
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38535
diff changeset
   194
    selector = selectors.DefaultSelector()
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38535
diff changeset
   195
    for rfd, wfd in pipes:
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38535
diff changeset
   196
        os.close(wfd)
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38535
diff changeset
   197
        selector.register(os.fdopen(rfd, r'rb', 0), selectors.EVENT_READ)
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   198
    def cleanup():
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   199
        signal.signal(signal.SIGINT, oldhandler)
30416
c27614f2dec1 worker: stop using a separate thread waiting for children
Jun Wu <quark@fb.com>
parents: 30415
diff changeset
   200
        waitforworkers()
30415
e8fb03cfbbde worker: add a SIGCHLD handler to collect worker immediately
Jun Wu <quark@fb.com>
parents: 30414
diff changeset
   201
        signal.signal(signal.SIGCHLD, oldchldhandler)
18709
9955fc5ee24b worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents: 18708
diff changeset
   202
        status = problem[0]
9955fc5ee24b worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents: 18708
diff changeset
   203
        if status:
9955fc5ee24b worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents: 18708
diff changeset
   204
            if status < 0:
9955fc5ee24b worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents: 18708
diff changeset
   205
                os.kill(os.getpid(), -status)
9955fc5ee24b worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents: 18708
diff changeset
   206
            sys.exit(status)
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   207
    try:
38729
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38535
diff changeset
   208
        openpipes = len(pipes)
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38535
diff changeset
   209
        while openpipes > 0:
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38535
diff changeset
   210
            for key, events in selector.select():
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38535
diff changeset
   211
                try:
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38535
diff changeset
   212
                    yield util.pickle.load(key.fileobj)
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38535
diff changeset
   213
                except EOFError:
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38535
diff changeset
   214
                    selector.unregister(key.fileobj)
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38535
diff changeset
   215
                    key.fileobj.close()
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38535
diff changeset
   216
                    openpipes -= 1
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38535
diff changeset
   217
                except IOError as e:
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38535
diff changeset
   218
                    if e.errno == errno.EINTR:
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38535
diff changeset
   219
                        continue
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38535
diff changeset
   220
                    raise
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   221
    except: # re-raises
18709
9955fc5ee24b worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents: 18708
diff changeset
   222
        killworkers()
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   223
        cleanup()
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   224
        raise
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   225
    cleanup()
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   226
18707
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   227
def _posixexitstatus(code):
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   228
    '''convert a posix exit status into the same form returned by
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   229
    os.spawnv
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   230
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   231
    returns None if the process was stopped instead of exiting'''
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   232
    if os.WIFEXITED(code):
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   233
        return os.WEXITSTATUS(code)
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   234
    elif os.WIFSIGNALED(code):
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   235
        return -os.WTERMSIG(code)
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   236
35427
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   237
def _windowsworker(ui, func, staticargs, args):
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   238
    class Worker(threading.Thread):
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   239
        def __init__(self, taskqueue, resultqueue, func, staticargs,
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   240
                     group=None, target=None, name=None, verbose=None):
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   241
            threading.Thread.__init__(self, group=group, target=target,
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   242
                                      name=name, verbose=verbose)
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   243
            self._taskqueue = taskqueue
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   244
            self._resultqueue = resultqueue
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   245
            self._func = func
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   246
            self._staticargs = staticargs
35428
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35427
diff changeset
   247
            self._interrupted = False
35432
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35431
diff changeset
   248
            self.daemon = True
35428
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35427
diff changeset
   249
            self.exception = None
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35427
diff changeset
   250
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35427
diff changeset
   251
        def interrupt(self):
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35427
diff changeset
   252
            self._interrupted = True
35427
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   253
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   254
        def run(self):
35428
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35427
diff changeset
   255
            try:
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35427
diff changeset
   256
                while not self._taskqueue.empty():
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35427
diff changeset
   257
                    try:
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35427
diff changeset
   258
                        args = self._taskqueue.get_nowait()
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35427
diff changeset
   259
                        for res in self._func(*self._staticargs + (args,)):
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35427
diff changeset
   260
                            self._resultqueue.put(res)
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35427
diff changeset
   261
                            # threading doesn't provide a native way to
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35427
diff changeset
   262
                            # interrupt execution. handle it manually at every
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35427
diff changeset
   263
                            # iteration.
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35427
diff changeset
   264
                            if self._interrupted:
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35427
diff changeset
   265
                                return
37844
8fb9985382be pycompat: export queue module instead of symbols in module (API)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 36835
diff changeset
   266
                    except pycompat.queue.Empty:
35428
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35427
diff changeset
   267
                        break
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35427
diff changeset
   268
            except Exception as e:
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35427
diff changeset
   269
                # store the exception such that the main thread can resurface
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35427
diff changeset
   270
                # it as if the func was running without workers.
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35427
diff changeset
   271
                self.exception = e
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35427
diff changeset
   272
                raise
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35427
diff changeset
   273
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35427
diff changeset
   274
    threads = []
35432
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35431
diff changeset
   275
    def trykillworkers():
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35431
diff changeset
   276
        # Allow up to 1 second to clean worker threads nicely
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35431
diff changeset
   277
        cleanupend = time.time() + 1
35428
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35427
diff changeset
   278
        for t in threads:
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35427
diff changeset
   279
            t.interrupt()
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35427
diff changeset
   280
        for t in threads:
35432
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35431
diff changeset
   281
            remainingtime = cleanupend - time.time()
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35431
diff changeset
   282
            t.join(remainingtime)
35428
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35427
diff changeset
   283
            if t.is_alive():
35432
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35431
diff changeset
   284
                # pass over the workers joining failure. it is more
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35431
diff changeset
   285
                # important to surface the inital exception than the
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35431
diff changeset
   286
                # fact that one of workers may be processing a large
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35431
diff changeset
   287
                # task and does not get to handle the interruption.
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35431
diff changeset
   288
                ui.warn(_("failed to kill worker threads while "
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35431
diff changeset
   289
                          "handling an exception\n"))
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35431
diff changeset
   290
                return
35427
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   291
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   292
    workers = _numworkers(ui)
37844
8fb9985382be pycompat: export queue module instead of symbols in module (API)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 36835
diff changeset
   293
    resultqueue = pycompat.queue.Queue()
8fb9985382be pycompat: export queue module instead of symbols in module (API)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 36835
diff changeset
   294
    taskqueue = pycompat.queue.Queue()
35427
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   295
    # partition work to more pieces than workers to minimize the chance
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   296
    # of uneven distribution of large tasks between the workers
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   297
    for pargs in partition(args, workers * 20):
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   298
        taskqueue.put(pargs)
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   299
    for _i in range(workers):
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   300
        t = Worker(taskqueue, resultqueue, func, staticargs)
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   301
        threads.append(t)
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   302
        t.start()
35432
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35431
diff changeset
   303
    try:
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35431
diff changeset
   304
        while len(threads) > 0:
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35431
diff changeset
   305
            while not resultqueue.empty():
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35431
diff changeset
   306
                yield resultqueue.get()
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35431
diff changeset
   307
            threads[0].join(0.05)
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35431
diff changeset
   308
            finishedthreads = [_t for _t in threads if not _t.is_alive()]
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35431
diff changeset
   309
            for t in finishedthreads:
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35431
diff changeset
   310
                if t.exception is not None:
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35431
diff changeset
   311
                    raise t.exception
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35431
diff changeset
   312
                threads.remove(t)
35453
44fd4cfc6c0a worker: handle interrupt on windows
Wojciech Lis <wlis@fb.com>
parents: 35432
diff changeset
   313
    except (Exception, KeyboardInterrupt): # re-raises
35432
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35431
diff changeset
   314
        trykillworkers()
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35431
diff changeset
   315
        raise
35427
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   316
    while not resultqueue.empty():
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   317
        yield resultqueue.get()
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   318
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   319
if pycompat.iswindows:
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   320
    _platformworker = _windowsworker
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   321
else:
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   322
    _platformworker = _posixworker
18707
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   323
    _exitstatus = _posixexitstatus
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   324
18637
ac4dbceeb14a worker: partition a list (of tasks) into equal-sized chunks
Bryan O'Sullivan <bryano@fb.com>
parents: 18636
diff changeset
   325
def partition(lst, nslices):
28181
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
   326
    '''partition a list into N slices of roughly equal size
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
   327
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
   328
    The current strategy takes every Nth element from the input. If
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
   329
    we ever write workers that need to preserve grouping in input
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
   330
    we should consider allowing callers to specify a partition strategy.
28292
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   331
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   332
    mpm is not a fan of this partitioning strategy when files are involved.
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   333
    In his words:
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   334
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   335
        Single-threaded Mercurial makes a point of creating and visiting
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   336
        files in a fixed order (alphabetical). When creating files in order,
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   337
        a typical filesystem is likely to allocate them on nearby regions on
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   338
        disk. Thus, when revisiting in the same order, locality is maximized
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   339
        and various forms of OS and disk-level caching and read-ahead get a
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   340
        chance to work.
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   341
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   342
        This effect can be quite significant on spinning disks. I discovered it
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   343
        circa Mercurial v0.4 when revlogs were named by hashes of filenames.
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   344
        Tarring a repo and copying it to another disk effectively randomized
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   345
        the revlog ordering on disk by sorting the revlogs by hash and suddenly
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   346
        performance of my kernel checkout benchmark dropped by ~10x because the
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   347
        "working set" of sectors visited no longer fit in the drive's cache and
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   348
        the workload switched from streaming to random I/O.
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   349
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   350
        What we should really be doing is have workers read filenames from a
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   351
        ordered queue. This preserves locality and also keeps any worker from
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   352
        getting more than one file out of balance.
28181
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
   353
    '''
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
   354
    for i in range(nslices):
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
   355
        yield lst[i::nslices]