57 return min(max(countcpus(), 4), 32) |
57 return min(max(countcpus(), 4), 32) |
58 |
58 |
59 |
59 |
60 def ismainthread(): |
60 def ismainthread(): |
61 return threading.current_thread() == threading.main_thread() |
61 return threading.current_thread() == threading.main_thread() |
62 |
|
63 |
|
64 class _blockingreader: |
|
65 """Wrap unbuffered stream such that pickle.load() works with it. |
|
66 |
|
67 pickle.load() expects that calls to read() and readinto() read as many |
|
68 bytes as requested. On EOF, it is fine to read fewer bytes. In this case, |
|
69 pickle.load() raises an EOFError. |
|
70 """ |
|
71 |
|
72 def __init__(self, wrapped): |
|
73 self._wrapped = wrapped |
|
74 |
|
75 def readline(self): |
|
76 return self._wrapped.readline() |
|
77 |
|
78 def readinto(self, buf): |
|
79 pos = 0 |
|
80 size = len(buf) |
|
81 |
|
82 with memoryview(buf) as view: |
|
83 while pos < size: |
|
84 with view[pos:] as subview: |
|
85 ret = self._wrapped.readinto(subview) |
|
86 if not ret: |
|
87 break |
|
88 pos += ret |
|
89 |
|
90 return pos |
|
91 |
|
92 # issue multiple reads until size is fulfilled (or EOF is encountered) |
|
93 def read(self, size=-1): |
|
94 if size < 0: |
|
95 return self._wrapped.readall() |
|
96 |
|
97 buf = bytearray(size) |
|
98 n_read = self.readinto(buf) |
|
99 del buf[n_read:] |
|
100 return bytes(buf) |
|
101 |
62 |
102 |
63 |
103 if pycompat.isposix or pycompat.iswindows: |
64 if pycompat.isposix or pycompat.iswindows: |
104 _STARTUP_COST = 0.01 |
65 _STARTUP_COST = 0.01 |
105 # The Windows worker is thread based. If tasks are CPU bound, threads |
66 # The Windows worker is thread based. If tasks are CPU bound, threads |
274 os._exit(ret & 255) |
235 os._exit(ret & 255) |
275 pids.add(pid) |
236 pids.add(pid) |
276 selector = selectors.DefaultSelector() |
237 selector = selectors.DefaultSelector() |
277 for rfd, wfd in pipes: |
238 for rfd, wfd in pipes: |
278 os.close(wfd) |
239 os.close(wfd) |
279 # The stream has to be unbuffered. Otherwise, if all data is read from |
240 # Buffering is needed for performance, but it also presents a problem: |
280 # the raw file into the buffer, the selector thinks that the FD is not |
241 # selector doesn't take the buffered data into account, |
281 # ready to read while pickle.load() could read from the buffer. This |
242 # so we have to arrange it so that the buffers are empty when select is called |
282 # would delay the processing of readable items. |
243 # (see [peek_nonblock]) |
283 selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ) |
244 selector.register(os.fdopen(rfd, 'rb', 4096), selectors.EVENT_READ) |
|
245 |
|
246 def peek_nonblock(f): |
|
247 os.set_blocking(f.fileno(), False) |
|
248 res = f.peek() |
|
249 os.set_blocking(f.fileno(), True) |
|
250 return res |
|
251 |
|
252 def load_all(f): |
|
253 # The pytype error likely goes away on a modern version of |
|
254 # pytype having a modern typeshed snapshot. |
|
255 # pytype: disable=wrong-arg-types |
|
256 yield pickle.load(f) |
|
257 while len(peek_nonblock(f)) > 0: |
|
258 yield pickle.load(f) |
|
259 # pytype: enable=wrong-arg-types |
284 |
260 |
285 def cleanup(): |
261 def cleanup(): |
286 signal.signal(signal.SIGINT, oldhandler) |
262 signal.signal(signal.SIGINT, oldhandler) |
287 waitforworkers() |
263 waitforworkers() |
288 signal.signal(signal.SIGCHLD, oldchldhandler) |
264 signal.signal(signal.SIGCHLD, oldchldhandler) |
292 try: |
268 try: |
293 openpipes = len(pipes) |
269 openpipes = len(pipes) |
294 while openpipes > 0: |
270 while openpipes > 0: |
295 for key, events in selector.select(): |
271 for key, events in selector.select(): |
296 try: |
272 try: |
297 # The pytype error likely goes away on a modern version of |
273 for res in load_all(key.fileobj): |
298 # pytype having a modern typeshed snapshot. |
274 if hasretval and res[0]: |
299 # pytype: disable=wrong-arg-types |
275 retval.update(res[1]) |
300 res = pickle.load(_blockingreader(key.fileobj)) |
276 else: |
301 # pytype: enable=wrong-arg-types |
277 yield res |
302 if hasretval and res[0]: |
|
303 retval.update(res[1]) |
|
304 else: |
|
305 yield res |
|
306 except EOFError: |
278 except EOFError: |
307 selector.unregister(key.fileobj) |
279 selector.unregister(key.fileobj) |
308 # pytype: disable=attribute-error |
280 # pytype: disable=attribute-error |
309 key.fileobj.close() |
281 key.fileobj.close() |
310 # pytype: enable=attribute-error |
282 # pytype: enable=attribute-error |