comparison mercurial/worker.py @ 49545:3556f0392808 stable

lfs: avoid closing connections when the worker doesn't fork Probably not much more than an minor optimization, but could be useful in the case of `hg verify` where missing blobs are fetched one at a time.
author Matt Harbison <matt_harbison@yahoo.com>
date Tue, 18 Oct 2022 13:56:45 -0400
parents d54b213c4380
children 3eef8baf6b92
comparison
equal deleted inserted replaced
49544:abf471862b8e 49545:3556f0392808
123 benefit = linear - (_STARTUP_COST * workers + linear / workers) 123 benefit = linear - (_STARTUP_COST * workers + linear / workers)
124 return benefit >= 0.15 124 return benefit >= 0.15
125 125
126 126
127 def worker( 127 def worker(
128 ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True 128 ui,
129 costperarg,
130 func,
131 staticargs,
132 args,
133 hasretval=False,
134 threadsafe=True,
135 prefork=None,
129 ): 136 ):
130 """run a function, possibly in parallel in multiple worker 137 """run a function, possibly in parallel in multiple worker
131 processes. 138 processes.
132 139
133 returns a progress iterator 140 returns a progress iterator
147 overlapping keys are a bad idea. 154 overlapping keys are a bad idea.
148 155
149 threadsafe - whether work items are thread safe and can be executed using 156 threadsafe - whether work items are thread safe and can be executed using
150 a thread-based worker. Should be disabled for CPU heavy tasks that don't 157 a thread-based worker. Should be disabled for CPU heavy tasks that don't
151 release the GIL. 158 release the GIL.
159
160 prefork - a parameterless Callable that is invoked prior to forking the
161 process. fork() is only used on non-Windows platforms, but is also not
162 called on POSIX platforms if the work amount doesn't warrant a worker.
152 """ 163 """
153 enabled = ui.configbool(b'worker', b'enabled') 164 enabled = ui.configbool(b'worker', b'enabled')
154 if enabled and _platformworker is _posixworker and not ismainthread(): 165 if enabled and _platformworker is _posixworker and not ismainthread():
155 # The POSIX worker has to install a handler for SIGCHLD. 166 # The POSIX worker has to install a handler for SIGCHLD.
156 # Python up to 3.9 only allows this in the main thread. 167 # Python up to 3.9 only allows this in the main thread.
157 enabled = False 168 enabled = False
158 169
159 if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe): 170 if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
160 return _platformworker(ui, func, staticargs, args, hasretval) 171 return _platformworker(
172 ui, func, staticargs, args, hasretval, prefork=prefork
173 )
161 return func(*staticargs + (args,)) 174 return func(*staticargs + (args,))
162 175
163 176
164 def _posixworker(ui, func, staticargs, args, hasretval): 177 def _posixworker(ui, func, staticargs, args, hasretval, prefork=None):
165 workers = _numworkers(ui) 178 workers = _numworkers(ui)
166 oldhandler = signal.getsignal(signal.SIGINT) 179 oldhandler = signal.getsignal(signal.SIGINT)
167 signal.signal(signal.SIGINT, signal.SIG_IGN) 180 signal.signal(signal.SIGINT, signal.SIG_IGN)
168 pids, problem = set(), [0] 181 pids, problem = set(), [0]
169 182
205 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler) 218 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
206 ui.flush() 219 ui.flush()
207 parentpid = os.getpid() 220 parentpid = os.getpid()
208 pipes = [] 221 pipes = []
209 retval = {} 222 retval = {}
223
224 if prefork:
225 prefork()
226
210 for pargs in partition(args, min(workers, len(args))): 227 for pargs in partition(args, min(workers, len(args))):
211 # Every worker gets its own pipe to send results on, so we don't have to 228 # Every worker gets its own pipe to send results on, so we don't have to
212 # implement atomic writes larger than PIPE_BUF. Each forked process has 229 # implement atomic writes larger than PIPE_BUF. Each forked process has
213 # its own pipe's descriptors in the local variables, and the parent 230 # its own pipe's descriptors in the local variables, and the parent
214 # process has the full list of pipe descriptors (and it doesn't really 231 # process has the full list of pipe descriptors (and it doesn't really
314 return os.WEXITSTATUS(code) 331 return os.WEXITSTATUS(code)
315 elif os.WIFSIGNALED(code): 332 elif os.WIFSIGNALED(code):
316 return -(os.WTERMSIG(code)) 333 return -(os.WTERMSIG(code))
317 334
318 335
319 def _windowsworker(ui, func, staticargs, args, hasretval): 336 def _windowsworker(ui, func, staticargs, args, hasretval, prefork=None):
320 class Worker(threading.Thread): 337 class Worker(threading.Thread):
321 def __init__( 338 def __init__(
322 self, taskqueue, resultqueue, func, staticargs, *args, **kwargs 339 self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
323 ): 340 ):
324 threading.Thread.__init__(self, *args, **kwargs) 341 threading.Thread.__init__(self, *args, **kwargs)