# HG changeset patch # User Wojciech Lis # Date 1511202461 28800 # Node ID 71427ff1dff8c8af665a4c4a1605d7ffffec9a77 # Parent 02b36e860e0b893928d5f565417d55b5dd6495fc workers: handling exceptions in windows workers This adds handling of exceptions from worker threads and resurfaces them as if the function ran without workers. If any of the threads throws, the main thread kills all running threads giving them 5 sec to handle the interruption and raises the first exception received. We don't have to join threads if is_alive() is false Test Plan: Ran multiple updates/enable/disable sparse profile and things worked well Ran test on CentOS- all tests passing on @ passed here Added a forged exception into the worker code and got it properly resurfaced and the rest of workers killed: P58642088 PS C:\open\> ..\facebook-hg-rpms\build\hg\hg.exe --config extensions.fsmonitor=! sparse --enable-profile updating [==> ] 1300/39166 1m57sException in thread Thread-3: Traceback (most recent call last): File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\threading.py", line 801, in __bootstrap_inner self.run() File "C:\open\facebook-hg-rpms\build\hg\mercurial\worker.py", line 244, in run raise e Exception: Forged exception Exception in thread Thread-2: Traceback (most recent call last): File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\threading.py", line 801, in __bootstrap_inner self.run() File "C:\open\facebook-hg-rpms\build\hg\mercurial\worker.py", line 244, in run raise e Exception: Forged exception <...> Traceback (most recent call last): File "C:\open\facebook-hg-rpms\build\hg\hgexe.py", line 41, in dispatch.run() File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 85, in run status = (dispatch(req) or 0) & 255 File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 173, in dispatch ret = _runcatch(req) File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 324, in _runcatch return _callcatch(ui, _runcatchfunc) File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 332, in _callcatch return scmutil.callcatch(ui, func) File "C:\open\facebook-hg-rpms\build\hg\mercurial\scmutil.py", line 154, in callcatch return func() File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 314, in _runcatchfunc return _dispatch(req) File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 951, in _dispatch cmdpats, cmdoptions) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\remotefilelog\__init__.py", line 415, in runcommand return orig(lui, repo, *args, **kwargs) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\undo.py", line 118, in _runcommandwrapper result = orig(lui, repo, cmd, fullargs, *args) File "C:\open\facebook-hg-rpms\build\hg\hgext\journal.py", line 84, in runcommand return orig(lui, repo, cmd, fullargs, *args) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\perftweaks.py", line 268, in _tracksparseprofiles res = runcommand(lui, repo, *args) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\perftweaks.py", line 256, in _trackdirstatesizes res = runcommand(lui, repo, *args) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\copytrace.py", line 144, in _runcommand return orig(lui, repo, cmd, fullargs, ui, *args, **kwargs) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\fbamend\hiddenoverride.py", line 119, in runcommand result = orig(lui, repo, cmd, fullargs, *args) File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 712, in runcommand ret = _runcommand(ui, options, cmd, d) File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 959, in _runcommand return cmdfunc() File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 948, in d = lambda: util.checksignature(func)(ui, *args, **strcmdopt) File "C:\open\facebook-hg-rpms\build\hg\mercurial\util.py", line 1183, in check return func(*args, **kwargs) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\fbsparse.py", line 860, in sparse disableprofile=disableprofile, force=force) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\fbsparse.py", line 949, in _config len, _refresh(ui, repo, oldstatus, oldsparsematch, force)) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\fbsparse.py", line 1116, in _refresh mergemod.applyupdates(repo, typeactions, repo[None], repo['.'], False) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\remotefilelog\__init__.py", line 311, in applyupdates return orig(repo, actions, wctx, mctx, overwrite, labels=labels) File "C:\open\facebook-hg-rpms\build\hg\mercurial\merge.py", line 1464, in applyupdates for i, item in prog: File "C:\open\facebook-hg-rpms\build\hg\mercurial\worker.py", line 286, in _windowsworker raise t.exception Exception: Forged exception PS C:\open\ovrsource> Differential Revision: https://phab.mercurial-scm.org/D1459 diff -r 02b36e860e0b -r 71427ff1dff8 mercurial/worker.py --- a/mercurial/worker.py Mon Nov 20 10:25:29 2017 -0800 +++ b/mercurial/worker.py Mon Nov 20 10:27:41 2017 -0800 @@ -214,18 +214,45 @@ self._resultqueue = resultqueue self._func = func self._staticargs = staticargs + self._interrupted = False + self.exception = None + + def interrupt(self): + self._interrupted = True def run(self): - while not self._taskqueue.empty(): - try: - args = self._taskqueue.get_nowait() - for res in self._func(*self._staticargs + (args,)): - self._resultqueue.put(res) - except util.empty: - break + try: + while not self._taskqueue.empty(): + try: + args = self._taskqueue.get_nowait() + for res in self._func(*self._staticargs + (args,)): + self._resultqueue.put(res) + # threading doesn't provide a native way to + # interrupt execution. handle it manually at every + # iteration. + if self._interrupted: + return + except util.empty: + break + except Exception as e: + # store the exception such that the main thread can resurface + # it as if the func was running without workers. + self.exception = e + raise + + threads = [] + def killworkers(): + for t in threads: + t.interrupt() + for t in threads: + # try to let the threads handle interruption, but don't wait + # indefintely. the thread could be in infinite loop, handling + # a very long task or in a deadlock situation + t.join(5) + if t.is_alive(): + raise error.Abort(_('failed to join worker thread')) workers = _numworkers(ui) - threads = [] resultqueue = util.queue() taskqueue = util.queue() # partition work to more pieces than workers to minimize the chance @@ -236,12 +263,24 @@ t = Worker(taskqueue, resultqueue, func, staticargs) threads.append(t) t.start() - while any(t.is_alive() for t in threads): + + while len(threads) > 0: while not resultqueue.empty(): yield resultqueue.get() - t = threads[0] - t.join(0.05) - if not t.is_alive(): + threads[0].join(0.05) + finishedthreads = [_t for _t in threads if not _t.is_alive()] + for t in finishedthreads: + if t.exception is not None: + try: + killworkers() + except Exception: + # pass over the workers joining failure. it is more + # important to surface the inital exception than the + # fact that one of workers may be processing a large + # task and does not get to handle the interruption. + ui.warn(_("failed to kill worker threads while handling " + "an exception")) + raise t.exception threads.remove(t) while not resultqueue.empty(): yield resultqueue.get()