40 except (AttributeError, ValueError): |
42 except (AttributeError, ValueError): |
41 pass |
43 pass |
42 |
44 |
43 # windows |
45 # windows |
44 try: |
46 try: |
45 n = int(encoding.environ['NUMBER_OF_PROCESSORS']) |
47 n = int(encoding.environ[b'NUMBER_OF_PROCESSORS']) |
46 if n > 0: |
48 if n > 0: |
47 return n |
49 return n |
48 except (KeyError, ValueError): |
50 except (KeyError, ValueError): |
49 pass |
51 pass |
50 |
52 |
51 return 1 |
53 return 1 |
52 |
54 |
|
55 |
53 def _numworkers(ui): |
56 def _numworkers(ui): |
54 s = ui.config('worker', 'numcpus') |
57 s = ui.config(b'worker', b'numcpus') |
55 if s: |
58 if s: |
56 try: |
59 try: |
57 n = int(s) |
60 n = int(s) |
58 if n >= 1: |
61 if n >= 1: |
59 return n |
62 return n |
60 except ValueError: |
63 except ValueError: |
61 raise error.Abort(_('number of cpus must be an integer')) |
64 raise error.Abort(_(b'number of cpus must be an integer')) |
62 return min(max(countcpus(), 4), 32) |
65 return min(max(countcpus(), 4), 32) |
|
66 |
63 |
67 |
64 if pycompat.isposix or pycompat.iswindows: |
68 if pycompat.isposix or pycompat.iswindows: |
65 _STARTUP_COST = 0.01 |
69 _STARTUP_COST = 0.01 |
66 # The Windows worker is thread based. If tasks are CPU bound, threads |
70 # The Windows worker is thread based. If tasks are CPU bound, threads |
67 # in the presence of the GIL result in excessive context switching and |
71 # in the presence of the GIL result in excessive context switching and |
69 _DISALLOW_THREAD_UNSAFE = pycompat.iswindows |
73 _DISALLOW_THREAD_UNSAFE = pycompat.iswindows |
70 else: |
74 else: |
71 _STARTUP_COST = 1e30 |
75 _STARTUP_COST = 1e30 |
72 _DISALLOW_THREAD_UNSAFE = False |
76 _DISALLOW_THREAD_UNSAFE = False |
73 |
77 |
|
78 |
74 def worthwhile(ui, costperop, nops, threadsafe=True): |
79 def worthwhile(ui, costperop, nops, threadsafe=True): |
75 '''try to determine whether the benefit of multiple processes can |
80 '''try to determine whether the benefit of multiple processes can |
76 outweigh the cost of starting them''' |
81 outweigh the cost of starting them''' |
77 |
82 |
78 if not threadsafe and _DISALLOW_THREAD_UNSAFE: |
83 if not threadsafe and _DISALLOW_THREAD_UNSAFE: |
81 linear = costperop * nops |
86 linear = costperop * nops |
82 workers = _numworkers(ui) |
87 workers = _numworkers(ui) |
83 benefit = linear - (_STARTUP_COST * workers + linear / workers) |
88 benefit = linear - (_STARTUP_COST * workers + linear / workers) |
84 return benefit >= 0.15 |
89 return benefit >= 0.15 |
85 |
90 |
86 def worker(ui, costperarg, func, staticargs, args, hasretval=False, |
91 |
87 threadsafe=True): |
92 def worker( |
|
93 ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True |
|
94 ): |
88 '''run a function, possibly in parallel in multiple worker |
95 '''run a function, possibly in parallel in multiple worker |
89 processes. |
96 processes. |
90 |
97 |
91 returns a progress iterator |
98 returns a progress iterator |
92 |
99 |
106 |
113 |
107 threadsafe - whether work items are thread safe and can be executed using |
114 threadsafe - whether work items are thread safe and can be executed using |
108 a thread-based worker. Should be disabled for CPU heavy tasks that don't |
115 a thread-based worker. Should be disabled for CPU heavy tasks that don't |
109 release the GIL. |
116 release the GIL. |
110 ''' |
117 ''' |
111 enabled = ui.configbool('worker', 'enabled') |
118 enabled = ui.configbool(b'worker', b'enabled') |
112 if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe): |
119 if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe): |
113 return _platformworker(ui, func, staticargs, args, hasretval) |
120 return _platformworker(ui, func, staticargs, args, hasretval) |
114 return func(*staticargs + (args,)) |
121 return func(*staticargs + (args,)) |
|
122 |
115 |
123 |
116 def _posixworker(ui, func, staticargs, args, hasretval): |
124 def _posixworker(ui, func, staticargs, args, hasretval): |
117 workers = _numworkers(ui) |
125 workers = _numworkers(ui) |
118 oldhandler = signal.getsignal(signal.SIGINT) |
126 oldhandler = signal.getsignal(signal.SIGINT) |
119 signal.signal(signal.SIGINT, signal.SIG_IGN) |
127 signal.signal(signal.SIGINT, signal.SIG_IGN) |
120 pids, problem = set(), [0] |
128 pids, problem = set(), [0] |
|
129 |
121 def killworkers(): |
130 def killworkers(): |
122 # unregister SIGCHLD handler as all children will be killed. This |
131 # unregister SIGCHLD handler as all children will be killed. This |
123 # function shouldn't be interrupted by another SIGCHLD; otherwise pids |
132 # function shouldn't be interrupted by another SIGCHLD; otherwise pids |
124 # could be updated while iterating, which would cause inconsistency. |
133 # could be updated while iterating, which would cause inconsistency. |
125 signal.signal(signal.SIGCHLD, oldchldhandler) |
134 signal.signal(signal.SIGCHLD, oldchldhandler) |
128 try: |
137 try: |
129 os.kill(p, signal.SIGTERM) |
138 os.kill(p, signal.SIGTERM) |
130 except OSError as err: |
139 except OSError as err: |
131 if err.errno != errno.ESRCH: |
140 if err.errno != errno.ESRCH: |
132 raise |
141 raise |
|
142 |
133 def waitforworkers(blocking=True): |
143 def waitforworkers(blocking=True): |
134 for pid in pids.copy(): |
144 for pid in pids.copy(): |
135 p = st = 0 |
145 p = st = 0 |
136 while True: |
146 while True: |
137 try: |
147 try: |
153 continue |
163 continue |
154 pids.discard(p) |
164 pids.discard(p) |
155 st = _exitstatus(st) |
165 st = _exitstatus(st) |
156 if st and not problem[0]: |
166 if st and not problem[0]: |
157 problem[0] = st |
167 problem[0] = st |
|
168 |
158 def sigchldhandler(signum, frame): |
169 def sigchldhandler(signum, frame): |
159 waitforworkers(blocking=False) |
170 waitforworkers(blocking=False) |
160 if problem[0]: |
171 if problem[0]: |
161 killworkers() |
172 killworkers() |
|
173 |
162 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler) |
174 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler) |
163 ui.flush() |
175 ui.flush() |
164 parentpid = os.getpid() |
176 parentpid = os.getpid() |
165 pipes = [] |
177 pipes = [] |
166 retval = {} |
178 retval = {} |
194 for result in func(*(staticargs + (pargs,))): |
206 for result in func(*(staticargs + (pargs,))): |
195 os.write(wfd, util.pickle.dumps(result)) |
207 os.write(wfd, util.pickle.dumps(result)) |
196 return 0 |
208 return 0 |
197 |
209 |
198 ret = scmutil.callcatch(ui, workerfunc) |
210 ret = scmutil.callcatch(ui, workerfunc) |
199 except: # parent re-raises, child never returns |
211 except: # parent re-raises, child never returns |
200 if os.getpid() == parentpid: |
212 if os.getpid() == parentpid: |
201 raise |
213 raise |
202 exctype = sys.exc_info()[0] |
214 exctype = sys.exc_info()[0] |
203 force = not issubclass(exctype, KeyboardInterrupt) |
215 force = not issubclass(exctype, KeyboardInterrupt) |
204 ui.traceback(force=force) |
216 ui.traceback(force=force) |
205 finally: |
217 finally: |
206 if os.getpid() != parentpid: |
218 if os.getpid() != parentpid: |
207 try: |
219 try: |
208 ui.flush() |
220 ui.flush() |
209 except: # never returns, no re-raises |
221 except: # never returns, no re-raises |
210 pass |
222 pass |
211 finally: |
223 finally: |
212 os._exit(ret & 255) |
224 os._exit(ret & 255) |
213 pids.add(pid) |
225 pids.add(pid) |
214 selector = selectors.DefaultSelector() |
226 selector = selectors.DefaultSelector() |
215 for rfd, wfd in pipes: |
227 for rfd, wfd in pipes: |
216 os.close(wfd) |
228 os.close(wfd) |
217 selector.register(os.fdopen(rfd, r'rb', 0), selectors.EVENT_READ) |
229 selector.register(os.fdopen(rfd, r'rb', 0), selectors.EVENT_READ) |
|
230 |
218 def cleanup(): |
231 def cleanup(): |
219 signal.signal(signal.SIGINT, oldhandler) |
232 signal.signal(signal.SIGINT, oldhandler) |
220 waitforworkers() |
233 waitforworkers() |
221 signal.signal(signal.SIGCHLD, oldchldhandler) |
234 signal.signal(signal.SIGCHLD, oldchldhandler) |
222 selector.close() |
235 selector.close() |
223 return problem[0] |
236 return problem[0] |
|
237 |
224 try: |
238 try: |
225 openpipes = len(pipes) |
239 openpipes = len(pipes) |
226 while openpipes > 0: |
240 while openpipes > 0: |
227 for key, events in selector.select(): |
241 for key, events in selector.select(): |
228 try: |
242 try: |
249 os.kill(os.getpid(), -status) |
263 os.kill(os.getpid(), -status) |
250 sys.exit(status) |
264 sys.exit(status) |
251 if hasretval: |
265 if hasretval: |
252 yield True, retval |
266 yield True, retval |
253 |
267 |
|
268 |
254 def _posixexitstatus(code): |
269 def _posixexitstatus(code): |
255 '''convert a posix exit status into the same form returned by |
270 '''convert a posix exit status into the same form returned by |
256 os.spawnv |
271 os.spawnv |
257 |
272 |
258 returns None if the process was stopped instead of exiting''' |
273 returns None if the process was stopped instead of exiting''' |
259 if os.WIFEXITED(code): |
274 if os.WIFEXITED(code): |
260 return os.WEXITSTATUS(code) |
275 return os.WEXITSTATUS(code) |
261 elif os.WIFSIGNALED(code): |
276 elif os.WIFSIGNALED(code): |
262 return -os.WTERMSIG(code) |
277 return -(os.WTERMSIG(code)) |
|
278 |
263 |
279 |
264 def _windowsworker(ui, func, staticargs, args, hasretval): |
280 def _windowsworker(ui, func, staticargs, args, hasretval): |
265 class Worker(threading.Thread): |
281 class Worker(threading.Thread): |
266 def __init__(self, taskqueue, resultqueue, func, staticargs, *args, |
282 def __init__( |
267 **kwargs): |
283 self, taskqueue, resultqueue, func, staticargs, *args, **kwargs |
|
284 ): |
268 threading.Thread.__init__(self, *args, **kwargs) |
285 threading.Thread.__init__(self, *args, **kwargs) |
269 self._taskqueue = taskqueue |
286 self._taskqueue = taskqueue |
270 self._resultqueue = resultqueue |
287 self._resultqueue = resultqueue |
271 self._func = func |
288 self._func = func |
272 self._staticargs = staticargs |
289 self._staticargs = staticargs |
309 if t.is_alive(): |
327 if t.is_alive(): |
310 # pass over the workers joining failure. it is more |
328 # pass over the workers joining failure. it is more |
311 # important to surface the inital exception than the |
329 # important to surface the inital exception than the |
312 # fact that one of workers may be processing a large |
330 # fact that one of workers may be processing a large |
313 # task and does not get to handle the interruption. |
331 # task and does not get to handle the interruption. |
314 ui.warn(_("failed to kill worker threads while " |
332 ui.warn( |
315 "handling an exception\n")) |
333 _( |
|
334 b"failed to kill worker threads while " |
|
335 b"handling an exception\n" |
|
336 ) |
|
337 ) |
316 return |
338 return |
317 |
339 |
318 workers = _numworkers(ui) |
340 workers = _numworkers(ui) |
319 resultqueue = pycompat.queue.Queue() |
341 resultqueue = pycompat.queue.Queue() |
320 taskqueue = pycompat.queue.Queue() |
342 taskqueue = pycompat.queue.Queue() |
339 finishedthreads = [_t for _t in threads if not _t.is_alive()] |
361 finishedthreads = [_t for _t in threads if not _t.is_alive()] |
340 for t in finishedthreads: |
362 for t in finishedthreads: |
341 if t.exception is not None: |
363 if t.exception is not None: |
342 raise t.exception |
364 raise t.exception |
343 threads.remove(t) |
365 threads.remove(t) |
344 except (Exception, KeyboardInterrupt): # re-raises |
366 except (Exception, KeyboardInterrupt): # re-raises |
345 trykillworkers() |
367 trykillworkers() |
346 raise |
368 raise |
347 while not resultqueue.empty(): |
369 while not resultqueue.empty(): |
348 res = resultqueue.get() |
370 res = resultqueue.get() |
349 if hasretval and res[0]: |
371 if hasretval and res[0]: |
351 else: |
373 else: |
352 yield res |
374 yield res |
353 if hasretval: |
375 if hasretval: |
354 yield True, retval |
376 yield True, retval |
355 |
377 |
|
378 |
356 if pycompat.iswindows: |
379 if pycompat.iswindows: |
357 _platformworker = _windowsworker |
380 _platformworker = _windowsworker |
358 else: |
381 else: |
359 _platformworker = _posixworker |
382 _platformworker = _posixworker |
360 _exitstatus = _posixexitstatus |
383 _exitstatus = _posixexitstatus |
|
384 |
361 |
385 |
362 def partition(lst, nslices): |
386 def partition(lst, nslices): |
363 '''partition a list into N slices of roughly equal size |
387 '''partition a list into N slices of roughly equal size |
364 |
388 |
365 The current strategy takes every Nth element from the input. If |
389 The current strategy takes every Nth element from the input. If |