mercurial/worker.py
author Raphaël Gomès <rgomes@octobus.net>
Wed, 26 Oct 2022 12:20:23 +0200
branchstable
changeset 49540 318bdd289cf2
parent 49537 3556f0392808
child 49975 3eef8baf6b92
permissions -rw-r--r--
dirstate-v2: correct documented return values of `pack_dirstate`
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
18635
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
     1
# worker.py - master-slave parallelism support
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
     2
#
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
     3
# Copyright 2013 Facebook, Inc.
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
     4
#
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
     5
# This software may be used and distributed according to the terms of the
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
     6
# GNU General Public License version 2 or any later version.
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
     7
25992
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
     8
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
     9
import os
48961
df56e6bd37f6 py3: use pickle directly
Gregory Szorc <gregory.szorc@gmail.com>
parents: 46819
diff changeset
    10
import pickle
49288
311fcc5a65f6 thirdparty: remove Python 2-specific selectors2 copy
Manuel Jacob <me@manueljacob.de>
parents: 49283
diff changeset
    11
import selectors
25992
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
    12
import signal
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
    13
import sys
35433
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
    14
import threading
35436
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
    15
import time
25992
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
    16
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
    17
from .i18n import _
30406
78a58dcf8853 worker: migrate to util.iterfile
Jun Wu <quark@fb.com>
parents: 28292
diff changeset
    18
from . import (
30640
a150173da1c1 py3: replace os.environ with encoding.environ (part 2 of 5)
Pulkit Goyal <7895pulkit@gmail.com>
parents: 30530
diff changeset
    19
    encoding,
30406
78a58dcf8853 worker: migrate to util.iterfile
Jun Wu <quark@fb.com>
parents: 28292
diff changeset
    20
    error,
30644
d524c88511a7 py3: replace os.name with pycompat.osname (part 1 of 2)
Pulkit Goyal <7895pulkit@gmail.com>
parents: 30640
diff changeset
    21
    pycompat,
30530
86cd09bc13ba worker: use os._exit for posix worker in all cases
Jun Wu <quark@fb.com>
parents: 30434
diff changeset
    22
    scmutil,
30406
78a58dcf8853 worker: migrate to util.iterfile
Jun Wu <quark@fb.com>
parents: 28292
diff changeset
    23
)
18635
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
    24
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
    25
18635
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
    26
def countcpus():
    """Try to count the number of CPUs on the system.

    Two probes are attempted in order: ``os.sysconf`` (POSIX) and the
    ``NUMBER_OF_PROCESSORS`` environment variable (Windows). The first probe
    that yields a positive integer wins. Returns 1 when neither does.
    """

    # posix
    try:
        n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
        if n > 0:
            return n
    except (AttributeError, ValueError, OSError):
        # AttributeError: os.sysconf does not exist on this platform.
        # ValueError: the configuration name is not known.
        # OSError: the name is known but not supported by the host system.
        pass

    # windows
    try:
        n = int(encoding.environ[b'NUMBER_OF_PROCESSORS'])
        if n > 0:
            return n
    except (KeyError, ValueError):
        # variable unset, or set to something that is not an integer
        pass

    return 1
18636
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    46
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
    47
18636
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    48
def _numworkers(ui):
43077
687b865b95ad formatting: byteify all mercurial/ and hgext/ string literals
Augie Fackler <augie@google.com>
parents: 43076
diff changeset
    49
    s = ui.config(b'worker', b'numcpus')
18636
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    50
    if s:
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    51
        try:
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    52
            n = int(s)
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    53
            if n >= 1:
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    54
                return n
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    55
        except ValueError:
43077
687b865b95ad formatting: byteify all mercurial/ and hgext/ string literals
Augie Fackler <augie@google.com>
parents: 43076
diff changeset
    56
            raise error.Abort(_(b'number of cpus must be an integer'))
18636
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    57
    return min(max(countcpus(), 4), 32)
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    58
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
    59
48992
cc0e059d2af8 worker: remove Python 2 support code
Gregory Szorc <gregory.szorc@gmail.com>
parents: 48966
diff changeset
    60
def ismainthread():
    """Tell whether the calling thread is the main thread."""
    caller = threading.current_thread()
    return caller is threading.main_thread()
44165
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
    62
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
    63
49245
cdb85d0512b8 branching: fix wrong merge conflict resolution from 13dfad0f9f7a
Manuel Jacob <me@manueljacob.de>
parents: 49238
diff changeset
    64
class _blockingreader:
49252
4d42a5fb70bf worker: add docstring to _blockingreader
Manuel Jacob <me@manueljacob.de>
parents: 49251
diff changeset
    65
    """Wrap unbuffered stream such that pickle.load() works with it.
4d42a5fb70bf worker: add docstring to _blockingreader
Manuel Jacob <me@manueljacob.de>
parents: 49251
diff changeset
    66
4d42a5fb70bf worker: add docstring to _blockingreader
Manuel Jacob <me@manueljacob.de>
parents: 49251
diff changeset
    67
    pickle.load() expects that calls to read() and readinto() read as many
4d42a5fb70bf worker: add docstring to _blockingreader
Manuel Jacob <me@manueljacob.de>
parents: 49251
diff changeset
    68
    bytes as requested. On EOF, it is fine to read fewer bytes. In this case,
4d42a5fb70bf worker: add docstring to _blockingreader
Manuel Jacob <me@manueljacob.de>
parents: 49251
diff changeset
    69
    pickle.load() raises an EOFError.
4d42a5fb70bf worker: add docstring to _blockingreader
Manuel Jacob <me@manueljacob.de>
parents: 49251
diff changeset
    70
    """
4d42a5fb70bf worker: add docstring to _blockingreader
Manuel Jacob <me@manueljacob.de>
parents: 49251
diff changeset
    71
49238
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
    72
    def __init__(self, wrapped):
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
    73
        self._wrapped = wrapped
48870
2fe4efaa59af worker: adapt _blockingreader to work around a python3.8.[0-1] bug (issue6444)
Matt Harbison <matt_harbison@yahoo.com>
parents: 46819
diff changeset
    74
49238
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
    75
    def readline(self):
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
    76
        return self._wrapped.readline()
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
    77
49254
520722523955 worker: implement _blockingreader.readinto() (issue6444)
Manuel Jacob <me@manueljacob.de>
parents: 49253
diff changeset
    78
    def readinto(self, buf):
49238
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
    79
        pos = 0
49254
520722523955 worker: implement _blockingreader.readinto() (issue6444)
Manuel Jacob <me@manueljacob.de>
parents: 49253
diff changeset
    80
        size = len(buf)
49238
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
    81
49253
4c57ce494a4e worker: stop relying on garbage collection to release memoryview
Manuel Jacob <me@manueljacob.de>
parents: 49252
diff changeset
    82
        with memoryview(buf) as view:
4c57ce494a4e worker: stop relying on garbage collection to release memoryview
Manuel Jacob <me@manueljacob.de>
parents: 49252
diff changeset
    83
            while pos < size:
4c57ce494a4e worker: stop relying on garbage collection to release memoryview
Manuel Jacob <me@manueljacob.de>
parents: 49252
diff changeset
    84
                with view[pos:] as subview:
4c57ce494a4e worker: stop relying on garbage collection to release memoryview
Manuel Jacob <me@manueljacob.de>
parents: 49252
diff changeset
    85
                    ret = self._wrapped.readinto(subview)
4c57ce494a4e worker: stop relying on garbage collection to release memoryview
Manuel Jacob <me@manueljacob.de>
parents: 49252
diff changeset
    86
                if not ret:
4c57ce494a4e worker: stop relying on garbage collection to release memoryview
Manuel Jacob <me@manueljacob.de>
parents: 49252
diff changeset
    87
                    break
4c57ce494a4e worker: stop relying on garbage collection to release memoryview
Manuel Jacob <me@manueljacob.de>
parents: 49252
diff changeset
    88
                pos += ret
49238
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
    89
49254
520722523955 worker: implement _blockingreader.readinto() (issue6444)
Manuel Jacob <me@manueljacob.de>
parents: 49253
diff changeset
    90
        return pos
520722523955 worker: implement _blockingreader.readinto() (issue6444)
Manuel Jacob <me@manueljacob.de>
parents: 49253
diff changeset
    91
520722523955 worker: implement _blockingreader.readinto() (issue6444)
Manuel Jacob <me@manueljacob.de>
parents: 49253
diff changeset
    92
    # issue multiple reads until size is fulfilled (or EOF is encountered)
520722523955 worker: implement _blockingreader.readinto() (issue6444)
Manuel Jacob <me@manueljacob.de>
parents: 49253
diff changeset
    93
    def read(self, size=-1):
520722523955 worker: implement _blockingreader.readinto() (issue6444)
Manuel Jacob <me@manueljacob.de>
parents: 49253
diff changeset
    94
        if size < 0:
520722523955 worker: implement _blockingreader.readinto() (issue6444)
Manuel Jacob <me@manueljacob.de>
parents: 49253
diff changeset
    95
            return self._wrapped.readall()
520722523955 worker: implement _blockingreader.readinto() (issue6444)
Manuel Jacob <me@manueljacob.de>
parents: 49253
diff changeset
    96
520722523955 worker: implement _blockingreader.readinto() (issue6444)
Manuel Jacob <me@manueljacob.de>
parents: 49253
diff changeset
    97
        buf = bytearray(size)
520722523955 worker: implement _blockingreader.readinto() (issue6444)
Manuel Jacob <me@manueljacob.de>
parents: 49253
diff changeset
    98
        n_read = self.readinto(buf)
520722523955 worker: implement _blockingreader.readinto() (issue6444)
Manuel Jacob <me@manueljacob.de>
parents: 49253
diff changeset
    99
        del buf[n_read:]
49238
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
   100
        return bytes(buf)
44165
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
   101
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
   102
35433
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   103
if not (pycompat.isposix or pycompat.iswindows):
    # Neither POSIX nor Windows: a startup cost this large makes the
    # benefit computed by worthwhile() negative for any realistic workload.
    _STARTUP_COST = 1e30
    _DISALLOW_THREAD_UNSAFE = False
else:
    _STARTUP_COST = 0.01
    # The Windows worker is thread based. With CPU bound tasks, threads
    # contending for the GIL cause excessive context switching, and that
    # overhead can slow down execution. Thread unsafe tasks are therefore
    # disallowed on Windows only.
    _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
18636
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
   112
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   113
38731
ef3838a47503 worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38730
diff changeset
   114
def worthwhile(ui, costperop, nops, threadsafe=True):
    """Estimate whether running in parallel pays for the worker startup.

    costperop is the cost of one operation and nops the number of
    operations. Returns False outright when the work is not thread safe and
    thread unsafe work is disallowed; otherwise returns True when the
    estimated saving over a serial run is at least 0.15.
    """
    if _DISALLOW_THREAD_UNSAFE and not threadsafe:
        return False

    serialcost = costperop * nops
    nworkers = _numworkers(ui)
    # startup overhead for every worker plus an even share of the work
    parallelcost = _STARTUP_COST * nworkers + serialcost / nworkers
    return serialcost - parallelcost >= 0.15
18637
ac4dbceeb14a worker: partition a list (of tasks) into equal-sized chunks
Bryan O'Sullivan <bryano@fb.com>
parents: 18636
diff changeset
   125
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   126
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   127
def worker(
    ui,
    costperarg,
    func,
    staticargs,
    args,
    hasretval=False,
    threadsafe=True,
    prefork=None,
):
    """Run a function, possibly in parallel using the platform worker.

    Returns a progress iterator.

    costperarg - cost of a single task

    func - function to run. It is expected to return a progress iterator.

    staticargs - arguments to pass to every invocation of the function

    args - arguments to split into chunks, to pass to individual
    workers

    hasretval - when True, func and the current function return a progress
    iterator followed by a dict (encoded as an iterator that yields many
    (False, ..) and then a (True, dict)). The dicts are joined in some
    arbitrary order, so overlapping keys are a bad idea.

    threadsafe - whether work items are thread safe and can be executed using
    a thread-based worker. Should be disabled for CPU heavy tasks that don't
    release the GIL.

    prefork - a parameterless Callable that is invoked prior to forking the
    process.  fork() is only used on non-Windows platforms, but is also not
    called on POSIX platforms if the work amount doesn't warrant a worker.
    """
    parallel = ui.configbool(b'worker', b'enabled')
    if parallel and _platformworker is _posixworker:
        # The POSIX worker has to install a handler for SIGCHLD.
        # Python up to 3.9 only allows this in the main thread.
        parallel = ismainthread()

    if not (
        parallel
        and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe)
    ):
        # workers disabled, unusable from this thread, or not worth their
        # startup cost: run everything in a single direct call
        return func(*staticargs + (args,))

    return _platformworker(
        ui, func, staticargs, args, hasretval, prefork=prefork
    )
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   175
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   176
49537
3556f0392808 lfs: avoid closing connections when the worker doesn't fork
Matt Harbison <matt_harbison@yahoo.com>
parents: 49317
diff changeset
   177
