workers: handling exceptions in windows workers
This adds handling of exceptions from worker threads and resurfaces them as if the function ran without workers.
If any of the threads throws, the main thread kills all running threads giving them 5 sec to handle the interruption and raises the first exception received.
We don't have to join threads if is_alive() is false
Test Plan:
Ran multiple updates/enable/disable sparse profile and things worked well
Ran test on CentOS- all tests passing on @ passed here
Added a forged exception into the worker code and got it properly resurfaced and the rest of workers killed: P
58642088
PS C:\open\<repo>> ..\facebook-hg-rpms\build\hg\hg.exe --config extensions.fsmonitor=! sparse --enable-profile <profile>
updating [==> ] 1300/39166 1m57sException in thread Thread-3:
Traceback (most recent call last):
File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\threading.py", line 801, in __bootstrap_inner
self.run()
File "C:\open\facebook-hg-rpms\build\hg\mercurial\worker.py", line 244, in run
raise e
Exception: Forged exception
Exception in thread Thread-2:
Traceback (most recent call last):
File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\threading.py", line 801, in __bootstrap_inner
self.run()
File "C:\open\facebook-hg-rpms\build\hg\mercurial\worker.py", line 244, in run
raise e
Exception: Forged exception
<...>
Traceback (most recent call last):
File "C:\open\facebook-hg-rpms\build\hg\hgexe.py", line 41, in <module>
dispatch.run()
File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 85, in run
status = (dispatch(req) or 0) & 255
File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 173, in dispatch
ret = _runcatch(req)
File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 324, in _runcatch
return _callcatch(ui, _runcatchfunc)
File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 332, in _callcatch
return scmutil.callcatch(ui, func)
File "C:\open\facebook-hg-rpms\build\hg\mercurial\scmutil.py", line 154, in callcatch
return func()
File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 314, in _runcatchfunc
return _dispatch(req)
File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 951, in _dispatch
cmdpats, cmdoptions)
File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\remotefilelog\__init__.py", line 415, in runcommand
return orig(lui, repo, *args, **kwargs)
File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\undo.py", line 118, in _runcommandwrapper
result = orig(lui, repo, cmd, fullargs, *args)
File "C:\open\facebook-hg-rpms\build\hg\hgext\journal.py", line 84, in runcommand
return orig(lui, repo, cmd, fullargs, *args)
File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\perftweaks.py", line 268, in _tracksparseprofiles
res = runcommand(lui, repo, *args)
File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\perftweaks.py", line 256, in _trackdirstatesizes
res = runcommand(lui, repo, *args)
File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\copytrace.py", line 144, in _runcommand
return orig(lui, repo, cmd, fullargs, ui, *args, **kwargs)
File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\fbamend\hiddenoverride.py", line 119, in runcommand
result = orig(lui, repo, cmd, fullargs, *args)
File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 712, in runcommand
ret = _runcommand(ui, options, cmd, d)
File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 959, in _runcommand
return cmdfunc()
File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 948, in <lambda>
d = lambda: util.checksignature(func)(ui, *args, **strcmdopt)
File "C:\open\facebook-hg-rpms\build\hg\mercurial\util.py", line 1183, in check
return func(*args, **kwargs)
File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\fbsparse.py", line 860, in sparse
disableprofile=disableprofile, force=force)
File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\fbsparse.py", line 949, in _config
len, _refresh(ui, repo, oldstatus, oldsparsematch, force))
File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\fbsparse.py", line 1116, in _refresh
mergemod.applyupdates(repo, typeactions, repo[None], repo['.'], False)
File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\remotefilelog\__init__.py", line 311, in applyupdates
return orig(repo, actions, wctx, mctx, overwrite, labels=labels)
File "C:\open\facebook-hg-rpms\build\hg\mercurial\merge.py", line 1464, in applyupdates
for i, item in prog:
File "C:\open\facebook-hg-rpms\build\hg\mercurial\worker.py", line 286, in _windowsworker
raise t.exception
Exception: Forged exception
PS C:\open\ovrsource>
Differential Revision: https://phab.mercurial-scm.org/D1459
--- a/mercurial/worker.py Mon Nov 20 10:25:29 2017 -0800
+++ b/mercurial/worker.py Mon Nov 20 10:27:41 2017 -0800
@@ -214,18 +214,45 @@
self._resultqueue = resultqueue
self._func = func
self._staticargs = staticargs
+ self._interrupted = False
+ self.exception = None
+
+ def interrupt(self):
+ self._interrupted = True
def run(self):
- while not self._taskqueue.empty():
- try:
- args = self._taskqueue.get_nowait()
- for res in self._func(*self._staticargs + (args,)):
- self._resultqueue.put(res)
- except util.empty:
- break
+ try:
+ while not self._taskqueue.empty():
+ try:
+ args = self._taskqueue.get_nowait()
+ for res in self._func(*self._staticargs + (args,)):
+ self._resultqueue.put(res)
+ # threading doesn't provide a native way to
+ # interrupt execution. handle it manually at every
+ # iteration.
+ if self._interrupted:
+ return
+ except util.empty:
+ break
+ except Exception as e:
+ # store the exception such that the main thread can resurface
+ # it as if the func was running without workers.
+ self.exception = e
+ raise
+
+ threads = []
+ def killworkers():
+ for t in threads:
+ t.interrupt()
+ for t in threads:
+ # try to let the threads handle interruption, but don't wait
+ # indefintely. the thread could be in infinite loop, handling
+ # a very long task or in a deadlock situation
+ t.join(5)
+ if t.is_alive():
+ raise error.Abort(_('failed to join worker thread'))
workers = _numworkers(ui)
- threads = []
resultqueue = util.queue()
taskqueue = util.queue()
# partition work to more pieces than workers to minimize the chance
@@ -236,12 +263,24 @@
t = Worker(taskqueue, resultqueue, func, staticargs)
threads.append(t)
t.start()
- while any(t.is_alive() for t in threads):
+
+ while len(threads) > 0:
while not resultqueue.empty():
yield resultqueue.get()
- t = threads[0]
- t.join(0.05)
- if not t.is_alive():
+ threads[0].join(0.05)
+ finishedthreads = [_t for _t in threads if not _t.is_alive()]
+ for t in finishedthreads:
+ if t.exception is not None:
+ try:
+ killworkers()
+ except Exception:
+ # pass over the workers joining failure. it is more
+ # important to surface the inital exception than the
+ # fact that one of workers may be processing a large
+ # task and does not get to handle the interruption.
+ ui.warn(_("failed to kill worker threads while handling "
+ "an exception"))
+ raise t.exception
threads.remove(t)
while not resultqueue.empty():
yield resultqueue.get()