changeset 49545:3556f0392808 stable

lfs: avoid closing connections when the worker doesn't fork Probably not much more than an minor optimization, but could be useful in the case of `hg verify` where missing blobs are fetched one at a time.
author Matt Harbison <matt_harbison@yahoo.com>
date Tue, 18 Oct 2022 13:56:45 -0400
parents abf471862b8e
children 5743e19bb8b0
files hgext/lfs/blobstore.py mercurial/worker.py
diffstat 2 files changed, 26 insertions(+), 6 deletions(-) [+]
line wrap: on
line diff
--- a/hgext/lfs/blobstore.py	Tue Oct 18 13:36:33 2022 -0400
+++ b/hgext/lfs/blobstore.py	Tue Oct 18 13:56:45 2022 -0400
@@ -597,7 +597,9 @@
                             continue
                         raise
 
-        # Until https multiplexing gets sorted out
+        # Until https multiplexing gets sorted out.  It's not clear if
+        # ConnectionManager.set_ready() is externally synchronized for thread
+        # safety with Windows workers.
         if self.ui.configbool(b'experimental', b'lfs.worker-enable'):
             # The POSIX workers are forks of this process, so before spinning
             # them up, close all pooled connections.  Otherwise, there's no way
@@ -608,7 +610,7 @@
             #  ready connections as in use, and roll that back after the fork?
             #  That would allow the existing pool of connections in this process
             #  to be preserved.
-            if not pycompat.iswindows:
+            def prefork():
                 for h in self.urlopener.handlers:
                     getattr(h, "close_all", lambda: None)()
 
@@ -618,6 +620,7 @@
                 transfer,
                 (),
                 sorted(objects, key=lambda o: o.get(b'oid')),
+                prefork=prefork,
             )
         else:
             oids = transfer(sorted(objects, key=lambda o: o.get(b'oid')))
--- a/mercurial/worker.py	Tue Oct 18 13:36:33 2022 -0400
+++ b/mercurial/worker.py	Tue Oct 18 13:56:45 2022 -0400
@@ -125,7 +125,14 @@
 
 
 def worker(
-    ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
+    ui,
+    costperarg,
+    func,
+    staticargs,
+    args,
+    hasretval=False,
+    threadsafe=True,
+    prefork=None,
 ):
     """run a function, possibly in parallel in multiple worker
     processes.
@@ -149,6 +156,10 @@
     threadsafe - whether work items are thread safe and can be executed using
     a thread-based worker. Should be disabled for CPU heavy tasks that don't
     release the GIL.
+
+    prefork - a parameterless Callable that is invoked prior to forking the
+    process.  fork() is only used on non-Windows platforms, but is also not
+    called on POSIX platforms if the work amount doesn't warrant a worker.
     """
     enabled = ui.configbool(b'worker', b'enabled')
     if enabled and _platformworker is _posixworker and not ismainthread():
@@ -157,11 +168,13 @@
         enabled = False
 
     if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
-        return _platformworker(ui, func, staticargs, args, hasretval)
+        return _platformworker(
+            ui, func, staticargs, args, hasretval, prefork=prefork
+        )
     return func(*staticargs + (args,))
 
 
-def _posixworker(ui, func, staticargs, args, hasretval):
+def _posixworker(ui, func, staticargs, args, hasretval, prefork=None):
     workers = _numworkers(ui)
     oldhandler = signal.getsignal(signal.SIGINT)
     signal.signal(signal.SIGINT, signal.SIG_IGN)
@@ -207,6 +220,10 @@
     parentpid = os.getpid()
     pipes = []
     retval = {}
+
+    if prefork:
+        prefork()
+
     for pargs in partition(args, min(workers, len(args))):
         # Every worker gets its own pipe to send results on, so we don't have to
         # implement atomic writes larger than PIPE_BUF. Each forked process has
@@ -316,7 +333,7 @@
         return -(os.WTERMSIG(code))
 
 
-def _windowsworker(ui, func, staticargs, args, hasretval):
+def _windowsworker(ui, func, staticargs, args, hasretval, prefork=None):
     class Worker(threading.Thread):
         def __init__(
             self, taskqueue, resultqueue, func, staticargs, *args, **kwargs