mercurial/worker.py
changeset 38729 9e6afe7fca31
parent 38535 8c38d2948217
child 38730 69ed2cff4277
equal deleted inserted replaced
38728:88be288e8ac1 38729:9e6afe7fca31
    11 import os
    11 import os
    12 import signal
    12 import signal
    13 import sys
    13 import sys
    14 import threading
    14 import threading
    15 import time
    15 import time
       
    16 
       
    17 try:
       
    18     import selectors
       
    19     selectors.BaseSelector
       
    20 except ImportError:
       
    21     from .thirdparty import selectors2 as selectors
    16 
    22 
    17 from .i18n import _
    23 from .i18n import _
    18 from . import (
    24 from . import (
    19     encoding,
    25     encoding,
    20     error,
    26     error,
    87     if enabled and worthwhile(ui, costperarg, len(args)):
    93     if enabled and worthwhile(ui, costperarg, len(args)):
    88         return _platformworker(ui, func, staticargs, args)
    94         return _platformworker(ui, func, staticargs, args)
    89     return func(*staticargs + (args,))
    95     return func(*staticargs + (args,))
    90 
    96 
    91 def _posixworker(ui, func, staticargs, args):
    97 def _posixworker(ui, func, staticargs, args):
    92     rfd, wfd = os.pipe()
       
    93     workers = _numworkers(ui)
    98     workers = _numworkers(ui)
    94     oldhandler = signal.getsignal(signal.SIGINT)
    99     oldhandler = signal.getsignal(signal.SIGINT)
    95     signal.signal(signal.SIGINT, signal.SIG_IGN)
   100     signal.signal(signal.SIGINT, signal.SIG_IGN)
    96     pids, problem = set(), [0]
   101     pids, problem = set(), [0]
    97     def killworkers():
   102     def killworkers():
   136         if problem[0]:
   141         if problem[0]:
   137             killworkers()
   142             killworkers()
   138     oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
   143     oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
   139     ui.flush()
   144     ui.flush()
   140     parentpid = os.getpid()
   145     parentpid = os.getpid()
       
   146     pipes = []
   141     for pargs in partition(args, workers):
   147     for pargs in partition(args, workers):
       
   148         # Every worker gets its own pipe to send results on, so we don't have to
       
   149         # implement atomic writes larger than PIPE_BUF. Each forked process has
       
   150         # its own pipe's descriptors in the local variables, and the parent
       
   151         # process has the full list of pipe descriptors (and it doesn't really
       
   152         # care what order they're in).
       
   153         rfd, wfd = os.pipe()
       
   154         pipes.append((rfd, wfd))
   142         # make sure we use os._exit in all worker code paths. otherwise the
   155         # make sure we use os._exit in all worker code paths. otherwise the
   143         # worker may do some clean-ups which could cause surprises like
   156         # worker may do some clean-ups which could cause surprises like
   144         # deadlock. see sshpeer.cleanup for example.
   157         # deadlock. see sshpeer.cleanup for example.
   145         # override error handling *before* fork. this is necessary because
   158         # override error handling *before* fork. this is necessary because
   146         # exception (signal) may arrive after fork, before "pid =" assignment
   159         # exception (signal) may arrive after fork, before "pid =" assignment
   152             if pid == 0:
   165             if pid == 0:
   153                 signal.signal(signal.SIGINT, oldhandler)
   166                 signal.signal(signal.SIGINT, oldhandler)
   154                 signal.signal(signal.SIGCHLD, oldchldhandler)
   167                 signal.signal(signal.SIGCHLD, oldchldhandler)
   155 
   168 
   156                 def workerfunc():
   169                 def workerfunc():
       
   170                     for r, w in pipes[:-1]:
       
   171                         os.close(r)
       
   172                         os.close(w)
   157                     os.close(rfd)
   173                     os.close(rfd)
   158                     for result in func(*(staticargs + (pargs,))):
   174                     for result in func(*(staticargs + (pargs,))):
   159                         os.write(wfd, util.pickle.dumps(result))
   175                         os.write(wfd, util.pickle.dumps(result))
   160                     return 0
   176                     return 0
   161 
   177 
   173                 except: # never returns, no re-raises
   189                 except: # never returns, no re-raises
   174                     pass
   190                     pass
   175                 finally:
   191                 finally:
   176                     os._exit(ret & 255)
   192                     os._exit(ret & 255)
   177         pids.add(pid)
   193         pids.add(pid)
   178     os.close(wfd)
   194     selector = selectors.DefaultSelector()
   179     fp = os.fdopen(rfd, r'rb', 0)
   195     for rfd, wfd in pipes:
       
   196         os.close(wfd)
       
   197         selector.register(os.fdopen(rfd, r'rb', 0), selectors.EVENT_READ)
   180     def cleanup():
   198     def cleanup():
   181         signal.signal(signal.SIGINT, oldhandler)
   199         signal.signal(signal.SIGINT, oldhandler)
   182         waitforworkers()
   200         waitforworkers()
   183         signal.signal(signal.SIGCHLD, oldchldhandler)
   201         signal.signal(signal.SIGCHLD, oldchldhandler)
   184         status = problem[0]
   202         status = problem[0]
   185         if status:
   203         if status:
   186             if status < 0:
   204             if status < 0:
   187                 os.kill(os.getpid(), -status)
   205                 os.kill(os.getpid(), -status)
   188             sys.exit(status)
   206             sys.exit(status)
   189     try:
   207     try:
   190         while True:
   208         openpipes = len(pipes)
   191             try:
   209         while openpipes > 0:
   192                 yield util.pickle.load(fp)
   210             for key, events in selector.select():
   193             except EOFError:
   211                 try:
   194                 break
   212                     yield util.pickle.load(key.fileobj)
   195             except IOError as e:
   213                 except EOFError:
   196                 if e.errno == errno.EINTR:
   214                     selector.unregister(key.fileobj)
   197                     continue
   215                     key.fileobj.close()
   198                 raise
   216                     openpipes -= 1
       
   217                 except IOError as e:
       
   218                     if e.errno == errno.EINTR:
       
   219                         continue
       
   220                     raise
   199     except: # re-raises
   221     except: # re-raises
   200         killworkers()
   222         killworkers()
   201         cleanup()
   223         cleanup()
   202         raise
   224         raise
   203     cleanup()
   225     cleanup()