author | Georges Racinet <georges.racinet@octobus.net> |
Tue, 05 Feb 2019 10:28:32 +0100 | |
changeset 41716 | 977432970080 |
parent 40989 | e10adebf8176 |
child 42455 | 5ca136bbd3f6 |
permissions | -rw-r--r-- |
18635
fed06dd07665
worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff
changeset
|
1 |
# worker.py - master-slave parallelism support |
fed06dd07665
worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff
changeset
|
2 |
# |
fed06dd07665
worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff
changeset
|
3 |
# Copyright 2013 Facebook, Inc. |
fed06dd07665
worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff
changeset
|
4 |
# |
fed06dd07665
worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff
changeset
|
5 |
# This software may be used and distributed according to the terms of the |
fed06dd07665
worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff
changeset
|
6 |
# GNU General Public License version 2 or any later version. |
fed06dd07665
worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff
changeset
|
7 |
|
25992
2d76f8a2d831
worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents:
25660
diff
changeset
|
8 |
from __future__ import absolute_import |
2d76f8a2d831
worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents:
25660
diff
changeset
|
9 |
|
2d76f8a2d831
worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents:
25660
diff
changeset
|
10 |
import errno |
2d76f8a2d831
worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents:
25660
diff
changeset
|
11 |
import os |
2d76f8a2d831
worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents:
25660
diff
changeset
|
12 |
import signal |
2d76f8a2d831
worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents:
25660
diff
changeset
|
13 |
import sys |
35427
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
14 |
import threading |
35432
86b8cc1f244e
worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents:
35431
diff
changeset
|
15 |
import time |
25992
2d76f8a2d831
worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents:
25660
diff
changeset
|
16 |
|
38729
9e6afe7fca31
worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents:
38535
diff
changeset
|
17 |
try: |
9e6afe7fca31
worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents:
38535
diff
changeset
|
18 |
import selectors |
9e6afe7fca31
worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents:
38535
diff
changeset
|
19 |
selectors.BaseSelector |
9e6afe7fca31
worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents:
38535
diff
changeset
|
20 |
except ImportError: |
9e6afe7fca31
worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents:
38535
diff
changeset
|
21 |
from .thirdparty import selectors2 as selectors |
9e6afe7fca31
worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents:
38535
diff
changeset
|
22 |
|
25992
2d76f8a2d831
worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents:
25660
diff
changeset
|
23 |
from .i18n import _ |
30396 | 24 |
from . import ( |
30635
a150173da1c1
py3: replace os.environ with encoding.environ (part 2 of 5)
Pulkit Goyal <7895pulkit@gmail.com>
parents:
30521
diff
changeset
|
25 |
encoding, |
30396 | 26 |
error, |
30639
d524c88511a7
py3: replace os.name with pycompat.osname (part 1 of 2)
Pulkit Goyal <7895pulkit@gmail.com>
parents:
30635
diff
changeset
|
27 |
pycompat, |
30521
86cd09bc13ba
worker: use os._exit for posix worker in all cases
Jun Wu <quark@fb.com>
parents:
30425
diff
changeset
|
28 |
scmutil, |
30396 | 29 |
util, |
30 |
) |
|
18635
fed06dd07665
worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff
changeset
|
31 |
|
fed06dd07665
worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff
changeset
|
32 |
def countcpus(): |
fed06dd07665
worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff
changeset
|
33 |
'''try to count the number of CPUs on the system''' |
26568
c0501c26b05c
worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26063
diff
changeset
|
34 |
|
c0501c26b05c
worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26063
diff
changeset
|
35 |
# posix |
18635
fed06dd07665
worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff
changeset
|
36 |
try: |
32611
954489932c4f
py3: pass str in os.sysconf()
Pulkit Goyal <7895pulkit@gmail.com>
parents:
32112
diff
changeset
|
37 |
n = int(os.sysconf(r'SC_NPROCESSORS_ONLN')) |
26568
c0501c26b05c
worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26063
diff
changeset
|
38 |
if n > 0: |
c0501c26b05c
worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26063
diff
changeset
|
39 |
return n |
c0501c26b05c
worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26063
diff
changeset
|
40 |
except (AttributeError, ValueError): |
c0501c26b05c
worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26063
diff
changeset
|
41 |
pass |
c0501c26b05c
worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26063
diff
changeset
|
42 |
|
c0501c26b05c
worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26063
diff
changeset
|
43 |
# windows |
c0501c26b05c
worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26063
diff
changeset
|
44 |
try: |
30635
a150173da1c1
py3: replace os.environ with encoding.environ (part 2 of 5)
Pulkit Goyal <7895pulkit@gmail.com>
parents:
30521
diff
changeset
|
45 |
n = int(encoding.environ['NUMBER_OF_PROCESSORS']) |
26568
c0501c26b05c
worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26063
diff
changeset
|
46 |
if n > 0: |
c0501c26b05c
worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26063
diff
changeset
|
47 |
return n |
c0501c26b05c
worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26063
diff
changeset
|
48 |
except (KeyError, ValueError): |
c0501c26b05c
worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26063
diff
changeset
|
49 |
pass |
c0501c26b05c
worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26063
diff
changeset
|
50 |
|
c0501c26b05c
worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26063
diff
changeset
|
51 |
return 1 |
18636
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
52 |
|
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
53 |
def _numworkers(ui): |
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
54 |
s = ui.config('worker', 'numcpus') |
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
55 |
if s: |
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
56 |
try: |
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
57 |
n = int(s) |
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
58 |
if n >= 1: |
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
59 |
return n |
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
60 |
except ValueError: |
26587
56b2bcea2529
error: get Abort from 'error' instead of 'util'
Pierre-Yves David <pierre-yves.david@fb.com>
parents:
26568
diff
changeset
|
61 |
raise error.Abort(_('number of cpus must be an integer')) |
18636
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
62 |
return min(max(countcpus(), 4), 32) |
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
63 |
|
35427
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
64 |
if pycompat.isposix or pycompat.iswindows: |
38730
69ed2cff4277
worker: rename variable to reflect constant
Gregory Szorc <gregory.szorc@gmail.com>
parents:
38729
diff
changeset
|
65 |
_STARTUP_COST = 0.01 |
38731
ef3838a47503
worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents:
38730
diff
changeset
|
66 |
# The Windows worker is thread based. If tasks are CPU bound, threads |
ef3838a47503
worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents:
38730
diff
changeset
|
67 |
# in the presence of the GIL result in excessive context switching and |
ef3838a47503
worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents:
38730
diff
changeset
|
68 |
# this overhead can slow down execution. |
ef3838a47503
worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents:
38730
diff
changeset
|
69 |
_DISALLOW_THREAD_UNSAFE = pycompat.iswindows |
18636
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
70 |
else: |
38730
69ed2cff4277
worker: rename variable to reflect constant
Gregory Szorc <gregory.szorc@gmail.com>
parents:
38729
diff
changeset
|
71 |
_STARTUP_COST = 1e30 |
38731
ef3838a47503
worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents:
38730
diff
changeset
|
72 |
_DISALLOW_THREAD_UNSAFE = False |
18636
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
73 |
|
38731
ef3838a47503
worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents:
38730
diff
changeset
|
74 |
def worthwhile(ui, costperop, nops, threadsafe=True): |
18636
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
75 |
'''try to determine whether the benefit of multiple processes can |
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
76 |
outweigh the cost of starting them''' |
38731
ef3838a47503
worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents:
38730
diff
changeset
|
77 |
|
ef3838a47503
worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents:
38730
diff
changeset
|
78 |
if not threadsafe and _DISALLOW_THREAD_UNSAFE: |
ef3838a47503
worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents:
38730
diff
changeset
|
79 |
return False |
ef3838a47503
worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents:
38730
diff
changeset
|
80 |
|
18636
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
81 |
linear = costperop * nops |
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
82 |
workers = _numworkers(ui) |
38730
69ed2cff4277
worker: rename variable to reflect constant
Gregory Szorc <gregory.szorc@gmail.com>
parents:
38729
diff
changeset
|
83 |
benefit = linear - (_STARTUP_COST * workers + linear / workers) |
18636
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
84 |
return benefit >= 0.15 |
18637
ac4dbceeb14a
worker: partition a list (of tasks) into equal-sized chunks
Bryan O'Sullivan <bryano@fb.com>
parents:
18636
diff
changeset
|
85 |
|
38731
ef3838a47503
worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents:
38730
diff
changeset
|
86 |
def worker(ui, costperarg, func, staticargs, args, threadsafe=True): |
18638
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
87 |
'''run a function, possibly in parallel in multiple worker |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
88 |
processes. |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
89 |
|
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
90 |
returns a progress iterator |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
91 |
|
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
92 |
costperarg - cost of a single task |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
93 |
|
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
94 |
func - function to run |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
95 |
|
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
96 |
staticargs - arguments to pass to every invocation of the function |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
97 |
|
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
98 |
args - arguments to split into chunks, to pass to individual |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
99 |
workers |
38731
ef3838a47503
worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents:
38730
diff
changeset
|
100 |
|
ef3838a47503
worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents:
38730
diff
changeset
|
101 |
threadsafe - whether work items are thread safe and can be executed using |
ef3838a47503
worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents:
38730
diff
changeset
|
102 |
a thread-based worker. Should be disabled for CPU heavy tasks that don't |
ef3838a47503
worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents:
38730
diff
changeset
|
103 |
release the GIL. |
18638
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
104 |
''' |
35431
471918fa7f46
workers: add config to enable/diable workers
Wojciech Lis <wlis@fb.com>
parents:
35428
diff
changeset
|
105 |
enabled = ui.configbool('worker', 'enabled') |
38731
ef3838a47503
worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents:
38730
diff
changeset
|
106 |
if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe): |
18638
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
107 |
return _platformworker(ui, func, staticargs, args) |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
108 |
return func(*staticargs + (args,)) |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
109 |
|
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
110 |
def _posixworker(ui, func, staticargs, args): |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
111 |
workers = _numworkers(ui) |
18708
86524a70c0f6
worker: fix a race in SIGINT handling
Bryan O'Sullivan <bryano@fb.com>
parents:
18707
diff
changeset
|
112 |
oldhandler = signal.getsignal(signal.SIGINT) |
86524a70c0f6
worker: fix a race in SIGINT handling
Bryan O'Sullivan <bryano@fb.com>
parents:
18707
diff
changeset
|
113 |
signal.signal(signal.SIGINT, signal.SIG_IGN) |
30413 | 114 |
pids, problem = set(), [0] |
30410
7a5d6e2fd2d5
worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents:
30396
diff
changeset
|
115 |
def killworkers(): |
30423
237b2883cbd8
worker: make sure killworkers() never be interrupted by another SIGCHLD
Yuya Nishihara <yuya@tcha.org>
parents:
30422
diff
changeset
|
116 |
# unregister SIGCHLD handler as all children will be killed. This |
237b2883cbd8
worker: make sure killworkers() never be interrupted by another SIGCHLD
Yuya Nishihara <yuya@tcha.org>
parents:
30422
diff
changeset
|
117 |
# function shouldn't be interrupted by another SIGCHLD; otherwise pids |
237b2883cbd8
worker: make sure killworkers() never be interrupted by another SIGCHLD
Yuya Nishihara <yuya@tcha.org>
parents:
30422
diff
changeset
|
118 |
# could be updated while iterating, which would cause inconsistency. |
237b2883cbd8
worker: make sure killworkers() never be interrupted by another SIGCHLD
Yuya Nishihara <yuya@tcha.org>
parents:
30422
diff
changeset
|
119 |
signal.signal(signal.SIGCHLD, oldchldhandler) |
30410
7a5d6e2fd2d5
worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents:
30396
diff
changeset
|
120 |
# if one worker bails, there's no good reason to wait for the rest |
7a5d6e2fd2d5
worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents:
30396
diff
changeset
|
121 |
for p in pids: |
7a5d6e2fd2d5
worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents:
30396
diff
changeset
|
122 |
try: |
7a5d6e2fd2d5
worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents:
30396
diff
changeset
|
123 |
os.kill(p, signal.SIGTERM) |
7a5d6e2fd2d5
worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents:
30396
diff
changeset
|
124 |
except OSError as err: |
7a5d6e2fd2d5
worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents:
30396
diff
changeset
|
125 |
if err.errno != errno.ESRCH: |
7a5d6e2fd2d5
worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents:
30396
diff
changeset
|
126 |
raise |
30412
7bc25549e084
worker: allow waitforworkers to be non-blocking
Jun Wu <quark@fb.com>
parents:
30411
diff
changeset
|
127 |
def waitforworkers(blocking=True): |
30414
5069a8a40b1b
worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents:
30413
diff
changeset
|
128 |
for pid in pids.copy(): |
5069a8a40b1b
worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents:
30413
diff
changeset
|
129 |
p = st = 0 |
5069a8a40b1b
worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents:
30413
diff
changeset
|
130 |
while True: |
5069a8a40b1b
worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents:
30413
diff
changeset
|
131 |
try: |
5069a8a40b1b
worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents:
30413
diff
changeset
|
132 |
p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG)) |
30422
0e6ce6313e47
worker: fix missed break on successful waitpid()
Yuya Nishihara <yuya@tcha.org>
parents:
30416
diff
changeset
|
133 |
break |
30414
5069a8a40b1b
worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents:
30413
diff
changeset
|
134 |
except OSError as e: |
5069a8a40b1b
worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents:
30413
diff
changeset
|
135 |
if e.errno == errno.EINTR: |
5069a8a40b1b
worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents:
30413
diff
changeset
|
136 |
continue |
5069a8a40b1b
worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents:
30413
diff
changeset
|
137 |
elif e.errno == errno.ECHILD: |
30425
03f7aa2bd0e3
worker: discard waited pid by anyone who noticed it first
Yuya Nishihara <yuya@tcha.org>
parents:
30424
diff
changeset
|
138 |
# child would already be reaped, but pids yet been |
03f7aa2bd0e3
worker: discard waited pid by anyone who noticed it first
Yuya Nishihara <yuya@tcha.org>
parents:
30424
diff
changeset
|
139 |
# updated (maybe interrupted just after waitpid) |
03f7aa2bd0e3
worker: discard waited pid by anyone who noticed it first
Yuya Nishihara <yuya@tcha.org>
parents:
30424
diff
changeset
|
140 |
pids.discard(pid) |
03f7aa2bd0e3
worker: discard waited pid by anyone who noticed it first
Yuya Nishihara <yuya@tcha.org>
parents:
30424
diff
changeset
|
141 |
break |
30414
5069a8a40b1b
worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents:
30413
diff
changeset
|
142 |
else: |
5069a8a40b1b
worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents:
30413
diff
changeset
|
143 |
raise |
31063
18fb3cf572b4
worker: ignore meaningless exit status indication returned by os.waitpid()
FUJIWARA Katsunori <foozy@lares.dti.ne.jp>
parents:
30639
diff
changeset
|
144 |
if not p: |
18fb3cf572b4
worker: ignore meaningless exit status indication returned by os.waitpid()
FUJIWARA Katsunori <foozy@lares.dti.ne.jp>
parents:
30639
diff
changeset
|
145 |
# skip subsequent steps, because child process should |
18fb3cf572b4
worker: ignore meaningless exit status indication returned by os.waitpid()
FUJIWARA Katsunori <foozy@lares.dti.ne.jp>
parents:
30639
diff
changeset
|
146 |
# be still running in this case |
18fb3cf572b4
worker: ignore meaningless exit status indication returned by os.waitpid()
FUJIWARA Katsunori <foozy@lares.dti.ne.jp>
parents:
30639
diff
changeset
|
147 |
continue |
18fb3cf572b4
worker: ignore meaningless exit status indication returned by os.waitpid()
FUJIWARA Katsunori <foozy@lares.dti.ne.jp>
parents:
30639
diff
changeset
|
148 |
pids.discard(p) |
18fb3cf572b4
worker: ignore meaningless exit status indication returned by os.waitpid()
FUJIWARA Katsunori <foozy@lares.dti.ne.jp>
parents:
30639
diff
changeset
|
149 |
st = _exitstatus(st) |
30410
7a5d6e2fd2d5
worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents:
30396
diff
changeset
|
150 |
if st and not problem[0]: |
7a5d6e2fd2d5
worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents:
30396
diff
changeset
|
151 |
problem[0] = st |
30415
e8fb03cfbbde
worker: add a SIGCHLD handler to collect worker immediately
Jun Wu <quark@fb.com>
parents:
30414
diff
changeset
|
152 |
def sigchldhandler(signum, frame): |
e8fb03cfbbde
worker: add a SIGCHLD handler to collect worker immediately
Jun Wu <quark@fb.com>
parents:
30414
diff
changeset
|
153 |
waitforworkers(blocking=False) |
30424
f2d13eb85198
worker: kill workers after all zombie processes are reaped
Yuya Nishihara <yuya@tcha.org>
parents:
30423
diff
changeset
|
154 |
if problem[0]: |
f2d13eb85198
worker: kill workers after all zombie processes are reaped
Yuya Nishihara <yuya@tcha.org>
parents:
30423
diff
changeset
|
155 |
killworkers() |
30415
e8fb03cfbbde
worker: add a SIGCHLD handler to collect worker immediately
Jun Wu <quark@fb.com>
parents:
30414
diff
changeset
|
156 |
oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler) |
31696
9d3d56aa1a9f
worker: flush ui buffers before running the worker
David Soria Parra <davidsp@fb.com>
parents:
31119
diff
changeset
|
157 |
ui.flush() |
32112
31763785094b
worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents:
32043
diff
changeset
|
158 |
parentpid = os.getpid() |
38729
9e6afe7fca31
worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents:
38535
diff
changeset
|
159 |
pipes = [] |
18638
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
160 |
for pargs in partition(args, workers): |
38729
9e6afe7fca31
worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents:
38535
diff
changeset
|
161 |
# Every worker gets its own pipe to send results on, so we don't have to |
9e6afe7fca31
worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents:
38535
diff
changeset
|
162 |
# implement atomic writes larger than PIPE_BUF. Each forked process has |
9e6afe7fca31
worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents:
38535
diff
changeset
|
163 |
# its own pipe's descriptors in the local variables, and the parent |
9e6afe7fca31
worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents:
38535
diff
changeset
|
164 |
# process has the full list of pipe descriptors (and it doesn't really |
9e6afe7fca31
worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents:
38535
diff
changeset
|
165 |
# care what order they're in). |
9e6afe7fca31
worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents:
38535
diff
changeset
|
166 |
rfd, wfd = os.pipe() |
9e6afe7fca31
worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents:
38535
diff
changeset
|
167 |
pipes.append((rfd, wfd)) |
32112
31763785094b
worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents:
32043
diff
changeset
|
168 |
# make sure we use os._exit in all worker code paths. otherwise the |
31763785094b
worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents:
32043
diff
changeset
|
169 |
# worker may do some clean-ups which could cause surprises like |
31763785094b
worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents:
32043
diff
changeset
|
170 |
# deadlock. see sshpeer.cleanup for example. |
31763785094b
worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents:
32043
diff
changeset
|
171 |
# override error handling *before* fork. this is necessary because |
31763785094b
worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents:
32043
diff
changeset
|
172 |
# exception (signal) may arrive after fork, before "pid =" assignment |
31763785094b
worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents:
32043
diff
changeset
|
173 |
# completes, and other exception handler (dispatch.py) can lead to |
31763785094b
worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents:
32043
diff
changeset
|
174 |
# unexpected code path without os._exit. |
31763785094b
worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents:
32043
diff
changeset
|
175 |
ret = -1 |
31763785094b
worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents:
32043
diff
changeset
|
176 |
try: |
31763785094b
worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents:
32043
diff
changeset
|
177 |
pid = os.fork() |
31763785094b
worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents:
32043
diff
changeset
|
178 |
if pid == 0: |
31763785094b
worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents:
32043
diff
changeset
|
179 |
signal.signal(signal.SIGINT, oldhandler) |
31763785094b
worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents:
32043
diff
changeset
|
180 |
signal.signal(signal.SIGCHLD, oldchldhandler) |
30521
86cd09bc13ba
worker: use os._exit for posix worker in all cases
Jun Wu <quark@fb.com>
parents:
30425
diff
changeset
|
181 |
|
32112
31763785094b
worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents:
32043
diff
changeset
|
182 |
def workerfunc(): |
38729
9e6afe7fca31
worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents:
38535
diff
changeset
|
183 |
for r, w in pipes[:-1]: |
9e6afe7fca31
worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents:
38535
diff
changeset
|
184 |
os.close(r) |
9e6afe7fca31
worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents:
38535
diff
changeset
|
185 |
os.close(w) |
32112
31763785094b
worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents:
32043
diff
changeset
|
186 |
os.close(rfd) |
38535
8c38d2948217
worker: support more return types in posix worker
Danny Hooper <hooper@google.com>
parents:
37844
diff
changeset
|
187 |
for result in func(*(staticargs + (pargs,))): |
8c38d2948217
worker: support more return types in posix worker
Danny Hooper <hooper@google.com>
parents:
37844
diff
changeset
|
188 |
os.write(wfd, util.pickle.dumps(result)) |
32112
31763785094b
worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents:
32043
diff
changeset
|
189 |
return 0 |
31763785094b
worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents:
32043
diff
changeset
|
190 |
|
31763785094b
worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents:
32043
diff
changeset
|
191 |
ret = scmutil.callcatch(ui, workerfunc) |
31763785094b
worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents:
32043
diff
changeset
|
192 |
except: # parent re-raises, child never returns |
31763785094b
worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents:
32043
diff
changeset
|
193 |
if os.getpid() == parentpid: |
31763785094b
worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents:
32043
diff
changeset
|
194 |
raise |
31763785094b
worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents:
32043
diff
changeset
|
195 |
exctype = sys.exc_info()[0] |
31763785094b
worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents:
32043
diff
changeset
|
196 |
force = not issubclass(exctype, KeyboardInterrupt) |
31763785094b
worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents:
32043
diff
changeset
|
197 |
ui.traceback(force=force) |
31763785094b
worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents:
32043
diff
changeset
|
198 |
finally: |
31763785094b
worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents:
32043
diff
changeset
|
199 |
if os.getpid() != parentpid: |
31118
a91c62752d08
worker: flush messages written by child processes before exit
Yuya Nishihara <yuya@tcha.org>
parents:
31063
diff
changeset
|
200 |
try: |
a91c62752d08
worker: flush messages written by child processes before exit
Yuya Nishihara <yuya@tcha.org>
parents:
31063
diff
changeset
|
201 |
ui.flush() |
32112
31763785094b
worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents:
32043
diff
changeset
|
202 |
except: # never returns, no re-raises |
31763785094b
worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents:
32043
diff
changeset
|
203 |
pass |
30521
86cd09bc13ba
worker: use os._exit for posix worker in all cases
Jun Wu <quark@fb.com>
parents:
30425
diff
changeset
|
204 |
finally: |
32112
31763785094b
worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents:
32043
diff
changeset
|
205 |
os._exit(ret & 255) |
30413 | 206 |
pids.add(pid) |
38729
9e6afe7fca31
worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents:
38535
diff
changeset
|
207 |
selector = selectors.DefaultSelector() |
9e6afe7fca31
worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents:
38535
diff
changeset
|
208 |
for rfd, wfd in pipes: |
9e6afe7fca31
worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents:
38535
diff
changeset
|
209 |
os.close(wfd) |
9e6afe7fca31
worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents:
38535
diff
changeset
|
210 |
selector.register(os.fdopen(rfd, r'rb', 0), selectors.EVENT_READ) |
18638
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
211 |
def cleanup(): |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
212 |
signal.signal(signal.SIGINT, oldhandler) |
30416
c27614f2dec1
worker: stop using a separate thread waiting for children
Jun Wu <quark@fb.com>
parents:
30415
diff
changeset
|
213 |
waitforworkers() |
30415
e8fb03cfbbde
worker: add a SIGCHLD handler to collect worker immediately
Jun Wu <quark@fb.com>
parents:
30414
diff
changeset
|
214 |
signal.signal(signal.SIGCHLD, oldchldhandler) |
38740
c08ea1e219c0
worker: call selector.close() to release polling resources
Yuya Nishihara <yuya@tcha.org>
parents:
38731
diff
changeset
|
215 |
selector.close() |
40988
03f7d0822ec1
worker: do not swallow exception occurred in main process
Yuya Nishihara <yuya@tcha.org>
parents:
38740
diff
changeset
|
216 |
return problem[0] |
18638
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
217 |
try: |
38729
9e6afe7fca31
worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents:
38535
diff
changeset
|
218 |
openpipes = len(pipes) |
9e6afe7fca31
worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents:
38535
diff
changeset
|
219 |
while openpipes > 0: |
9e6afe7fca31
worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents:
38535
diff
changeset
|
220 |
for key, events in selector.select(): |
9e6afe7fca31
worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents:
38535
diff
changeset
|
221 |
try: |
9e6afe7fca31
worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents:
38535
diff
changeset
|
222 |
yield util.pickle.load(key.fileobj) |
9e6afe7fca31
worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents:
38535
diff
changeset
|
223 |
except EOFError: |
9e6afe7fca31
worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents:
38535
diff
changeset
|
224 |
selector.unregister(key.fileobj) |
9e6afe7fca31
worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents:
38535
diff
changeset
|
225 |
key.fileobj.close() |
9e6afe7fca31
worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents:
38535
diff
changeset
|
226 |
openpipes -= 1 |
9e6afe7fca31
worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents:
38535
diff
changeset
|
227 |
except IOError as e: |
9e6afe7fca31
worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents:
38535
diff
changeset
|
228 |
if e.errno == errno.EINTR: |
9e6afe7fca31
worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents:
38535
diff
changeset
|
229 |
continue |
9e6afe7fca31
worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents:
38535
diff
changeset
|
230 |
raise |
18638
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
231 |
except: # re-raises |
18709
9955fc5ee24b
worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents:
18708
diff
changeset
|
232 |
killworkers() |
18638
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
233 |
cleanup() |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
234 |
raise |
40988
03f7d0822ec1
worker: do not swallow exception occurred in main process
Yuya Nishihara <yuya@tcha.org>
parents:
38740
diff
changeset
|
235 |
status = cleanup() |
03f7d0822ec1
worker: do not swallow exception occurred in main process
Yuya Nishihara <yuya@tcha.org>
parents:
38740
diff
changeset
|
236 |
if status: |
03f7d0822ec1
worker: do not swallow exception occurred in main process
Yuya Nishihara <yuya@tcha.org>
parents:
38740
diff
changeset
|
237 |
if status < 0: |
03f7d0822ec1
worker: do not swallow exception occurred in main process
Yuya Nishihara <yuya@tcha.org>
parents:
38740
diff
changeset
|
238 |
os.kill(os.getpid(), -status) |
03f7d0822ec1
worker: do not swallow exception occurred in main process
Yuya Nishihara <yuya@tcha.org>
parents:
38740
diff
changeset
|
239 |
sys.exit(status) |
18638
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
240 |
|
18707
d1a2b086d058
worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents:
18638
diff
changeset
|
241 |
def _posixexitstatus(code): |
d1a2b086d058
worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents:
18638
diff
changeset
|
242 |
'''convert a posix exit status into the same form returned by |
d1a2b086d058
worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents:
18638
diff
changeset
|
243 |
os.spawnv |
d1a2b086d058
worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents:
18638
diff
changeset
|
244 |
|
d1a2b086d058
worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents:
18638
diff
changeset
|
245 |
returns None if the process was stopped instead of exiting''' |
d1a2b086d058
worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents:
18638
diff
changeset
|
246 |
if os.WIFEXITED(code): |
d1a2b086d058
worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents:
18638
diff
changeset
|
247 |
return os.WEXITSTATUS(code) |
d1a2b086d058
worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents:
18638
diff
changeset
|
248 |
elif os.WIFSIGNALED(code): |
d1a2b086d058
worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents:
18638
diff
changeset
|
249 |
return -os.WTERMSIG(code) |
d1a2b086d058
worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents:
18638
diff
changeset
|
250 |
|
35427
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
251 |
def _windowsworker(ui, func, staticargs, args): |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
252 |
class Worker(threading.Thread): |
40443
909c31805f54
py3: roll up threading.Thread constructor args into **kwargs
Matt Harbison <matt_harbison@yahoo.com>
parents:
38740
diff
changeset
|
253 |
def __init__(self, taskqueue, resultqueue, func, staticargs, *args, |
909c31805f54
py3: roll up threading.Thread constructor args into **kwargs
Matt Harbison <matt_harbison@yahoo.com>
parents:
38740
diff
changeset
|
254 |
**kwargs): |
909c31805f54
py3: roll up threading.Thread constructor args into **kwargs
Matt Harbison <matt_harbison@yahoo.com>
parents:
38740
diff
changeset
|
255 |
threading.Thread.__init__(self, *args, **kwargs) |
35427
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
256 |
self._taskqueue = taskqueue |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
257 |
self._resultqueue = resultqueue |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
258 |
self._func = func |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
259 |
self._staticargs = staticargs |
35428
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35427
diff
changeset
|
260 |
self._interrupted = False |
35432
86b8cc1f244e
worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents:
35431
diff
changeset
|
261 |
self.daemon = True |
35428
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35427
diff
changeset
|
262 |
self.exception = None |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35427
diff
changeset
|
263 |
|
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35427
diff
changeset
|
264 |
def interrupt(self): |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35427
diff
changeset
|
265 |
self._interrupted = True |
35427
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
266 |
|
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
267 |
def run(self): |
35428
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35427
diff
changeset
|
268 |
try: |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35427
diff
changeset
|
269 |
while not self._taskqueue.empty(): |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35427
diff
changeset
|
270 |
try: |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35427
diff
changeset
|
271 |
args = self._taskqueue.get_nowait() |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35427
diff
changeset
|
272 |
for res in self._func(*self._staticargs + (args,)): |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35427
diff
changeset
|
273 |
self._resultqueue.put(res) |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35427
diff
changeset
|
274 |
# threading doesn't provide a native way to |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35427
diff
changeset
|
275 |
# interrupt execution. handle it manually at every |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35427
diff
changeset
|
276 |
# iteration. |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35427
diff
changeset
|
277 |
if self._interrupted: |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35427
diff
changeset
|
278 |
return |
37844
8fb9985382be
pycompat: export queue module instead of symbols in module (API)
Gregory Szorc <gregory.szorc@gmail.com>
parents:
36835
diff
changeset
|
279 |
except pycompat.queue.Empty: |
35428
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35427
diff
changeset
|
280 |
break |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35427
diff
changeset
|
281 |
except Exception as e: |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35427
diff
changeset
|
282 |
# store the exception such that the main thread can resurface |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35427
diff
changeset
|
283 |
# it as if the func was running without workers. |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35427
diff
changeset
|
284 |
self.exception = e |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35427
diff
changeset
|
285 |
raise |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35427
diff
changeset
|
286 |
|
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35427
diff
changeset
|
287 |
threads = [] |
35432
86b8cc1f244e
worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents:
35431
diff
changeset
|
288 |
def trykillworkers(): |
86b8cc1f244e
worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents:
35431
diff
changeset
|
289 |
# Allow up to 1 second to clean worker threads nicely |
86b8cc1f244e
worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents:
35431
diff
changeset
|
290 |
cleanupend = time.time() + 1 |
35428
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35427
diff
changeset
|
291 |
for t in threads: |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35427
diff
changeset
|
292 |
t.interrupt() |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35427
diff
changeset
|
293 |
for t in threads: |
35432
86b8cc1f244e
worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents:
35431
diff
changeset
|
294 |
remainingtime = cleanupend - time.time() |
86b8cc1f244e
worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents:
35431
diff
changeset
|
295 |
t.join(remainingtime) |
35428
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35427
diff
changeset
|
296 |
if t.is_alive(): |
35432
86b8cc1f244e
worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents:
35431
diff
changeset
|
297 |
# pass over the workers joining failure. it is more |
86b8cc1f244e
worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents:
35431
diff
changeset
|
298 |
# important to surface the inital exception than the |
86b8cc1f244e
worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents:
35431
diff
changeset
|
299 |
# fact that one of workers may be processing a large |
86b8cc1f244e
worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents:
35431
diff
changeset
|
300 |
# task and does not get to handle the interruption. |
86b8cc1f244e
worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents:
35431
diff
changeset
|
301 |
ui.warn(_("failed to kill worker threads while " |
86b8cc1f244e
worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents:
35431
diff
changeset
|
302 |
"handling an exception\n")) |
86b8cc1f244e
worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents:
35431
diff
changeset
|
303 |
return |
35427
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
304 |
|
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
305 |
workers = _numworkers(ui) |
37844
8fb9985382be
pycompat: export queue module instead of symbols in module (API)
Gregory Szorc <gregory.szorc@gmail.com>
parents:
36835
diff
changeset
|
306 |
resultqueue = pycompat.queue.Queue() |
8fb9985382be
pycompat: export queue module instead of symbols in module (API)
Gregory Szorc <gregory.szorc@gmail.com>
parents:
36835
diff
changeset
|
307 |
taskqueue = pycompat.queue.Queue() |
35427
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
308 |
# partition work to more pieces than workers to minimize the chance |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
309 |
# of uneven distribution of large tasks between the workers |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
310 |
for pargs in partition(args, workers * 20): |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
311 |
taskqueue.put(pargs) |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
312 |
for _i in range(workers): |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
313 |
t = Worker(taskqueue, resultqueue, func, staticargs) |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
314 |
threads.append(t) |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
315 |
t.start() |
35432
86b8cc1f244e
worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents:
35431
diff
changeset
|
316 |
try: |
86b8cc1f244e
worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents:
35431
diff
changeset
|
317 |
while len(threads) > 0: |
86b8cc1f244e
worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents:
35431
diff
changeset
|
318 |
while not resultqueue.empty(): |
86b8cc1f244e
worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents:
35431
diff
changeset
|
319 |
yield resultqueue.get() |
86b8cc1f244e
worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents:
35431
diff
changeset
|
320 |
threads[0].join(0.05) |
86b8cc1f244e
worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents:
35431
diff
changeset
|
321 |
finishedthreads = [_t for _t in threads if not _t.is_alive()] |
86b8cc1f244e
worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents:
35431
diff
changeset
|
322 |
for t in finishedthreads: |
86b8cc1f244e
worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents:
35431
diff
changeset
|
323 |
if t.exception is not None: |
86b8cc1f244e
worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents:
35431
diff
changeset
|
324 |
raise t.exception |
86b8cc1f244e
worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents:
35431
diff
changeset
|
325 |
threads.remove(t) |
35453
44fd4cfc6c0a
worker: handle interrupt on windows
Wojciech Lis <wlis@fb.com>
parents:
35432
diff
changeset
|
326 |
except (Exception, KeyboardInterrupt): # re-raises |
35432
86b8cc1f244e
worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents:
35431
diff
changeset
|
327 |
trykillworkers() |
86b8cc1f244e
worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents:
35431
diff
changeset
|
328 |
raise |
35427
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
329 |
while not resultqueue.empty(): |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
330 |
yield resultqueue.get() |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
331 |
|
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
332 |
if pycompat.iswindows: |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
333 |
_platformworker = _windowsworker |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
334 |
else: |
18638
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
335 |
_platformworker = _posixworker |
18707
d1a2b086d058
worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents:
18638
diff
changeset
|
336 |
_exitstatus = _posixexitstatus |
18638
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
337 |
|
18637
ac4dbceeb14a
worker: partition a list (of tasks) into equal-sized chunks
Bryan O'Sullivan <bryano@fb.com>
parents:
18636
diff
changeset
|
338 |
def partition(lst, nslices): |
28181
f8efc8a3a991
worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26587
diff
changeset
|
339 |
'''partition a list into N slices of roughly equal size |
f8efc8a3a991
worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26587
diff
changeset
|
340 |
|
f8efc8a3a991
worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26587
diff
changeset
|
341 |
The current strategy takes every Nth element from the input. If |
f8efc8a3a991
worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26587
diff
changeset
|
342 |
we ever write workers that need to preserve grouping in input |
f8efc8a3a991
worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26587
diff
changeset
|
343 |
we should consider allowing callers to specify a partition strategy. |
28292
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
344 |
|
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
345 |
mpm is not a fan of this partitioning strategy when files are involved. |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
346 |
In his words: |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
347 |
|
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
348 |
Single-threaded Mercurial makes a point of creating and visiting |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
349 |
files in a fixed order (alphabetical). When creating files in order, |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
350 |
a typical filesystem is likely to allocate them on nearby regions on |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
351 |
disk. Thus, when revisiting in the same order, locality is maximized |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
352 |
and various forms of OS and disk-level caching and read-ahead get a |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
353 |
chance to work. |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
354 |
|
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
355 |
This effect can be quite significant on spinning disks. I discovered it |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
356 |
circa Mercurial v0.4 when revlogs were named by hashes of filenames. |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
357 |
Tarring a repo and copying it to another disk effectively randomized |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
358 |
the revlog ordering on disk by sorting the revlogs by hash and suddenly |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
359 |
performance of my kernel checkout benchmark dropped by ~10x because the |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
360 |
"working set" of sectors visited no longer fit in the drive's cache and |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
361 |
the workload switched from streaming to random I/O. |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
362 |
|
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
363 |
What we should really be doing is have workers read filenames from a |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
364 |
ordered queue. This preserves locality and also keeps any worker from |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
365 |
getting more than one file out of balance. |
28181
f8efc8a3a991
worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26587
diff
changeset
|
366 |
''' |
f8efc8a3a991
worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26587
diff
changeset
|
367 |
for i in range(nslices): |
f8efc8a3a991
worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26587
diff
changeset
|
368 |
yield lst[i::nslices] |