lfs: avoid closing connections when the worker doesn't fork
Probably not much more than a minor optimization, but it could be useful in the
case of `hg verify`, where missing blobs are fetched one at a time.
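For reference, a minimal sketch of how a caller might use the new hook (the
function names and arguments below are hypothetical, not part of this change):
the callback runs once in the parent process, and only when a POSIX worker is
actually about to fork; it is never invoked on the inline or thread-based paths.

  # Hypothetical caller; only the ``prefork=`` argument is new in this patch.
  from mercurial import worker

  def _work(ui, items):
      # conventional worker function: yields (progress, result) pairs
      for item in items:
          yield 1, item

  def fetchall(ui, items):
      def prefork():
          # runs once in the parent, immediately before fork(); skipped on
          # Windows and when the work is small enough to be done inline
          pass

      results = []
      for _progress, res in worker.worker(
          ui, 0.1, _work, (ui,), items, prefork=prefork
      ):
          results.append(res)
      return results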
--- a/hgext/lfs/blobstore.py Tue Oct 18 13:36:33 2022 -0400
+++ b/hgext/lfs/blobstore.py Tue Oct 18 13:56:45 2022 -0400
@@ -597,7 +597,9 @@
continue
raise
- # Until https multiplexing gets sorted out
+ # Until https multiplexing gets sorted out. It's not clear if
+ # ConnectionManager.set_ready() is externally synchronized for thread
+ # safety with Windows workers.
if self.ui.configbool(b'experimental', b'lfs.worker-enable'):
# The POSIX workers are forks of this process, so before spinning
# them up, close all pooled connections. Otherwise, there's no way
@@ -608,7 +610,7 @@
# ready connections as in use, and roll that back after the fork?
# That would allow the existing pool of connections in this process
# to be preserved.
- if not pycompat.iswindows:
+ def prefork():
for h in self.urlopener.handlers:
getattr(h, "close_all", lambda: None)()
@@ -618,6 +620,7 @@
transfer,
(),
sorted(objects, key=lambda o: o.get(b'oid')),
+ prefork=prefork,
)
else:
oids = transfer(sorted(objects, key=lambda o: o.get(b'oid')))
--- a/mercurial/worker.py Tue Oct 18 13:36:33 2022 -0400
+++ b/mercurial/worker.py Tue Oct 18 13:56:45 2022 -0400
@@ -125,7 +125,14 @@
def worker(
- ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
+ ui,
+ costperarg,
+ func,
+ staticargs,
+ args,
+ hasretval=False,
+ threadsafe=True,
+ prefork=None,
):
"""run a function, possibly in parallel in multiple worker
processes.
@@ -149,6 +156,10 @@
threadsafe - whether work items are thread safe and can be executed using
a thread-based worker. Should be disabled for CPU heavy tasks that don't
release the GIL.
+
+ prefork - a parameterless Callable that is invoked prior to forking the
+ worker processes. fork() is only used on non-Windows platforms, and it is
+ also skipped on POSIX platforms when the amount of work doesn't warrant a
+ worker.
"""
enabled = ui.configbool(b'worker', b'enabled')
if enabled and _platformworker is _posixworker and not ismainthread():
@@ -157,11 +168,13 @@
enabled = False
if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
- return _platformworker(ui, func, staticargs, args, hasretval)
+ return _platformworker(
+ ui, func, staticargs, args, hasretval, prefork=prefork
+ )
return func(*staticargs + (args,))
-def _posixworker(ui, func, staticargs, args, hasretval):
+def _posixworker(ui, func, staticargs, args, hasretval, prefork=None):
workers = _numworkers(ui)
oldhandler = signal.getsignal(signal.SIGINT)
signal.signal(signal.SIGINT, signal.SIG_IGN)
@@ -207,6 +220,10 @@
parentpid = os.getpid()
pipes = []
retval = {}
+
+ if prefork:
+ prefork()
+
for pargs in partition(args, min(workers, len(args))):
# Every worker gets its own pipe to send results on, so we don't have to
# implement atomic writes larger than PIPE_BUF. Each forked process has
@@ -316,7 +333,7 @@
return -(os.WTERMSIG(code))
-def _windowsworker(ui, func, staticargs, args, hasretval):
+def _windowsworker(ui, func, staticargs, args, hasretval, prefork=None):
class Worker(threading.Thread):
def __init__(
self, taskqueue, resultqueue, func, staticargs, *args, **kwargs