def _posixworker(ui, func, staticargs, args, hasretval, prefork=None):
    """run func over chunks of args in forked child processes

    This is a generator. One child is forked for every chunk produced by
    ``partition(args, min(workers, len(args)))``. Each child calls
    ``func(*(staticargs + (pargs,)))`` and pickles every item it produces
    into its own pipe; the parent unpickles items as the pipes become
    readable and yields them.

    If ``hasretval`` is true, an item whose first element is truthy is not
    yielded directly: its second element is merged into a dict, and a single
    ``(True, retval)`` pair is yielded after every pipe has reached EOF.

    ``prefork``, if given, is called once with no arguments in the parent
    before any child is forked.

    Raises error.WorkerError carrying the first non-zero status collected
    from a child (as converted by ``_exitstatus``). If that status is
    negative, the parent first sends itself signal ``-status``.
    """
    workers = _numworkers(ui)
    # The parent ignores SIGINT while workers are running; each child
    # restores the original handler right after fork, and cleanup() restores
    # it in the parent.
    oldhandler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    # problem is a one-element list so the nested functions below can record
    # the first failing status by mutating it.
    pids, problem = set(), [0]

    def killworkers():
        # unregister SIGCHLD handler as all children will be killed. This
        # function shouldn't be interrupted by another SIGCHLD; otherwise pids
        # could be updated while iterating, which would cause inconsistency.
        signal.signal(signal.SIGCHLD, oldchldhandler)
        # if one worker bails, there's no good reason to wait for the rest
        for p in pids:
            try:
                os.kill(p, signal.SIGTERM)
            except ProcessLookupError:
                # the process is already gone; nothing left to signal
                pass

    def waitforworkers(blocking=True):
        # Iterate over a copy: pids is modified inside the loop, and this
        # function is also invoked from the SIGCHLD handler.
        for pid in pids.copy():
            p = st = 0
            try:
                p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
            except ChildProcessError:
                # the child was already reaped, but pids had not yet been
                # updated (maybe interrupted just after waitpid)
                pids.discard(pid)
            if not p:
                # skip subsequent steps, because child process should
                # be still running in this case
                continue
            pids.discard(p)
            st = _exitstatus(st)
            # only the first non-zero status is remembered
            if st and not problem[0]:
                problem[0] = st

    def sigchldhandler(signum, frame):
        # Reap whatever has exited without blocking; once any worker has
        # failed, terminate the remaining ones.
        waitforworkers(blocking=False)
        if problem[0]:
            killworkers()

    oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
    ui.flush()
    # Remembered so code running after fork can tell parent from child.
    parentpid = os.getpid()
    pipes = []
    retval = {}

    if prefork:
        prefork()

    for pargs in partition(args, min(workers, len(args))):
        # Every worker gets its own pipe to send results on, so we don't have to
        # implement atomic writes larger than PIPE_BUF. Each forked process has
        # its own pipe's descriptors in the local variables, and the parent
        # process has the full list of pipe descriptors (and it doesn't really
        # care what order they're in).
        rfd, wfd = os.pipe()
        pipes.append((rfd, wfd))
        # make sure we use os._exit in all worker code paths. otherwise the
        # worker may do some clean-ups which could cause surprises like
        # deadlock. see sshpeer.cleanup for example.
        # override error handling *before* fork. this is necessary because
        # exception (signal) may arrive after fork, before "pid =" assignment
        # completes, and other exception handler (dispatch.py) can lead to
        # unexpected code path without os._exit.
        ret = -1
        try:
            pid = os.fork()
            if pid == 0:
                # child: put back the handlers that were active before this
                # function replaced them
                signal.signal(signal.SIGINT, oldhandler)
                signal.signal(signal.SIGCHLD, oldchldhandler)

                def workerfunc():
                    # pipes[:-1] are the pairs created for earlier workers
                    # and inherited across fork; the last entry is this
                    # worker's own (rfd, wfd).
                    for r, w in pipes[:-1]:
                        os.close(r)
                        os.close(w)
                    # the child only writes, so drop its read end
                    os.close(rfd)
                    with os.fdopen(wfd, 'wb') as wf:
                        for result in func(*(staticargs + (pargs,))):
                            pickle.dump(result, wf)
                            # flush after every item so the parent can read
                            # it without waiting for the buffer to fill
                            wf.flush()
                    return 0

                # the value returned here becomes the child's exit code below
                ret = scmutil.callcatch(ui, workerfunc)
        except:  # parent re-raises, child never returns
            if os.getpid() == parentpid:
                raise
            exctype = sys.exc_info()[0]
            force = not issubclass(exctype, KeyboardInterrupt)
            ui.traceback(force=force)
        finally:
            if os.getpid() != parentpid:
                try:
                    ui.flush()
                except:  # never returns, no re-raises
                    pass
                finally:
                    # Exit code is ret masked to its low 8 bits; ret is still
                    # -1 (i.e. 255) if callcatch never returned a value.
                    os._exit(ret & 255)
        pids.add(pid)
    selector = selectors.DefaultSelector()
    for rfd, wfd in pipes:
        # the parent only reads, so close its copy of every write end
        os.close(wfd)
        # The stream has to be unbuffered. Otherwise, if all data is read from
        # the raw file into the buffer, the selector thinks that the FD is not
        # ready to read while pickle.load() could read from the buffer. This
        # would delay the processing of readable items.
        selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)

    def cleanup():
        # Restore SIGINT, wait (blocking) for the remaining children, restore
        # SIGCHLD, release the selector, and report the first failing status
        # (0 if none was recorded).
        signal.signal(signal.SIGINT, oldhandler)
        waitforworkers()
        signal.signal(signal.SIGCHLD, oldchldhandler)
        selector.close()
        return problem[0]

    try:
        openpipes = len(pipes)
        while openpipes > 0:
            for key, events in selector.select():
                try:
                    # The pytype error likely goes away on a modern version of
                    # pytype having a modern typeshed snapshot.
                    # pytype: disable=wrong-arg-types
                    res = pickle.load(_blockingreader(key.fileobj))
                    # pytype: enable=wrong-arg-types
                    if hasretval and res[0]:
                        retval.update(res[1])
                    else:
                        yield res
                except EOFError:
                    # nothing more to read from this pipe: stop watching it
                    # and count it as finished
                    selector.unregister(key.fileobj)
                    # pytype: disable=attribute-error
                    key.fileobj.close()
                    # pytype: enable=attribute-error
                    openpipes -= 1
    except:  # re-raises
        killworkers()
        cleanup()
        raise
    status = cleanup()
    if status:
        if status < 0:
            # NOTE(review): a negative status presumably means the worker was
            # killed by a signal (that is how _posixexitstatus encodes it) --
            # confirm _exitstatus is bound to it on this platform.
            os.kill(os.getpid(), -status)
        raise error.WorkerError(status)
    if hasretval:
        yield True, retval
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   323
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   324
18707
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   325
def _posixexitstatus(code):
45957
89a2afe31e82 formating: upgrade to black 20.8b1
Augie Fackler <raf@durin42.com>
parents: 45844
diff changeset
   326
    """convert a posix exit status into the same form returned by
18707
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   327
    os.spawnv
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   328
45957
89a2afe31e82 formating: upgrade to black 20.8b1
Augie Fackler <raf@durin42.com>
parents: 45844
diff changeset
   329
    returns None if the process was stopped instead of exiting"""
