worker: make windows workers daemons
The windows workers weren't daemons and were not correctly killed when ctrl-c'd from the terminal. Withi this change when the main thread is killed, all daemons get killed as well.
I also reduced the time we give to workers to cleanup nicely to not have people ctrl-c'ing when they get inpatient.
The output when threads clened up nicely:
PS C:\<dir>> hg.exe sparse --disable-profile SparseProfiles/<profile>.sparse
interrupted!
The output when threads don't clenup in 1 sec:
PS C:\<dir> hg.exe sparse --enable-profile SparseProfiles/<profile>.sparse
failed to kill worker threads while handling an exception
interrupted!
Exception in thread Thread-4 (most likely raised during interpreter shutdown):
PS C:\<dir>>
Test Plan:
Run hg command on windows (pull/update/sparse). Ctrl-C'd sparse --enable-profile command that was using threads and observed in proces explorer that all threads got killed.
ran tests on CentOS
Differential Revision: https://phab.mercurial-scm.org/D1564
--- a/mercurial/worker.py Sun Dec 17 11:26:25 2017 -0800
+++ b/mercurial/worker.py Thu Nov 30 16:01:53 2017 -0800
@@ -12,6 +12,7 @@
import signal
import sys
import threading
+import time
from .i18n import _
from . import (
@@ -216,6 +217,7 @@
self._func = func
self._staticargs = staticargs
self._interrupted = False
+ self.daemon = True
self.exception = None
def interrupt(self):
@@ -242,16 +244,22 @@
raise
threads = []
- def killworkers():
+ def trykillworkers():
+ # Allow up to 1 second to clean worker threads nicely
+ cleanupend = time.time() + 1
for t in threads:
t.interrupt()
for t in threads:
- # try to let the threads handle interruption, but don't wait
- # indefintely. the thread could be in infinite loop, handling
- # a very long task or in a deadlock situation
- t.join(5)
+ remainingtime = cleanupend - time.time()
+ t.join(remainingtime)
if t.is_alive():
- raise error.Abort(_('failed to join worker thread'))
+ # pass over the workers joining failure. it is more
+ # important to surface the inital exception than the
+ # fact that one of workers may be processing a large
+ # task and does not get to handle the interruption.
+ ui.warn(_("failed to kill worker threads while "
+ "handling an exception\n"))
+ return
workers = _numworkers(ui)
resultqueue = util.queue()
@@ -264,25 +272,19 @@
t = Worker(taskqueue, resultqueue, func, staticargs)
threads.append(t)
t.start()
-
- while len(threads) > 0:
- while not resultqueue.empty():
- yield resultqueue.get()
- threads[0].join(0.05)
- finishedthreads = [_t for _t in threads if not _t.is_alive()]
- for t in finishedthreads:
- if t.exception is not None:
- try:
- killworkers()
- except Exception:
- # pass over the workers joining failure. it is more
- # important to surface the inital exception than the
- # fact that one of workers may be processing a large
- # task and does not get to handle the interruption.
- ui.warn(_("failed to kill worker threads while handling "
- "an exception"))
- raise t.exception
- threads.remove(t)
+ try:
+ while len(threads) > 0:
+ while not resultqueue.empty():
+ yield resultqueue.get()
+ threads[0].join(0.05)
+ finishedthreads = [_t for _t in threads if not _t.is_alive()]
+ for t in finishedthreads:
+ if t.exception is not None:
+ raise t.exception
+ threads.remove(t)
+ except Exception: # re-raises
+ trykillworkers()
+ raise
while not resultqueue.empty():
yield resultqueue.get()