worker: ability to disable thread unsafe tasks
The worker on Windows is implemented using a thread pool. If worker
tasks are not thread safe, badness can occur. In addition, if tasks
are executing CPU bound code and holding onto the GIL, there will be
non-substantial overhead in Python context switching between active
threads. This can result in significant slowdowns of tasks.
This commit teaches the code for determining whether to use a worker
to take thread safety into account. Effectively, thread unsafe tasks
don't use the thread-based worker on Windows.
Differential Revision: https://phab.mercurial-scm.org/D3962
--- a/mercurial/worker.py Tue Jul 17 16:57:27 2018 -0700
+++ b/mercurial/worker.py Wed Jul 18 09:46:45 2018 -0700
@@ -63,18 +63,27 @@
if pycompat.isposix or pycompat.iswindows:
_STARTUP_COST = 0.01
+ # The Windows worker is thread based. If tasks are CPU bound, threads
+ # in the presence of the GIL result in excessive context switching and
+ # this overhead can slow down execution.
+ _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
else:
_STARTUP_COST = 1e30
+ _DISALLOW_THREAD_UNSAFE = False
-def worthwhile(ui, costperop, nops):
+def worthwhile(ui, costperop, nops, threadsafe=True):
'''try to determine whether the benefit of multiple processes can
outweigh the cost of starting them'''
+
+ if not threadsafe and _DISALLOW_THREAD_UNSAFE:
+ return False
+
linear = costperop * nops
workers = _numworkers(ui)
benefit = linear - (_STARTUP_COST * workers + linear / workers)
return benefit >= 0.15
-def worker(ui, costperarg, func, staticargs, args):
+def worker(ui, costperarg, func, staticargs, args, threadsafe=True):
'''run a function, possibly in parallel in multiple worker
processes.
@@ -88,9 +97,13 @@
args - arguments to split into chunks, to pass to individual
workers
+
+ threadsafe - whether work items are thread safe and can be executed using
+ a thread-based worker. Should be disabled for CPU heavy tasks that don't
+ release the GIL.
'''
enabled = ui.configbool('worker', 'enabled')
- if enabled and worthwhile(ui, costperarg, len(args)):
+ if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
return _platformworker(ui, func, staticargs, args)
return func(*staticargs + (args,))
--- a/tests/test-simple-update.t Tue Jul 17 16:57:27 2018 -0700
+++ b/tests/test-simple-update.t Wed Jul 18 09:46:45 2018 -0700
@@ -65,7 +65,7 @@
$ cat <<EOF > forceworker.py
> from mercurial import extensions, worker
- > def nocost(orig, ui, costperop, nops):
+ > def nocost(orig, ui, costperop, nops, threadsafe=True):
> return worker._numworkers(ui) > 1
> def uisetup(ui):
> extensions.wrapfunction(worker, 'worthwhile', nocost)