Mercurial > hg
annotate mercurial/worker.py @ 27197:6df3ec5bb813
shelve: widen wlock scope of shelve for consistency while processing
Before this patch, "hg shelve" of the shelve extension executed or referred
to the following before acquiring the wlock:
- 'repo.dirstate.parents()' via 'repo[None].parents()'
- 'repo._activebookmark'
This could cause unintended results if another command ran in parallel
(see also issue4368).
This patch widens the wlock scope of "hg shelve" of the shelve extension for
consistency while processing.
author | FUJIWARA Katsunori <foozy@lares.dti.ne.jp> |
---|---|
date | Wed, 02 Dec 2015 03:12:08 +0900 |
parents | 56b2bcea2529 |
children | f8efc8a3a991 |
rev | line source |
---|---|
18635
fed06dd07665
worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff
changeset
|
1 # worker.py - master-slave parallelism support |
fed06dd07665
worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff
changeset
|
2 # |
fed06dd07665
worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff
changeset
|
3 # Copyright 2013 Facebook, Inc. |
fed06dd07665
worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff
changeset
|
4 # |
fed06dd07665
worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff
changeset
|
5 # This software may be used and distributed according to the terms of the |
fed06dd07665
worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff
changeset
|
6 # GNU General Public License version 2 or any later version. |
fed06dd07665
worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff
changeset
|
7 |
25992
2d76f8a2d831
worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents:
25660
diff
changeset
|
8 from __future__ import absolute_import |
2d76f8a2d831
worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents:
25660
diff
changeset
|
9 |
2d76f8a2d831
worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents:
25660
diff
changeset
|
10 import errno |
2d76f8a2d831
worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents:
25660
diff
changeset
|
11 import os |
2d76f8a2d831
worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents:
25660
diff
changeset
|
12 import signal |
2d76f8a2d831
worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents:
25660
diff
changeset
|
13 import sys |
2d76f8a2d831
worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents:
25660
diff
changeset
|
14 import threading |
2d76f8a2d831
worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents:
25660
diff
changeset
|
15 |
2d76f8a2d831
worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents:
25660
diff
changeset
|
16 from .i18n import _ |
26587
56b2bcea2529
error: get Abort from 'error' instead of 'util'
Pierre-Yves David <pierre-yves.david@fb.com>
parents:
26568
diff
changeset
|
17 from . import error |
18635
fed06dd07665
worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff
changeset
|
18 |
fed06dd07665
worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff
changeset
|
def countcpus():
    '''try to count the number of CPUs on the system

    Returns the first positive answer obtained from the probes below,
    or 1 when none of them yields a usable value.
    '''

    # posix
    try:
        n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
        if n > 0:
            return n
    except (AttributeError, ValueError, OSError):
        # AttributeError: os.sysconf does not exist on this platform
        # ValueError: the configuration name is not known
        # OSError: the name is known but not supported by the host
        pass

    # windows
    try:
        n = int(os.environ['NUMBER_OF_PROCESSORS'])
        if n > 0:
            return n
    except (KeyError, ValueError):
        # variable missing, or not an integer
        pass

    # nothing worked: assume a single CPU
    return 1
18636
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
39 |
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
def _numworkers(ui):
    '''return the number of worker processes to use

    An integer 'worker.numcpus' setting of at least 1 is used as is; a
    non-integer setting aborts.  Otherwise the detected CPU count is
    used, clamped to the range 4..32.
    '''
    configured = ui.config('worker', 'numcpus')
    if configured:
        try:
            count = int(configured)
        except ValueError:
            raise error.Abort(_('number of cpus must be an integer'))
        if count >= 1:
            return count
        # values below 1 fall through to the automatic choice
    detected = countcpus()
    return min(max(detected, 4), 32)
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
50 |
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
# estimated cost of starting one worker; the huge value used off posix
# keeps worthwhile() from reporting a benefit for any realistic workload
_startupcost = 0.01 if os.name == 'posix' else 1e30
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
55 |
dcb27c153a40
worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents:
18635
diff
changeset
|
def worthwhile(ui, costperop, nops):
    '''estimate whether running in several processes pays off

    Compares the cost of doing all nops operations serially with the
    cost of starting the workers plus an even share of the work each,
    and reports True when the saving is at least 0.15.
    '''
    serialcost = costperop * nops
    nworkers = _numworkers(ui)
    parallelcost = _startupcost * nworkers + serialcost / nworkers
    return serialcost - parallelcost >= 0.15
