Mercurial > hg-stable
annotate mercurial/worker.py @ 26882:c4895f9b8ab1 stable
changegroup: back code change of e7c618cee8df out
The previous changeset is a simpler way of fixing issue4934 without changing the
spirit of the code. We can remove the dual call to 'delayupdate' but we keep the
tests to show that the issue is still fixed.
author | Pierre-Yves David <pierre-yves.david@fb.com> |
---|---|
date | Fri, 06 Nov 2015 13:01:15 -0500 |
parents | 56b2bcea2529 |
children | f8efc8a3a991 |
rev | line source |
---|---|
18635
fed06dd07665
worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff
changeset
|
1 # worker.py - master-slave parallelism support |
fed06dd07665
worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff
changeset
|
2 # |
fed06dd07665
worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff
changeset
|
3 # Copyright 2013 Facebook, Inc. |
fed06dd07665
worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff
changeset
|
4 # |
fed06dd07665
worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff
changeset
|
5 # This software may be used and distributed according to the terms of the |
fed06dd07665
worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff
changeset
|
6 # GNU General Public License version 2 or any later version. |
fed06dd07665
worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff
changeset
|
7 |
25992
2d76f8a2d831
worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents:
25660
diff
changeset
|
8 from __future__ import absolute_import |
2d76f8a2d831
worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents:
25660
diff
changeset
|
9 |
2d76f8a2d831
worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents:
25660
diff
changeset
|
10 import errno |
2d76f8a2d831
worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents:
25660
diff
changeset
|
11 import os |
2d76f8a2d831
worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents:
25660
diff
changeset
|
12 import signal |
2d76f8a2d831
worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents:
25660
diff
changeset
|
13 import sys |
2d76f8a2d831
worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents:
25660
diff
changeset
|
14 import threading |
2d76f8a2d831
worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents:
25660
diff
changeset
|
15 |
2d76f8a2d831
worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents:
25660
diff
changeset
|
16 from .i18n import _ |
26587
56b2bcea2529
error: get Abort from 'error' instead of 'util'
Pierre-Yves David <pierre-yves.david@fb.com>
parents:
26568
diff
changeset
|
17 from . import error |
18635
fed06dd07665
worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff
changeset
|
18 |
fed06dd07665
worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff
changeset
|
19 def countcpus(): |
fed06dd07665
worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff
changeset
|
20 '''try to count the number of CPUs on the system''' |
26568
c0501c26b05c
worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26063
diff
changeset
|
21 |
c0501c26b05c
worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26063
diff
changeset
|
22 # posix |
18635
fed06dd07665
worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff
changeset
|
23 try: |
26568
c0501c26b05c
worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26063
diff
changeset
|
24 n = int(os.sysconf('SC_NPROCESSORS_ONLN')) |
c0501c26b05c
worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26063
diff
changeset
|
25 if n > 0: |
c0501c26b05c
worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26063
diff
changeset
|
26 return n |
c0501c26b05c
worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26063
diff
changeset
|
27 except (AttributeError, ValueError): |
c0501c26b05c
worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26063
diff
changeset
|
28 pass |
c0501c26b05c
worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26063
diff
changeset
|
29 |
c0501c26b05c
worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26063
diff
changeset
|
30 # windows |
c0501c26b05c
worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26063
diff
changeset
|
31 try: |
c0501c26b05c
worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26063
diff
changeset
|
32 n = int(os.environ['NUMBER_OF_PROCESSORS']) |
c0501c26b05c
worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26063
diff
changeset
|
33 if n > 0: |
c0501c26b05c
worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26063
diff
changeset
|
34 return n |
c0501c26b05c
worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26063
diff
changeset
|
35 except (KeyError, ValueError): |
c0501c26b05c
worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26063
diff
changeset
|
36 pass |
c0501c26b05c
worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26063
diff
changeset
|
37 |
c0501c26b05c
worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26063
diff
changeset
|
38 return 1 |
18636
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
39 |
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
40 def _numworkers(ui): |
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
41 s = ui.config('worker', 'numcpus') |
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
42 if s: |
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
43 try: |
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
44 n = int(s) |
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
45 if n >= 1: |
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
46 return n |
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
47 except ValueError: |
26587
56b2bcea2529
error: get Abort from 'error' instead of 'util'
Pierre-Yves David <pierre-yves.david@fb.com>
parents:
26568
diff
changeset
|
48 raise error.Abort(_('number of cpus must be an integer')) |
18636
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
49 return min(max(countcpus(), 4), 32) |
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
50 |
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
51 if os.name == 'posix': |
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
52 _startupcost = 0.01 |
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
53 else: |
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
54 _startupcost = 1e30 |
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
55 |
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
56 def worthwhile(ui, costperop, nops): |
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
57 '''try to determine whether the benefit of multiple processes can |
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
58 outweigh the cost of starting them''' |
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
59 linear = costperop * nops |
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
60 workers = _numworkers(ui) |
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
61 benefit = linear - (_startupcost * workers + linear / workers) |
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
62 return benefit >= 0.15 |
18637
ac4dbceeb14a
worker: partition a list (of tasks) into equal-sized chunks
Bryan O'Sullivan <bryano@fb.com>
parents:
18636
diff
changeset
|
63 |
18638
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
64 def worker(ui, costperarg, func, staticargs, args): |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
65 '''run a function, possibly in parallel in multiple worker |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
66 processes. |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
67 |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
68 returns a progress iterator |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
69 |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
70 costperarg - cost of a single task |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
71 |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
72 func - function to run |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
73 |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
74 staticargs - arguments to pass to every invocation of the function |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
75 |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
76 args - arguments to split into chunks, to pass to individual |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
77 workers |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
78 ''' |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
79 if worthwhile(ui, costperarg, len(args)): |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
80 return _platformworker(ui, func, staticargs, args) |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
81 return func(*staticargs + (args,)) |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
82 |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
83 def _posixworker(ui, func, staticargs, args): |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
84 rfd, wfd = os.pipe() |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
85 workers = _numworkers(ui) |
18708
86524a70c0f6
worker: fix a race in SIGINT handling
Bryan O'Sullivan <bryano@fb.com>
parents:
18707
diff
changeset
|
86 oldhandler = signal.getsignal(signal.SIGINT) |
86524a70c0f6
worker: fix a race in SIGINT handling
Bryan O'Sullivan <bryano@fb.com>
parents:
18707
diff
changeset
|
87 signal.signal(signal.SIGINT, signal.SIG_IGN) |
18709
9955fc5ee24b
worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents:
18708
diff
changeset
|
88 pids, problem = [], [0] |
18638
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
89 for pargs in partition(args, workers): |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
90 pid = os.fork() |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
91 if pid == 0: |
18708
86524a70c0f6
worker: fix a race in SIGINT handling
Bryan O'Sullivan <bryano@fb.com>
parents:
18707
diff
changeset
|
92 signal.signal(signal.SIGINT, oldhandler) |
18638
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
93 try: |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
94 os.close(rfd) |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
95 for i, item in func(*(staticargs + (pargs,))): |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
96 os.write(wfd, '%d %s\n' % (i, item)) |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
97 os._exit(0) |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
98 except KeyboardInterrupt: |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
99 os._exit(255) |
19408
c7ec39c1a381
worker: properly report errors from worker processes (issue3982)
Matt Mackall <mpm@selenic.com>
parents:
19406
diff
changeset
|
100 # other exceptions are allowed to propagate, we rely |
c7ec39c1a381
worker: properly report errors from worker processes (issue3982)
Matt Mackall <mpm@selenic.com>
parents:
19406
diff
changeset
|
101 # on lock.py's pid checks to avoid release callbacks |
18709
9955fc5ee24b
worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents:
18708
diff
changeset
|
102 pids.append(pid) |
9955fc5ee24b
worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents:
18708
diff
changeset
|
103 pids.reverse() |
18638
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
104 os.close(wfd) |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
105 fp = os.fdopen(rfd, 'rb', 0) |
18709
9955fc5ee24b
worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents:
18708
diff
changeset
|
106 def killworkers(): |
9955fc5ee24b
worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents:
18708
diff
changeset
|
107 # if one worker bails, there's no good reason to wait for the rest |
9955fc5ee24b
worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents:
18708
diff
changeset
|
108 for p in pids: |
9955fc5ee24b
worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents:
18708
diff
changeset
|
109 try: |
9955fc5ee24b
worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents:
18708
diff
changeset
|
110 os.kill(p, signal.SIGTERM) |
25660
328739ea70c3
global: mass rewrite to use modern exception syntax
Gregory Szorc <gregory.szorc@gmail.com>
parents:
22199
diff
changeset
|
111 except OSError as err: |
18709
9955fc5ee24b
worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents:
18708
diff
changeset
|
112 if err.errno != errno.ESRCH: |
9955fc5ee24b
worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents:
18708
diff
changeset
|
113 raise |
9955fc5ee24b
worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents:
18708
diff
changeset
|
114 def waitforworkers(): |
22199
b3e51675f98e
cleanup: avoid _ for local unused tmp variables - that is reserved for i18n
Mads Kiilerich <madski@unity3d.com>
parents:
20034
diff
changeset
|
115 for _pid in pids: |
18709
9955fc5ee24b
worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents:
18708
diff
changeset
|
116 st = _exitstatus(os.wait()[1]) |
19406
3185b347ae98
worker: check problem state correctly (issue3982)
Matt Mackall <mpm@selenic.com>
parents:
18914
diff
changeset
|
117 if st and not problem[0]: |
18709
9955fc5ee24b
worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents:
18708
diff
changeset
|
118 problem[0] = st |
9955fc5ee24b
worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents:
18708
diff
changeset
|
119 killworkers() |
9955fc5ee24b
worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents:
18708
diff
changeset
|
120 t = threading.Thread(target=waitforworkers) |
9955fc5ee24b
worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents:
18708
diff
changeset
|
121 t.start() |
18638
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
122 def cleanup(): |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
123 signal.signal(signal.SIGINT, oldhandler) |
18709
9955fc5ee24b
worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents:
18708
diff
changeset
|
124 t.join() |
9955fc5ee24b
worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents:
18708
diff
changeset
|
125 status = problem[0] |
9955fc5ee24b
worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents:
18708
diff
changeset
|
126 if status: |
9955fc5ee24b
worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents:
18708
diff
changeset
|
127 if status < 0: |
9955fc5ee24b
worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents:
18708
diff
changeset
|
128 os.kill(os.getpid(), -status) |
9955fc5ee24b
worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents:
18708
diff
changeset
|
129 sys.exit(status) |
18638
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
130 try: |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
131 for line in fp: |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
132 l = line.split(' ', 1) |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
133 yield int(l[0]), l[1][:-1] |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
134 except: # re-raises |
18709
9955fc5ee24b
worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents:
18708
diff
changeset
|
135 killworkers() |
18638
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
136 cleanup() |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
137 raise |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
138 cleanup() |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
139 |
18707
d1a2b086d058
worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents:
18638
diff
changeset
|
140 def _posixexitstatus(code): |
d1a2b086d058
worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents:
18638
diff
changeset
|
141 '''convert a posix exit status into the same form returned by |
d1a2b086d058
worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents:
18638
diff
changeset
|
142 os.spawnv |
d1a2b086d058
worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents:
18638
diff
changeset
|
143 |
d1a2b086d058
worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents:
18638
diff
changeset
|
144 returns None if the process was stopped instead of exiting''' |
d1a2b086d058
worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents:
18638
diff
changeset
|
145 if os.WIFEXITED(code): |
d1a2b086d058
worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents:
18638
diff
changeset
|
146 return os.WEXITSTATUS(code) |
d1a2b086d058
worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents:
18638
diff
changeset
|
147 elif os.WIFSIGNALED(code): |
d1a2b086d058
worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents:
18638
diff
changeset
|
148 return -os.WTERMSIG(code) |
d1a2b086d058
worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents:
18638
diff
changeset
|
149 |
18638
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
150 if os.name != 'nt': |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
151 _platformworker = _posixworker |
18707
d1a2b086d058
worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents:
18638
diff
changeset
|
152 _exitstatus = _posixexitstatus |
18638
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
153 |
18637
ac4dbceeb14a
worker: partition a list (of tasks) into equal-sized chunks
Bryan O'Sullivan <bryano@fb.com>
parents:
18636
diff
changeset
|
154 def partition(lst, nslices): |
ac4dbceeb14a
worker: partition a list (of tasks) into equal-sized chunks
Bryan O'Sullivan <bryano@fb.com>
parents:
18636
diff
changeset
|
155 '''partition a list into N slices of equal size''' |
ac4dbceeb14a
worker: partition a list (of tasks) into equal-sized chunks
Bryan O'Sullivan <bryano@fb.com>
parents:
18636
diff
changeset
|
156 n = len(lst) |
ac4dbceeb14a
worker: partition a list (of tasks) into equal-sized chunks
Bryan O'Sullivan <bryano@fb.com>
parents:
18636
diff
changeset
|
157 chunk, slop = n / nslices, n % nslices |
ac4dbceeb14a
worker: partition a list (of tasks) into equal-sized chunks
Bryan O'Sullivan <bryano@fb.com>
parents:
18636
diff
changeset
|
158 end = 0 |
ac4dbceeb14a
worker: partition a list (of tasks) into equal-sized chunks
Bryan O'Sullivan <bryano@fb.com>
parents:
18636
diff
changeset
|
159 for i in xrange(nslices): |
ac4dbceeb14a
worker: partition a list (of tasks) into equal-sized chunks
Bryan O'Sullivan <bryano@fb.com>
parents:
18636
diff
changeset
|
160 start = end |
ac4dbceeb14a
worker: partition a list (of tasks) into equal-sized chunks
Bryan O'Sullivan <bryano@fb.com>
parents:
18636
diff
changeset
|
161 end = start + chunk |
ac4dbceeb14a
worker: partition a list (of tasks) into equal-sized chunks
Bryan O'Sullivan <bryano@fb.com>
parents:
18636
diff
changeset
|
162 if slop: |
ac4dbceeb14a
worker: partition a list (of tasks) into equal-sized chunks
Bryan O'Sullivan <bryano@fb.com>
parents:
18636
diff
changeset
|
163 end += 1 |
ac4dbceeb14a
worker: partition a list (of tasks) into equal-sized chunks
Bryan O'Sullivan <bryano@fb.com>
parents:
18636
diff
changeset
|
164 slop -= 1 |
ac4dbceeb14a
worker: partition a list (of tasks) into equal-sized chunks
Bryan O'Sullivan <bryano@fb.com>
parents:
18636
diff
changeset
|
165 yield lst[start:end] |