mercurial/worker.py
changeset 43076 2372284d9457
parent 42529 d29db0a0c4eb
child 43077 687b865b95ad
equal deleted inserted replaced
43075:57875cf423c9 43076:2372284d9457
    14 import threading
    14 import threading
    15 import time
    15 import time
    16 
    16 
    17 try:
    17 try:
    18     import selectors
    18     import selectors
       
    19 
    19     selectors.BaseSelector
    20     selectors.BaseSelector
    20 except ImportError:
    21 except ImportError:
    21     from .thirdparty import selectors2 as selectors
    22     from .thirdparty import selectors2 as selectors
    22 
    23 
    23 from .i18n import _
    24 from .i18n import _
    27     pycompat,
    28     pycompat,
    28     scmutil,
    29     scmutil,
    29     util,
    30     util,
    30 )
    31 )
    31 
    32 
       
    33 
    32 def countcpus():
    34 def countcpus():
    33     '''try to count the number of CPUs on the system'''
    35     '''try to count the number of CPUs on the system'''
    34 
    36 
    35     # posix
    37     # posix
    36     try:
    38     try:
    47             return n
    49             return n
    48     except (KeyError, ValueError):
    50     except (KeyError, ValueError):
    49         pass
    51         pass
    50 
    52 
    51     return 1
    53     return 1
       
    54 
    52 
    55 
    53 def _numworkers(ui):
    56 def _numworkers(ui):
    54     s = ui.config('worker', 'numcpus')
    57     s = ui.config('worker', 'numcpus')
    55     if s:
    58     if s:
    56         try:
    59         try:
    59                 return n
    62                 return n
    60         except ValueError:
    63         except ValueError:
    61             raise error.Abort(_('number of cpus must be an integer'))
    64             raise error.Abort(_('number of cpus must be an integer'))
    62     return min(max(countcpus(), 4), 32)
    65     return min(max(countcpus(), 4), 32)
    63 
    66 
       
    67 
    64 if pycompat.isposix or pycompat.iswindows:
    68 if pycompat.isposix or pycompat.iswindows:
    65     _STARTUP_COST = 0.01
    69     _STARTUP_COST = 0.01
    66     # The Windows worker is thread based. If tasks are CPU bound, threads
    70     # The Windows worker is thread based. If tasks are CPU bound, threads
    67     # in the presence of the GIL result in excessive context switching and
    71     # in the presence of the GIL result in excessive context switching and
    68     # this overhead can slow down execution.
    72     # this overhead can slow down execution.
    69     _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
    73     _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
    70 else:
    74 else:
    71     _STARTUP_COST = 1e30
    75     _STARTUP_COST = 1e30
    72     _DISALLOW_THREAD_UNSAFE = False
    76     _DISALLOW_THREAD_UNSAFE = False
    73 
    77 
       
    78 
    74 def worthwhile(ui, costperop, nops, threadsafe=True):
    79 def worthwhile(ui, costperop, nops, threadsafe=True):
    75     '''try to determine whether the benefit of multiple processes can
    80     '''try to determine whether the benefit of multiple processes can
    76     outweigh the cost of starting them'''
    81     outweigh the cost of starting them'''
    77 
    82 
    78     if not threadsafe and _DISALLOW_THREAD_UNSAFE:
    83     if not threadsafe and _DISALLOW_THREAD_UNSAFE:
    81     linear = costperop * nops
    86     linear = costperop * nops
    82     workers = _numworkers(ui)
    87     workers = _numworkers(ui)
    83     benefit = linear - (_STARTUP_COST * workers + linear / workers)
    88     benefit = linear - (_STARTUP_COST * workers + linear / workers)
    84     return benefit >= 0.15
    89     return benefit >= 0.15
    85 
    90 
    86 def worker(ui, costperarg, func, staticargs, args, hasretval=False,
    91 
    87            threadsafe=True):
    92 def worker(
       
    93     ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
       
    94 ):
    88     '''run a function, possibly in parallel in multiple worker
    95     '''run a function, possibly in parallel in multiple worker
    89     processes.
    96     processes.
    90 
    97 
    91     returns a progress iterator
    98     returns a progress iterator
    92 
    99 
   111     enabled = ui.configbool('worker', 'enabled')
   118     enabled = ui.configbool('worker', 'enabled')
   112     if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
   119     if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
   113         return _platformworker(ui, func, staticargs, args, hasretval)
   120         return _platformworker(ui, func, staticargs, args, hasretval)
   114     return func(*staticargs + (args,))
   121     return func(*staticargs + (args,))
   115 
   122 
       
   123 
   116 def _posixworker(ui, func, staticargs, args, hasretval):
   124 def _posixworker(ui, func, staticargs, args, hasretval):
   117     workers = _numworkers(ui)
   125     workers = _numworkers(ui)
   118     oldhandler = signal.getsignal(signal.SIGINT)
   126     oldhandler = signal.getsignal(signal.SIGINT)
   119     signal.signal(signal.SIGINT, signal.SIG_IGN)
   127     signal.signal(signal.SIGINT, signal.SIG_IGN)
   120     pids, problem = set(), [0]
   128     pids, problem = set(), [0]
       
   129 
   121     def killworkers():
   130     def killworkers():
   122         # unregister SIGCHLD handler as all children will be killed. This
   131         # unregister SIGCHLD handler as all children will be killed. This
   123         # function shouldn't be interrupted by another SIGCHLD; otherwise pids
   132         # function shouldn't be interrupted by another SIGCHLD; otherwise pids
   124         # could be updated while iterating, which would cause inconsistency.
   133         # could be updated while iterating, which would cause inconsistency.
   125         signal.signal(signal.SIGCHLD, oldchldhandler)
   134         signal.signal(signal.SIGCHLD, oldchldhandler)
   128             try:
   137             try:
   129                 os.kill(p, signal.SIGTERM)
   138                 os.kill(p, signal.SIGTERM)
   130             except OSError as err:
   139             except OSError as err:
   131                 if err.errno != errno.ESRCH:
   140                 if err.errno != errno.ESRCH:
   132                     raise
   141                     raise
       
   142 
   133     def waitforworkers(blocking=True):
   143     def waitforworkers(blocking=True):
   134         for pid in pids.copy():
   144         for pid in pids.copy():
   135             p = st = 0
   145             p = st = 0
   136             while True:
   146             while True:
   137                 try:
   147                 try:
   153                 continue
   163                 continue
   154             pids.discard(p)
   164             pids.discard(p)
   155             st = _exitstatus(st)
   165             st = _exitstatus(st)
   156             if st and not problem[0]:
   166             if st and not problem[0]:
   157                 problem[0] = st
   167                 problem[0] = st
       
   168 
   158     def sigchldhandler(signum, frame):
   169     def sigchldhandler(signum, frame):
   159         waitforworkers(blocking=False)
   170         waitforworkers(blocking=False)
   160         if problem[0]:
   171         if problem[0]:
   161             killworkers()
   172             killworkers()
       
   173 
   162     oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
   174     oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
   163     ui.flush()
   175     ui.flush()
   164     parentpid = os.getpid()
   176     parentpid = os.getpid()
   165     pipes = []
   177     pipes = []
   166     retval = {}
   178     retval = {}
   194                     for result in func(*(staticargs + (pargs,))):
   206                     for result in func(*(staticargs + (pargs,))):
   195                         os.write(wfd, util.pickle.dumps(result))
   207                         os.write(wfd, util.pickle.dumps(result))
   196                     return 0
   208                     return 0
   197 
   209 
   198                 ret = scmutil.callcatch(ui, workerfunc)
   210                 ret = scmutil.callcatch(ui, workerfunc)
   199         except: # parent re-raises, child never returns
   211         except:  # parent re-raises, child never returns
   200             if os.getpid() == parentpid:
   212             if os.getpid() == parentpid:
   201                 raise
   213                 raise
   202             exctype = sys.exc_info()[0]
   214             exctype = sys.exc_info()[0]
   203             force = not issubclass(exctype, KeyboardInterrupt)
   215             force = not issubclass(exctype, KeyboardInterrupt)
   204             ui.traceback(force=force)
   216             ui.traceback(force=force)
   205         finally:
   217         finally:
   206             if os.getpid() != parentpid:
   218             if os.getpid() != parentpid:
   207                 try:
   219                 try:
   208                     ui.flush()
   220                     ui.flush()
   209                 except: # never returns, no re-raises
   221                 except:  # never returns, no re-raises
   210                     pass
   222                     pass
   211                 finally:
   223                 finally:
   212                     os._exit(ret & 255)
   224                     os._exit(ret & 255)
   213         pids.add(pid)
   225         pids.add(pid)
   214     selector = selectors.DefaultSelector()
   226     selector = selectors.DefaultSelector()
   215     for rfd, wfd in pipes:
   227     for rfd, wfd in pipes:
   216         os.close(wfd)
   228         os.close(wfd)
   217         selector.register(os.fdopen(rfd, r'rb', 0), selectors.EVENT_READ)
   229         selector.register(os.fdopen(rfd, r'rb', 0), selectors.EVENT_READ)
       
   230 
   218     def cleanup():
   231     def cleanup():
   219         signal.signal(signal.SIGINT, oldhandler)
   232         signal.signal(signal.SIGINT, oldhandler)
   220         waitforworkers()
   233         waitforworkers()
   221         signal.signal(signal.SIGCHLD, oldchldhandler)
   234         signal.signal(signal.SIGCHLD, oldchldhandler)
   222         selector.close()
   235         selector.close()
   223         return problem[0]
   236         return problem[0]
       
   237 
   224     try:
   238     try:
   225         openpipes = len(pipes)
   239         openpipes = len(pipes)
   226         while openpipes > 0:
   240         while openpipes > 0:
   227             for key, events in selector.select():
   241             for key, events in selector.select():
   228                 try:
   242                 try:
   237                     openpipes -= 1
   251                     openpipes -= 1
   238                 except IOError as e:
   252                 except IOError as e:
   239                     if e.errno == errno.EINTR:
   253                     if e.errno == errno.EINTR:
   240                         continue
   254                         continue
   241                     raise
   255                     raise
   242     except: # re-raises
   256     except:  # re-raises
   243         killworkers()
   257         killworkers()
   244         cleanup()
   258         cleanup()
   245         raise
   259         raise
   246     status = cleanup()
   260     status = cleanup()
   247     if status:
   261     if status:
   249             os.kill(os.getpid(), -status)
   263             os.kill(os.getpid(), -status)
   250         sys.exit(status)
   264         sys.exit(status)
   251     if hasretval:
   265     if hasretval:
   252         yield True, retval
   266         yield True, retval
   253 
   267 
       
   268 
   254 def _posixexitstatus(code):
   269 def _posixexitstatus(code):
   255     '''convert a posix exit status into the same form returned by
   270     '''convert a posix exit status into the same form returned by
   256     os.spawnv
   271     os.spawnv
   257 
   272 
   258     returns None if the process was stopped instead of exiting'''
   273     returns None if the process was stopped instead of exiting'''
   259     if os.WIFEXITED(code):
   274     if os.WIFEXITED(code):
   260         return os.WEXITSTATUS(code)
   275         return os.WEXITSTATUS(code)
   261     elif os.WIFSIGNALED(code):
   276     elif os.WIFSIGNALED(code):
   262         return -os.WTERMSIG(code)
   277         return -(os.WTERMSIG(code))
       
   278 
   263 
   279 
   264 def _windowsworker(ui, func, staticargs, args, hasretval):
   280 def _windowsworker(ui, func, staticargs, args, hasretval):
   265     class Worker(threading.Thread):
   281     class Worker(threading.Thread):
   266         def __init__(self, taskqueue, resultqueue, func, staticargs, *args,
   282         def __init__(
   267                      **kwargs):
   283             self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
       
   284         ):
   268             threading.Thread.__init__(self, *args, **kwargs)
   285             threading.Thread.__init__(self, *args, **kwargs)
   269             self._taskqueue = taskqueue
   286             self._taskqueue = taskqueue
   270             self._resultqueue = resultqueue
   287             self._resultqueue = resultqueue
   271             self._func = func
   288             self._func = func
   272             self._staticargs = staticargs
   289             self._staticargs = staticargs
   296                 # it as if the func was running without workers.
   313                 # it as if the func was running without workers.
   297                 self.exception = e
   314                 self.exception = e
   298                 raise
   315                 raise
   299 
   316 
   300     threads = []
   317     threads = []
       
   318 
   301     def trykillworkers():
   319     def trykillworkers():
   302         # Allow up to 1 second to clean worker threads nicely
   320         # Allow up to 1 second to clean worker threads nicely
   303         cleanupend = time.time() + 1
   321         cleanupend = time.time() + 1
   304         for t in threads:
   322         for t in threads:
   305             t.interrupt()
   323             t.interrupt()
   309             if t.is_alive():
   327             if t.is_alive():
   310                 # pass over the workers joining failure. it is more
   328                 # pass over the workers joining failure. it is more
   311                 # important to surface the inital exception than the
   329                 # important to surface the inital exception than the
   312                 # fact that one of workers may be processing a large
   330                 # fact that one of workers may be processing a large
   313                 # task and does not get to handle the interruption.
   331                 # task and does not get to handle the interruption.
   314                 ui.warn(_("failed to kill worker threads while "
   332                 ui.warn(
   315                           "handling an exception\n"))
   333                     _(
       
   334                         "failed to kill worker threads while "
       
   335                         "handling an exception\n"
       
   336                     )
       
   337                 )
   316                 return
   338                 return
   317 
   339 
   318     workers = _numworkers(ui)
   340     workers = _numworkers(ui)
   319     resultqueue = pycompat.queue.Queue()
   341     resultqueue = pycompat.queue.Queue()
   320     taskqueue = pycompat.queue.Queue()
   342     taskqueue = pycompat.queue.Queue()
   339             finishedthreads = [_t for _t in threads if not _t.is_alive()]
   361             finishedthreads = [_t for _t in threads if not _t.is_alive()]
   340             for t in finishedthreads:
   362             for t in finishedthreads:
   341                 if t.exception is not None:
   363                 if t.exception is not None:
   342                     raise t.exception
   364                     raise t.exception
   343                 threads.remove(t)
   365                 threads.remove(t)
   344     except (Exception, KeyboardInterrupt): # re-raises
   366     except (Exception, KeyboardInterrupt):  # re-raises
   345         trykillworkers()
   367         trykillworkers()
   346         raise
   368         raise
   347     while not resultqueue.empty():
   369     while not resultqueue.empty():
   348         res = resultqueue.get()
   370         res = resultqueue.get()
   349         if hasretval and res[0]:
   371         if hasretval and res[0]:
   351         else:
   373         else:
   352             yield res
   374             yield res
   353     if hasretval:
   375     if hasretval:
   354         yield True, retval
   376         yield True, retval
   355 
   377 
       
   378 
   356 if pycompat.iswindows:
   379 if pycompat.iswindows:
   357     _platformworker = _windowsworker
   380     _platformworker = _windowsworker
   358 else:
   381 else:
   359     _platformworker = _posixworker
   382     _platformworker = _posixworker
   360     _exitstatus = _posixexitstatus
   383     _exitstatus = _posixexitstatus
       
   384 
   361 
   385 
   362 def partition(lst, nslices):
   386 def partition(lst, nslices):
   363     '''partition a list into N slices of roughly equal size
   387     '''partition a list into N slices of roughly equal size
   364 
   388 
   365     The current strategy takes every Nth element from the input. If
   389     The current strategy takes every Nth element from the input. If