18707
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   330
    if os.WIFEXITED(code):
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   331
        return os.WEXITSTATUS(code)
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   332
    elif os.WIFSIGNALED(code):
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   333
        return -(os.WTERMSIG(code))
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   334
18707
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   335
49537
3556f0392808 lfs: avoid closing connections when the worker doesn't fork
Matt Harbison <matt_harbison@yahoo.com>
parents: 49317
diff changeset
   336
def _windowsworker(ui, func, staticargs, args, hasretval, prefork=None):
    """thread based worker implementation (generator)

    ``args`` is split with ``partition()`` into ``_numworkers(ui) * 20``
    slices which are put on a shared task queue. ``_numworkers(ui)`` daemon
    threads take slices from that queue and iterate over
    ``func(*staticargs + (slice,))``; every item produced is handed back
    through a result queue and yielded from here.

    When ``hasretval`` is true, items whose first element is true are not
    yielded: their second element is merged into a dict instead, and that
    dict is yielded last as ``(True, dict)``.

    An exception raised inside a worker thread is re-raised from this
    generator once that thread has finished; the remaining threads are then
    asked to stop.

    ``prefork`` is accepted but not used by this implementation.
    """

    class Worker(threading.Thread):
        """daemon thread feeding slices from the task queue to ``func``"""

        def __init__(
            self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
        ):
            super().__init__(*args, **kwargs)
            self._taskqueue = taskqueue
            self._resultqueue = resultqueue
            self._func = func
            self._staticargs = staticargs
            self._interrupted = False
            self.daemon = True
            # exception that ended run(), if any; read by the main thread
            self.exception = None

        def interrupt(self):
            """ask the thread to stop after the result it is producing"""
            self._interrupted = True

        def run(self):
            tasks = self._taskqueue
            results = self._resultqueue
            try:
                while not tasks.empty():
                    try:
                        chunk = tasks.get_nowait()
                        for item in self._func(*self._staticargs + (chunk,)):
                            results.put(item)
                            # a thread cannot be interrupted from the
                            # outside, so the flag is polled manually
                            # after every result.
                            if self._interrupted:
                                return
                    except pycompat.queue.Empty:
                        break
            except Exception as err:
                # keep the exception around so that the main thread can
                # resurface it as if func had been running without workers.
                self.exception = err
                raise

    threads = []

    def trykillworkers():
        # the worker threads get at most one second, in total, to finish
        deadline = time.time() + 1
        for worker in threads:
            worker.interrupt()
        for worker in threads:
            worker.join(deadline - time.time())
            if not worker.is_alive():
                continue
            # do not fail because a thread could not be joined: surfacing
            # the initial exception matters more than the fact that one of
            # the workers may be busy with a large task and did not get to
            # notice the interruption.
            ui.warn(
                _(
                    b"failed to kill worker threads while "
                    b"handling an exception\n"
                )
            )
            return

    workers = _numworkers(ui)
    resultqueue = pycompat.queue.Queue()
    taskqueue = pycompat.queue.Queue()
    retval = {}

    def drain():
        # forward everything currently sitting in the result queue; return
        # value fragments are merged into ``retval`` rather than yielded
        while not resultqueue.empty():
            res = resultqueue.get()
            if hasretval and res[0]:
                retval.update(res[1])
            else:
                yield res

    # use many more slices than workers so that a few large tasks are less
    # likely to end up unevenly distributed between the workers
    for pargs in partition(args, workers * 20):
        taskqueue.put(pargs)
    for _i in range(workers):
        t = Worker(taskqueue, resultqueue, func, staticargs)
        threads.append(t)
        t.start()
    try:
        while threads:
            yield from drain()
            # short wait so that results keep flowing while threads run
            threads[0].join(0.05)
            finished = [_t for _t in threads if not _t.is_alive()]
            for t in finished:
                if t.exception is not None:
                    raise t.exception
                threads.remove(t)
    except (Exception, KeyboardInterrupt):  # re-raises
        trykillworkers()
        raise
    # results queued after the last pass of the loop above
    yield from drain()
    if hasretval:
        yield True, retval
