87 if enabled and worthwhile(ui, costperarg, len(args)): |
93 if enabled and worthwhile(ui, costperarg, len(args)): |
88 return _platformworker(ui, func, staticargs, args) |
94 return _platformworker(ui, func, staticargs, args) |
89 return func(*staticargs + (args,)) |
95 return func(*staticargs + (args,)) |
90 |
96 |
91 def _posixworker(ui, func, staticargs, args): |
97 def _posixworker(ui, func, staticargs, args): |
92 rfd, wfd = os.pipe() |
|
93 workers = _numworkers(ui) |
98 workers = _numworkers(ui) |
94 oldhandler = signal.getsignal(signal.SIGINT) |
99 oldhandler = signal.getsignal(signal.SIGINT) |
95 signal.signal(signal.SIGINT, signal.SIG_IGN) |
100 signal.signal(signal.SIGINT, signal.SIG_IGN) |
96 pids, problem = set(), [0] |
101 pids, problem = set(), [0] |
97 def killworkers(): |
102 def killworkers(): |
136 if problem[0]: |
141 if problem[0]: |
137 killworkers() |
142 killworkers() |
138 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler) |
143 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler) |
139 ui.flush() |
144 ui.flush() |
140 parentpid = os.getpid() |
145 parentpid = os.getpid() |
|
146 pipes = [] |
141 for pargs in partition(args, workers): |
147 for pargs in partition(args, workers): |
|
148 # Every worker gets its own pipe to send results on, so we don't have to |
|
149 # implement atomic writes larger than PIPE_BUF. Each forked process has |
|
150 # its own pipe's descriptors in the local variables, and the parent |
|
151 # process has the full list of pipe descriptors (and it doesn't really |
|
152 # care what order they're in). |
|
153 rfd, wfd = os.pipe() |
|
154 pipes.append((rfd, wfd)) |
142 # make sure we use os._exit in all worker code paths. otherwise the |
155 # make sure we use os._exit in all worker code paths. otherwise the |
143 # worker may do some clean-ups which could cause surprises like |
156 # worker may do some clean-ups which could cause surprises like |
144 # deadlock. see sshpeer.cleanup for example. |
157 # deadlock. see sshpeer.cleanup for example. |
145 # override error handling *before* fork. this is necessary because |
158 # override error handling *before* fork. this is necessary because |
146 # exception (signal) may arrive after fork, before "pid =" assignment |
159 # exception (signal) may arrive after fork, before "pid =" assignment |
152 if pid == 0: |
165 if pid == 0: |
153 signal.signal(signal.SIGINT, oldhandler) |
166 signal.signal(signal.SIGINT, oldhandler) |
154 signal.signal(signal.SIGCHLD, oldchldhandler) |
167 signal.signal(signal.SIGCHLD, oldchldhandler) |
155 |
168 |
156 def workerfunc(): |
169 def workerfunc(): |
|
170 for r, w in pipes[:-1]: |
|
171 os.close(r) |
|
172 os.close(w) |
157 os.close(rfd) |
173 os.close(rfd) |
158 for result in func(*(staticargs + (pargs,))): |
174 for result in func(*(staticargs + (pargs,))): |
159 os.write(wfd, util.pickle.dumps(result)) |
175 os.write(wfd, util.pickle.dumps(result)) |
160 return 0 |
176 return 0 |
161 |
177 |
173 except: # never returns, no re-raises |
189 except: # never returns, no re-raises |
174 pass |
190 pass |
175 finally: |
191 finally: |
176 os._exit(ret & 255) |
192 os._exit(ret & 255) |
177 pids.add(pid) |
193 pids.add(pid) |
178 os.close(wfd) |
194 selector = selectors.DefaultSelector() |
179 fp = os.fdopen(rfd, r'rb', 0) |
195 for rfd, wfd in pipes: |
|
196 os.close(wfd) |
|
197 selector.register(os.fdopen(rfd, r'rb', 0), selectors.EVENT_READ) |
180 def cleanup(): |
198 def cleanup(): |
181 signal.signal(signal.SIGINT, oldhandler) |
199 signal.signal(signal.SIGINT, oldhandler) |
182 waitforworkers() |
200 waitforworkers() |
183 signal.signal(signal.SIGCHLD, oldchldhandler) |
201 signal.signal(signal.SIGCHLD, oldchldhandler) |
184 status = problem[0] |
202 status = problem[0] |
185 if status: |
203 if status: |
186 if status < 0: |
204 if status < 0: |
187 os.kill(os.getpid(), -status) |
205 os.kill(os.getpid(), -status) |
188 sys.exit(status) |
206 sys.exit(status) |
189 try: |
207 try: |
190 while True: |
208 openpipes = len(pipes) |
191 try: |
209 while openpipes > 0: |
192 yield util.pickle.load(fp) |
210 for key, events in selector.select(): |
193 except EOFError: |
211 try: |
194 break |
212 yield util.pickle.load(key.fileobj) |
195 except IOError as e: |
213 except EOFError: |
196 if e.errno == errno.EINTR: |
214 selector.unregister(key.fileobj) |
197 continue |
215 key.fileobj.close() |
198 raise |
216 openpipes -= 1 |
|
217 except IOError as e: |
|
218 if e.errno == errno.EINTR: |
|
219 continue |
|
220 raise |
199 except: # re-raises |
221 except: # re-raises |
200 killworkers() |
222 killworkers() |
201 cleanup() |
223 cleanup() |
202 raise |
224 raise |
203 cleanup() |
225 cleanup() |