|
1 # Copyright 2009 Brian Quinlan. All Rights Reserved. |
|
2 # Licensed to PSF under a Contributor Agreement. |
|
3 |
|
4 """Implements ThreadPoolExecutor.""" |
|
5 |
|
6 import atexit |
|
7 from concurrent.futures import _base |
|
8 import itertools |
|
9 import Queue as queue |
|
10 import threading |
|
11 import weakref |
|
12 import sys |
|
13 |
|
try:
    from multiprocessing import cpu_count
except ImportError:
    # some platforms don't have multiprocessing
    def cpu_count():
        # Fallback when the CPU count is unknown; the caller treats a
        # falsy value as a single core (see `(cpu_count() or 1) * 5`).
        return None
|
20 |
|
21 __author__ = 'Brian Quinlan (brian@sweetapp.com)' |
|
22 |
|
23 # Workers are created as daemon threads. This is done to allow the interpreter |
|
24 # to exit when there are still idle threads in a ThreadPoolExecutor's thread |
|
25 # pool (i.e. shutdown() was not called). However, allowing workers to die with |
|
26 # the interpreter has two undesirable properties: |
|
# - The workers would still be running during interpreter shutdown,
|
28 # meaning that they would fail in unpredictable ways. |
|
29 # - The workers could be killed while evaluating a work item, which could |
|
30 # be bad if the callable being evaluated has external side-effects e.g. |
|
31 # writing to a file. |
|
32 # |
|
33 # To work around this problem, an exit handler is installed which tells the |
|
34 # workers to exit when their work queues are empty and then waits until the |
|
35 # threads finish. |
|
36 |
|
37 _threads_queues = weakref.WeakKeyDictionary() |
|
38 _shutdown = False |
|
39 |
|
def _python_exit():
    """atexit hook: ask every worker to drain its queue, then wait for it.

    Flips the module-wide _shutdown flag, wakes each worker with a None
    sentinel, and joins the threads so work items never die mid-execution
    during interpreter shutdown.
    """
    global _shutdown
    _shutdown = True
    if _threads_queues:
        items = list(_threads_queues.items())
    else:
        items = ()
    # First pass: one wake-up sentinel per worker so none stays blocked
    # on an empty queue.
    for thread, work_queue in items:
        work_queue.put(None)
    # Second pass: wait for the workers to finish. The huge timeout keeps
    # join() interruptible on Python 2 (a bare join() would block signals).
    for thread, work_queue in items:
        thread.join(sys.maxint)

atexit.register(_python_exit)
|
50 |
|
51 class _WorkItem(object): |
|
52 def __init__(self, future, fn, args, kwargs): |
|
53 self.future = future |
|
54 self.fn = fn |
|
55 self.args = args |
|
56 self.kwargs = kwargs |
|
57 |
|
58 def run(self): |
|
59 if not self.future.set_running_or_notify_cancel(): |
|
60 return |
|
61 |
|
62 try: |
|
63 result = self.fn(*self.args, **self.kwargs) |
|
64 except: |
|
65 e, tb = sys.exc_info()[1:] |
|
66 self.future.set_exception_info(e, tb) |
|
67 else: |
|
68 self.future.set_result(result) |
|
69 |
|
def _worker(executor_reference, work_queue):
    """Worker-thread main loop: execute queued work items until told to stop.

    executor_reference is a weakref to the owning executor; work_queue is
    its shared queue. A None item is the wake-up sentinel used to re-check
    the exit conditions.
    """
    try:
        while True:
            task = work_queue.get(block=True)
            if task is None:
                owner = executor_reference()
                # Exit if:
                #   - The interpreter is shutting down OR
                #   - The executor that owns the worker has been collected OR
                #   - The executor that owns the worker has been shutdown.
                if _shutdown or owner is None or owner._shutdown:
                    # Re-queue the sentinel so sibling workers also wake up.
                    work_queue.put(None)
                    return
                # Drop the strong reference so the executor stays collectable
                # while this thread blocks on the queue.
                del owner
            else:
                task.run()
                # Delete references to object. See issue16284
                del task
    except:
        # Bare except (Python 2): a worker must never die silently.
        _base.LOGGER.critical('Exception in worker', exc_info=True)
|
91 |
|
92 |
|
class ThreadPoolExecutor(_base.Executor):
    """Executor that runs submitted callables in a pool of daemon threads."""

    # Used to assign unique thread names when thread_name_prefix is not
    # supplied (Python 2: bound `.next` of an itertools counter).
    _counter = itertools.count().next

    def __init__(self, max_workers=None, thread_name_prefix=''):
        """Initializes a new ThreadPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls.
            thread_name_prefix: An optional name prefix to give our threads.
        """
        if max_workers is None:
            # ThreadPoolExecutor is typically used to overlap I/O rather
            # than CPU work, so allow several threads per core.
            max_workers = (cpu_count() or 1) * 5
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        self._max_workers = max_workers
        self._work_queue = queue.Queue()
        self._threads = set()
        self._shutdown = False
        self._shutdown_lock = threading.Lock()
        if thread_name_prefix:
            self._thread_name_prefix = thread_name_prefix
        else:
            # Only consume a counter value when no prefix was supplied.
            self._thread_name_prefix = (
                "ThreadPoolExecutor-%d" % self._counter())

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')

            future = _base.Future()
            work_item = _WorkItem(future, fn, args, kwargs)

            self._work_queue.put(work_item)
            self._adjust_thread_count()
            return future
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):
        # When the executor gets lost, the weakref callback will wake up
        # the worker threads (via a None sentinel) so they can exit.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)
        # TODO(bquinlan): Should avoid creating new threads if there are more
        # idle threads than items in the work queue.
        pool_size = len(self._threads)
        if pool_size >= self._max_workers:
            return
        thread_name = '%s_%d' % (self._thread_name_prefix or self, pool_size)
        worker_thread = threading.Thread(
            name=thread_name,
            target=_worker,
            args=(weakref.ref(self, weakref_cb), self._work_queue))
        # Daemon so idle workers never block interpreter exit; the atexit
        # hook handles orderly shutdown instead.
        worker_thread.daemon = True
        worker_thread.start()
        self._threads.add(worker_thread)
        _threads_queues[worker_thread] = self._work_queue

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown = True
            # A single sentinel suffices: exiting workers re-queue it.
            self._work_queue.put(None)
        if wait:
            # Huge timeout keeps join() interruptible on Python 2.
            for worker_thread in self._threads:
                worker_thread.join(sys.maxint)
    shutdown.__doc__ = _base.Executor.shutdown.__doc__