mercurial/thirdparty/concurrent/futures/thread.py
changeset 37623 eb687c28a915
child 37626 0a9c0d3480b2
equal deleted inserted replaced
37622:bfdd20d22a86 37623:eb687c28a915
       
     1 # Copyright 2009 Brian Quinlan. All Rights Reserved.
       
     2 # Licensed to PSF under a Contributor Agreement.
       
     3 
       
     4 """Implements ThreadPoolExecutor."""
       
     5 
       
     6 import atexit
       
     7 from concurrent.futures import _base
       
     8 import itertools
       
     9 import Queue as queue
       
    10 import threading
       
    11 import weakref
       
    12 import sys
       
    13 
       
    14 try:
       
    15     from multiprocessing import cpu_count
       
    16 except ImportError:
       
    17     # some platforms don't have multiprocessing
       
    18     def cpu_count():
       
    19         return None
       
    20 
       
    21 __author__ = 'Brian Quinlan (brian@sweetapp.com)'
       
    22 
       
    23 # Workers are created as daemon threads. This is done to allow the interpreter
       
    24 # to exit when there are still idle threads in a ThreadPoolExecutor's thread
       
    25 # pool (i.e. shutdown() was not called). However, allowing workers to die with
       
    26 # the interpreter has two undesirable properties:
       
    27 #   - The workers would still be running during interpretor shutdown,
       
    28 #     meaning that they would fail in unpredictable ways.
       
    29 #   - The workers could be killed while evaluating a work item, which could
       
    30 #     be bad if the callable being evaluated has external side-effects e.g.
       
    31 #     writing to a file.
       
    32 #
       
    33 # To work around this problem, an exit handler is installed which tells the
       
    34 # workers to exit when their work queues are empty and then waits until the
       
    35 # threads finish.
       
    36 
       
    37 _threads_queues = weakref.WeakKeyDictionary()
       
    38 _shutdown = False
       
    39 
       
    40 def _python_exit():
       
    41     global _shutdown
       
    42     _shutdown = True
       
    43     items = list(_threads_queues.items()) if _threads_queues else ()
       
    44     for t, q in items:
       
    45         q.put(None)
       
    46     for t, q in items:
       
    47         t.join(sys.maxint)
       
    48 
       
    49 atexit.register(_python_exit)
       
    50 
       
    51 class _WorkItem(object):
       
    52     def __init__(self, future, fn, args, kwargs):
       
    53         self.future = future
       
    54         self.fn = fn
       
    55         self.args = args
       
    56         self.kwargs = kwargs
       
    57 
       
    58     def run(self):
       
    59         if not self.future.set_running_or_notify_cancel():
       
    60             return
       
    61 
       
    62         try:
       
    63             result = self.fn(*self.args, **self.kwargs)
       
    64         except:
       
    65             e, tb = sys.exc_info()[1:]
       
    66             self.future.set_exception_info(e, tb)
       
    67         else:
       
    68             self.future.set_result(result)
       
    69 
       
    70 def _worker(executor_reference, work_queue):
       
    71     try:
       
    72         while True:
       
    73             work_item = work_queue.get(block=True)
       
    74             if work_item is not None:
       
    75                 work_item.run()
       
    76                 # Delete references to object. See issue16284
       
    77                 del work_item
       
    78                 continue
       
    79             executor = executor_reference()
       
    80             # Exit if:
       
    81             #   - The interpreter is shutting down OR
       
    82             #   - The executor that owns the worker has been collected OR
       
    83             #   - The executor that owns the worker has been shutdown.
       
    84             if _shutdown or executor is None or executor._shutdown:
       
    85                 # Notice other workers
       
    86                 work_queue.put(None)
       
    87                 return
       
    88             del executor
       
    89     except:
       
    90         _base.LOGGER.critical('Exception in worker', exc_info=True)
       
    91 
       
    92 
       
    93 class ThreadPoolExecutor(_base.Executor):
       
    94 
       
    95     # Used to assign unique thread names when thread_name_prefix is not supplied.
       
    96     _counter = itertools.count().next
       
    97 
       
    98     def __init__(self, max_workers=None, thread_name_prefix=''):
       
    99         """Initializes a new ThreadPoolExecutor instance.
       
   100 
       
   101         Args:
       
   102             max_workers: The maximum number of threads that can be used to
       
   103                 execute the given calls.
       
   104             thread_name_prefix: An optional name prefix to give our threads.
       
   105         """
       
   106         if max_workers is None:
       
   107             # Use this number because ThreadPoolExecutor is often
       
   108             # used to overlap I/O instead of CPU work.
       
   109             max_workers = (cpu_count() or 1) * 5
       
   110         if max_workers <= 0:
       
   111             raise ValueError("max_workers must be greater than 0")
       
   112 
       
   113         self._max_workers = max_workers
       
   114         self._work_queue = queue.Queue()
       
   115         self._threads = set()
       
   116         self._shutdown = False
       
   117         self._shutdown_lock = threading.Lock()
       
   118         self._thread_name_prefix = (thread_name_prefix or
       
   119                                     ("ThreadPoolExecutor-%d" % self._counter()))
       
   120 
       
   121     def submit(self, fn, *args, **kwargs):
       
   122         with self._shutdown_lock:
       
   123             if self._shutdown:
       
   124                 raise RuntimeError('cannot schedule new futures after shutdown')
       
   125 
       
   126             f = _base.Future()
       
   127             w = _WorkItem(f, fn, args, kwargs)
       
   128 
       
   129             self._work_queue.put(w)
       
   130             self._adjust_thread_count()
       
   131             return f
       
   132     submit.__doc__ = _base.Executor.submit.__doc__
       
   133 
       
   134     def _adjust_thread_count(self):
       
   135         # When the executor gets lost, the weakref callback will wake up
       
   136         # the worker threads.
       
   137         def weakref_cb(_, q=self._work_queue):
       
   138             q.put(None)
       
   139         # TODO(bquinlan): Should avoid creating new threads if there are more
       
   140         # idle threads than items in the work queue.
       
   141         num_threads = len(self._threads)
       
   142         if num_threads < self._max_workers:
       
   143             thread_name = '%s_%d' % (self._thread_name_prefix or self,
       
   144                                      num_threads)
       
   145             t = threading.Thread(name=thread_name, target=_worker,
       
   146                                  args=(weakref.ref(self, weakref_cb),
       
   147                                        self._work_queue))
       
   148             t.daemon = True
       
   149             t.start()
       
   150             self._threads.add(t)
       
   151             _threads_queues[t] = self._work_queue
       
   152 
       
   153     def shutdown(self, wait=True):
       
   154         with self._shutdown_lock:
       
   155             self._shutdown = True
       
   156             self._work_queue.put(None)
       
   157         if wait:
       
   158             for t in self._threads:
       
   159                 t.join(sys.maxint)
       
   160     shutdown.__doc__ = _base.Executor.shutdown.__doc__