18637
ac4dbceeb14a
worker: partition a list (of tasks) into equal-sized chunks
Bryan O'Sullivan <bryano@fb.com>
parents:
18636
diff
changeset
|
63 |
18638
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
def worker(ui, costperarg, func, staticargs, args):
    '''run func over args, in parallel worker processes when that is
    estimated to be worthwhile, otherwise directly in this process

    returns a progress iterator

    costperarg - cost of a single task

    func - function to run

    staticargs - arguments to pass to every invocation of the function

    args - arguments to split into chunks, to pass to individual
    workers
    '''
    if not worthwhile(ui, costperarg, len(args)):
        # too little work: call func once with the whole argument list
        callargs = staticargs + (args,)
        return func(*callargs)
    return _platformworker(ui, func, staticargs, args)
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
82 |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
def _posixworker(ui, func, staticargs, args):
    '''run func over slices of args in forked child processes

    This is a generator.  Each child calls func with staticargs plus its
    own slice of args and writes every (i, item) pair func produces to a
    shared pipe as a '%d %s' line.  The parent reads those lines back
    and yields them as (int, str) pairs.

    If a child ends with a non-zero status, the remaining children are
    sent SIGTERM and, once the pipe is drained, this process exits with
    that status (or re-raises the signal that killed the child).
    '''
    rfd, wfd = os.pipe()
    workers = _numworkers(ui)
    # ignore SIGINT in the parent while workers run; the previous handler
    # is reinstated in each child and again in cleanup()
    oldhandler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    # problem is a one-element list so the nested functions below can
    # update the first failing status in place
    pids, problem = [], [0]
    for pargs in partition(args, workers):
        pid = os.fork()
        if pid == 0:
            # child process
            signal.signal(signal.SIGINT, oldhandler)
            try:
                # the child only writes to the pipe
                os.close(rfd)
                for i, item in func(*(staticargs + (pargs,))):
                    os.write(wfd, '%d %s\n' % (i, item))
                # leave immediately, skipping normal interpreter shutdown
                os._exit(0)
            except KeyboardInterrupt:
                os._exit(255)
                # other exceptions are allowed to propagate, we rely
                # on lock.py's pid checks to avoid release callbacks
        pids.append(pid)
    pids.reverse()
    # the parent only reads; with its write end closed, reading fp hits
    # end-of-file once every child has closed its copy or exited
    os.close(wfd)
    fp = os.fdopen(rfd, 'rb', 0)
    def killworkers():
        # if one worker bails, there's no good reason to wait for the rest
        for p in pids:
            try:
                os.kill(p, signal.SIGTERM)
            except OSError as err:
                # ESRCH: that worker is already gone, which is fine
                if err.errno != errno.ESRCH:
                    raise
    def waitforworkers():
        # reap one child per forked worker; remember only the first
        # non-zero status and terminate the others as soon as it is seen
        # NOTE(review): os.wait() reaps any child of this process, not
        # only the pids forked above - confirm no other children exist
        for _pid in pids:
            st = _exitstatus(os.wait()[1])
            if st and not problem[0]:
                problem[0] = st
                killworkers()
    # wait for the children in a separate thread so this generator can
    # keep reading the pipe in the meantime
    t = threading.Thread(target=waitforworkers)
    t.start()
    def cleanup():
        # restore SIGINT handling, wait for every child to be reaped,
        # then propagate the first failure, if there was one
        signal.signal(signal.SIGINT, oldhandler)
        t.join()
        status = problem[0]
        if status:
            if status < 0:
                # negative status: send this process the signal whose
                # number is -status
                os.kill(os.getpid(), -status)
            sys.exit(status)
    try:
        for line in fp:
            # each line is '<int> <item>\n'; drop the trailing newline
            # NOTE(review): an item containing a newline would break this
            # line-based protocol - confirm callers never produce one
            l = line.split(' ', 1)
            yield int(l[0]), l[1][:-1]
    except: # re-raises
        killworkers()
        cleanup()
        raise
    cleanup()
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
139 |
18707
d1a2b086d058
worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents:
18638
diff
changeset
|
def _posixexitstatus(code):
    '''translate a posix wait status into the form os.spawnv returns

    A normal exit gives the exit code, death by signal gives the
    negated signal number, and any other status (for instance a
    stopped process) gives None.
    '''
    if os.WIFEXITED(code):
        return os.WEXITSTATUS(code)
    if os.WIFSIGNALED(code):
        return -os.WTERMSIG(code)
    return None
d1a2b086d058
worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents:
18638
diff
changeset
|
149 |
18638
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
if os.name != 'nt':
    # everything except Windows uses the fork-based implementation
    _platformworker, _exitstatus = _posixworker, _posixexitstatus
18638
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
153 |
18637
ac4dbceeb14a
worker: partition a list (of tasks) into equal-sized chunks
Bryan O'Sullivan <bryano@fb.com>
parents:
18636
diff
changeset
|
def partition(lst, nslices):
    '''partition a list into N contiguous slices of nearly equal size

    This is a generator yielding exactly nslices slices of lst, in
    order.  When len(lst) is not a multiple of nslices, the leading
    slices each get one extra element; when nslices exceeds len(lst),
    the trailing slices are empty.
    '''
    # divmod keeps chunk an integer on both Python 2 and Python 3
    chunk, slop = divmod(len(lst), nslices)
    end = 0
    for i in range(nslices):
        start = end
        end = start + chunk
        if i < slop:
            # hand the remainder out one element at a time
            end += 1
        yield lst[start:end]