comparison mercurial/worker.py @ 35428:71427ff1dff8

workers: handling exceptions in windows workers. This adds handling of exceptions from worker threads and resurfaces them as if the function ran without workers. If any of the threads throws, the main thread kills all running threads, giving them 5 sec to handle the interruption, and raises the first exception received. We don't have to join threads if is_alive() is false. Test Plan: Ran multiple updates/enable/disable sparse profile and things worked well. Ran tests on CentOS - all tests passing on @ passed here. Added a forged exception into the worker code and got it properly resurfaced and the rest of workers killed: P58642088 PS C:\open\<repo>> ..\facebook-hg-rpms\build\hg\hg.exe --config extensions.fsmonitor=! sparse --enable-profile <profile> updating [==> ] 1300/39166 1m57s Exception in thread Thread-3: Traceback (most recent call last): File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\threading.py", line 801, in __bootstrap_inner self.run() File "C:\open\facebook-hg-rpms\build\hg\mercurial\worker.py", line 244, in run raise e Exception: Forged exception Exception in thread Thread-2: Traceback (most recent call last): File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\threading.py", line 801, in __bootstrap_inner self.run() File "C:\open\facebook-hg-rpms\build\hg\mercurial\worker.py", line 244, in run raise e Exception: Forged exception <...> Traceback (most recent call last): File "C:\open\facebook-hg-rpms\build\hg\hgexe.py", line 41, in <module> dispatch.run() File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 85, in run status = (dispatch(req) or 0) & 255 File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 173, in dispatch ret = _runcatch(req) File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 324, in _runcatch return _callcatch(ui, _runcatchfunc) File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 332, in _callcatch return scmutil.callcatch(ui, func) File
"C:\open\facebook-hg-rpms\build\hg\mercurial\scmutil.py", line 154, in callcatch return func() File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 314, in _runcatchfunc return _dispatch(req) File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 951, in _dispatch cmdpats, cmdoptions) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\remotefilelog\__init__.py", line 415, in runcommand return orig(lui, repo, *args, **kwargs) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\undo.py", line 118, in _runcommandwrapper result = orig(lui, repo, cmd, fullargs, *args) File "C:\open\facebook-hg-rpms\build\hg\hgext\journal.py", line 84, in runcommand return orig(lui, repo, cmd, fullargs, *args) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\perftweaks.py", line 268, in _tracksparseprofiles res = runcommand(lui, repo, *args) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\perftweaks.py", line 256, in _trackdirstatesizes res = runcommand(lui, repo, *args) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\copytrace.py", line 144, in _runcommand return orig(lui, repo, cmd, fullargs, ui, *args, **kwargs) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\fbamend\hiddenoverride.py", line 119, in runcommand result = orig(lui, repo, cmd, fullargs, *args) File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 712, in runcommand ret = _runcommand(ui, options, cmd, d) File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 959, in _runcommand return cmdfunc() File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 948, in <lambda> d = lambda: util.checksignature(func)(ui, *args, **strcmdopt) File "C:\open\facebook-hg-rpms\build\hg\mercurial\util.py", line 1183, in check return func(*args, **kwargs) File 
"C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\fbsparse.py", line 860, in sparse disableprofile=disableprofile, force=force) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\fbsparse.py", line 949, in _config len, _refresh(ui, repo, oldstatus, oldsparsematch, force)) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\fbsparse.py", line 1116, in _refresh mergemod.applyupdates(repo, typeactions, repo[None], repo['.'], False) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\remotefilelog\__init__.py", line 311, in applyupdates return orig(repo, actions, wctx, mctx, overwrite, labels=labels) File "C:\open\facebook-hg-rpms\build\hg\mercurial\merge.py", line 1464, in applyupdates for i, item in prog: File "C:\open\facebook-hg-rpms\build\hg\mercurial\worker.py", line 286, in _windowsworker raise t.exception Exception: Forged exception PS C:\open\ovrsource> Differential Revision: https://phab.mercurial-scm.org/D1459
author Wojciech Lis <wlis@fb.com>
date Mon, 20 Nov 2017 10:27:41 -0800
parents 02b36e860e0b
children 471918fa7f46
comparison
equal deleted inserted replaced
35427:02b36e860e0b 35428:71427ff1dff8
212 name=name, verbose=verbose) 212 name=name, verbose=verbose)
213 self._taskqueue = taskqueue 213 self._taskqueue = taskqueue
214 self._resultqueue = resultqueue 214 self._resultqueue = resultqueue
215 self._func = func 215 self._func = func
216 self._staticargs = staticargs 216 self._staticargs = staticargs
217 self._interrupted = False
218 self.exception = None
219
220 def interrupt(self):
221 self._interrupted = True
217 222
218 def run(self): 223 def run(self):
219 while not self._taskqueue.empty(): 224 try:
220 try: 225 while not self._taskqueue.empty():
221 args = self._taskqueue.get_nowait() 226 try:
222 for res in self._func(*self._staticargs + (args,)): 227 args = self._taskqueue.get_nowait()
223 self._resultqueue.put(res) 228 for res in self._func(*self._staticargs + (args,)):
224 except util.empty: 229 self._resultqueue.put(res)
225 break 230 # threading doesn't provide a native way to
231 # interrupt execution. handle it manually at every
232 # iteration.
233 if self._interrupted:
234 return
235 except util.empty:
236 break
237 except Exception as e:
238 # store the exception such that the main thread can resurface
239 # it as if the func was running without workers.
240 self.exception = e
241 raise
242
243 threads = []
244 def killworkers():
245 for t in threads:
246 t.interrupt()
247 for t in threads:
248 # try to let the threads handle interruption, but don't wait
249 # indefinitely. the thread could be in an infinite loop, handling
250 # a very long task or in a deadlock situation
251 t.join(5)
252 if t.is_alive():
253 raise error.Abort(_('failed to join worker thread'))
226 254
227 workers = _numworkers(ui) 255 workers = _numworkers(ui)
228 threads = []
229 resultqueue = util.queue() 256 resultqueue = util.queue()
230 taskqueue = util.queue() 257 taskqueue = util.queue()
231 # partition work to more pieces than workers to minimize the chance 258 # partition work to more pieces than workers to minimize the chance
232 # of uneven distribution of large tasks between the workers 259 # of uneven distribution of large tasks between the workers
233 for pargs in partition(args, workers * 20): 260 for pargs in partition(args, workers * 20):
234 taskqueue.put(pargs) 261 taskqueue.put(pargs)
235 for _i in range(workers): 262 for _i in range(workers):
236 t = Worker(taskqueue, resultqueue, func, staticargs) 263 t = Worker(taskqueue, resultqueue, func, staticargs)
237 threads.append(t) 264 threads.append(t)
238 t.start() 265 t.start()
239 while any(t.is_alive() for t in threads): 266
267 while len(threads) > 0:
240 while not resultqueue.empty(): 268 while not resultqueue.empty():
241 yield resultqueue.get() 269 yield resultqueue.get()
242 t = threads[0] 270 threads[0].join(0.05)
243 t.join(0.05) 271 finishedthreads = [_t for _t in threads if not _t.is_alive()]
244 if not t.is_alive(): 272 for t in finishedthreads:
273 if t.exception is not None:
274 try:
275 killworkers()
276 except Exception:
277 # pass over the workers joining failure. it is more
278 # important to surface the initial exception than the
279 # fact that one of workers may be processing a large
280 # task and does not get to handle the interruption.
281 ui.warn(_("failed to kill worker threads while handling "
282 "an exception"))
283 raise t.exception
245 threads.remove(t) 284 threads.remove(t)
246 while not resultqueue.empty(): 285 while not resultqueue.empty():
247 yield resultqueue.get() 286 yield resultqueue.get()
248 287
249 if pycompat.iswindows: 288 if pycompat.iswindows: