diff mercurial/worker.py @ 35436:86b8cc1f244e
worker: make windows workers daemons
The Windows workers weren't daemons, so they were not correctly killed when the process was ctrl-c'd from the terminal. With this change, when the main thread is killed, all daemon workers are killed along with it.
I also reduced the time we give workers to clean up nicely, so that people don't ctrl-c again when they get impatient.
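The daemon behavior this patch relies on can be shown with a minimal standalone sketch (the worker function here is illustrative, not Mercurial's code): a daemon thread does not block interpreter shutdown, so it dies with the main thread.

```python
import threading
import time

def busyworker():
    # Simulate a long-running worker task that never returns on its own.
    while True:
        time.sleep(0.1)

t = threading.Thread(target=busyworker)
t.daemon = True  # without this, interpreter exit would block on this thread
t.start()

# When the main thread exits (normally or via KeyboardInterrupt),
# daemon threads are terminated instead of keeping the process alive.
print("main thread exiting; daemon worker dies with it")
```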
The output when threads cleaned up nicely:
PS C:\<dir>> hg.exe sparse --disable-profile SparseProfiles/<profile>.sparse
interrupted!
The output when threads don't clean up within 1 second:
PS C:\<dir> hg.exe sparse --enable-profile SparseProfiles/<profile>.sparse
failed to kill worker threads while handling an exception
interrupted!
Exception in thread Thread-4 (most likely raised during interpreter shutdown):
PS C:\<dir>>
Test Plan:
Ran hg commands on Windows (pull/update/sparse). Ctrl-C'd a sparse --enable-profile command that was using threads and observed in Process Explorer that all threads were killed.
Ran tests on CentOS.
Differential Revision: https://phab.mercurial-scm.org/D1564
| author | Wojciech Lis <wlis@fb.com> |
|---|---|
| date | Thu, 30 Nov 2017 16:01:53 -0800 |
| parents | 471918fa7f46 |
| children | 44fd4cfc6c0a |
```diff
--- a/mercurial/worker.py	Sun Dec 17 11:26:25 2017 -0800
+++ b/mercurial/worker.py	Thu Nov 30 16:01:53 2017 -0800
@@ -12,6 +12,7 @@
 import signal
 import sys
 import threading
+import time

 from .i18n import _
 from . import (
@@ -216,6 +217,7 @@
         self._func = func
         self._staticargs = staticargs
         self._interrupted = False
+        self.daemon = True
         self.exception = None

     def interrupt(self):
@@ -242,16 +244,22 @@
             raise

     threads = []
-    def killworkers():
+    def trykillworkers():
+        # Allow up to 1 second to clean worker threads nicely
+        cleanupend = time.time() + 1
         for t in threads:
             t.interrupt()
         for t in threads:
-            # try to let the threads handle interruption, but don't wait
-            # indefintely. the thread could be in infinite loop, handling
-            # a very long task or in a deadlock situation
-            t.join(5)
+            remainingtime = cleanupend - time.time()
+            t.join(remainingtime)
             if t.is_alive():
-                raise error.Abort(_('failed to join worker thread'))
+                # pass over the workers joining failure. it is more
+                # important to surface the inital exception than the
+                # fact that one of workers may be processing a large
+                # task and does not get to handle the interruption.
+                ui.warn(_("failed to kill worker threads while "
+                          "handling an exception\n"))
+                return

     workers = _numworkers(ui)
     resultqueue = util.queue()
@@ -264,25 +272,19 @@
         t = Worker(taskqueue, resultqueue, func, staticargs)
         threads.append(t)
         t.start()
-
-    while len(threads) > 0:
-        while not resultqueue.empty():
-            yield resultqueue.get()
-        threads[0].join(0.05)
-        finishedthreads = [_t for _t in threads if not _t.is_alive()]
-        for t in finishedthreads:
-            if t.exception is not None:
-                try:
-                    killworkers()
-                except Exception:
-                    # pass over the workers joining failure. it is more
-                    # important to surface the inital exception than the
-                    # fact that one of workers may be processing a large
-                    # task and does not get to handle the interruption.
-                    ui.warn(_("failed to kill worker threads while handling "
-                              "an exception"))
-                raise t.exception
-            threads.remove(t)
+    try:
+        while len(threads) > 0:
+            while not resultqueue.empty():
+                yield resultqueue.get()
+            threads[0].join(0.05)
+            finishedthreads = [_t for _t in threads if not _t.is_alive()]
+            for t in finishedthreads:
+                if t.exception is not None:
+                    raise t.exception
+                threads.remove(t)
+    except Exception: # re-raises
+        trykillworkers()
+        raise
     while not resultqueue.empty():
         yield resultqueue.get()
```
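The key idea in `trykillworkers` is a single shared deadline: each `join` consumes part of one common time budget, so the total wait is bounded by the budget rather than by a per-thread timeout multiplied by the thread count. A minimal standalone sketch of that pattern (the helper and workers here are simplified illustrations, not Mercurial's actual code):

```python
import threading
import time

def joinallwithdeadline(threads, deadline=1.0):
    """Give all threads a combined `deadline` seconds to finish."""
    cleanupend = time.time() + deadline
    for t in threads:
        # Each join gets only whatever remains of the shared budget,
        # so total blocking time is at most `deadline` seconds.
        remainingtime = cleanupend - time.time()
        t.join(max(remainingtime, 0))
    # Report whether every thread actually exited within the budget.
    return all(not t.is_alive() for t in threads)

# Usage: short-lived workers should all be joined within the budget.
threads = [threading.Thread(target=time.sleep, args=(0.05,)) for _ in range(3)]
for t in threads:
    t.start()
print(joinallwithdeadline(threads))  # True when all workers finished in time
```

If a worker is stuck in a long task, the function returns quickly with `False` instead of hanging, which is exactly why the patch warns and returns rather than raising on a failed join.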