Mercurial > hg
changeset 49545:3556f0392808 stable
lfs: avoid closing connections when the worker doesn't fork
Probably not much more than an minor optimization, but could be useful in the
case of `hg verify` where missing blobs are fetched one at a time.
author | Matt Harbison <matt_harbison@yahoo.com> |
---|---|
date | Tue, 18 Oct 2022 13:56:45 -0400 |
parents | abf471862b8e |
children | 5743e19bb8b0 |
files | hgext/lfs/blobstore.py mercurial/worker.py |
diffstat | 2 files changed, 26 insertions(+), 6 deletions(-) [+] |
line wrap: on
line diff
--- a/hgext/lfs/blobstore.py Tue Oct 18 13:36:33 2022 -0400 +++ b/hgext/lfs/blobstore.py Tue Oct 18 13:56:45 2022 -0400 @@ -597,7 +597,9 @@ continue raise - # Until https multiplexing gets sorted out + # Until https multiplexing gets sorted out. It's not clear if + # ConnectionManager.set_ready() is externally synchronized for thread + # safety with Windows workers. if self.ui.configbool(b'experimental', b'lfs.worker-enable'): # The POSIX workers are forks of this process, so before spinning # them up, close all pooled connections. Otherwise, there's no way @@ -608,7 +610,7 @@ # ready connections as in use, and roll that back after the fork? # That would allow the existing pool of connections in this process # to be preserved. - if not pycompat.iswindows: + def prefork(): for h in self.urlopener.handlers: getattr(h, "close_all", lambda: None)() @@ -618,6 +620,7 @@ transfer, (), sorted(objects, key=lambda o: o.get(b'oid')), + prefork=prefork, ) else: oids = transfer(sorted(objects, key=lambda o: o.get(b'oid')))
--- a/mercurial/worker.py Tue Oct 18 13:36:33 2022 -0400 +++ b/mercurial/worker.py Tue Oct 18 13:56:45 2022 -0400 @@ -125,7 +125,14 @@ def worker( - ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True + ui, + costperarg, + func, + staticargs, + args, + hasretval=False, + threadsafe=True, + prefork=None, ): """run a function, possibly in parallel in multiple worker processes. @@ -149,6 +156,10 @@ threadsafe - whether work items are thread safe and can be executed using a thread-based worker. Should be disabled for CPU heavy tasks that don't release the GIL. + + prefork - a parameterless Callable that is invoked prior to forking the + process. fork() is only used on non-Windows platforms, but is also not + called on POSIX platforms if the work amount doesn't warrant a worker. """ enabled = ui.configbool(b'worker', b'enabled') if enabled and _platformworker is _posixworker and not ismainthread(): @@ -157,11 +168,13 @@ enabled = False if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe): - return _platformworker(ui, func, staticargs, args, hasretval) + return _platformworker( + ui, func, staticargs, args, hasretval, prefork=prefork + ) return func(*staticargs + (args,)) -def _posixworker(ui, func, staticargs, args, hasretval): +def _posixworker(ui, func, staticargs, args, hasretval, prefork=None): workers = _numworkers(ui) oldhandler = signal.getsignal(signal.SIGINT) signal.signal(signal.SIGINT, signal.SIG_IGN) @@ -207,6 +220,10 @@ parentpid = os.getpid() pipes = [] retval = {} + + if prefork: + prefork() + for pargs in partition(args, min(workers, len(args))): # Every worker gets its own pipe to send results on, so we don't have to # implement atomic writes larger than PIPE_BUF. Each forked process has @@ -316,7 +333,7 @@ return -(os.WTERMSIG(code)) -def _windowsworker(ui, func, staticargs, args, hasretval): +def _windowsworker(ui, func, staticargs, args, hasretval, prefork=None): class Worker(threading.Thread): def __init__( self, taskqueue, resultqueue, func, staticargs, *args, **kwargs