35433
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   433
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   434
35433
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
   435
# Pick the worker implementation for the running platform. The exit status
# translator is only bound on the posix side.
if not pycompat.iswindows:
    _platformworker = _posixworker
    _exitstatus = _posixexitstatus
else:
    _platformworker = _windowsworker
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   440
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
   441
18637
ac4dbceeb14a worker: partition a list (of tasks) into equal-sized chunks
Bryan O'Sullivan <bryano@fb.com>
parents: 18636
diff changeset
   442
def partition(lst, nslices):
    """yield ``nslices`` slices of ``lst`` of roughly equal size

    Slice number ``i`` is made of every ``nslices``-th element of the
    input, starting at index ``i``. If workers that need the grouping of
    their input preserved are ever written, letting callers specify a
    partition strategy should be considered.

    olivia is not a fan of this partitioning strategy when files are
    involved, and explains why:

        Single-threaded Mercurial makes a point of creating and visiting
        files in a fixed order (alphabetical). When creating files in order,
        a typical filesystem is likely to allocate them on nearby regions on
        disk. Thus, when revisiting in the same order, locality is maximized
        and various forms of OS and disk-level caching and read-ahead get a
        chance to work.

        This effect can be quite significant on spinning disks. I discovered it
        circa Mercurial v0.4 when revlogs were named by hashes of filenames.
        Tarring a repo and copying it to another disk effectively randomized
        the revlog ordering on disk by sorting the revlogs by hash and suddenly
        performance of my kernel checkout benchmark dropped by ~10x because the
        "working set" of sectors visited no longer fit in the drive's cache and
        the workload switched from streaming to random I/O.

        What we should really be doing is have workers read filenames from a
        ordered queue. This preserves locality and also keeps any worker from
        getting more than one file out of balance.
    """
    yield from (lst[first::nslices] for first in range(nslices))