worker: avoid reading 1 byte at a time from the OS pipe
Apparently `pickle.load` does a lot of small reads, many of them
literally 1-byte, so it benefits greatly from buffering.
This change enables the buffering, at the cost of more complicated
interaction with the `selector` API.
On one repository with ~400k files this reduces the time by about 30s,
from ~60s to ~30s. The difference is so large because the actual updating
work is parallelized, while these small reads are bottlenecking the
central hg process.
--- a/mercurial/worker.py Tue Jan 10 12:55:49 2023 -0500
+++ b/mercurial/worker.py Fri Jan 06 15:17:14 2023 +0000
@@ -61,45 +61,6 @@
return threading.current_thread() == threading.main_thread()
-class _blockingreader:
- """Wrap unbuffered stream such that pickle.load() works with it.
-
- pickle.load() expects that calls to read() and readinto() read as many
- bytes as requested. On EOF, it is fine to read fewer bytes. In this case,
- pickle.load() raises an EOFError.
- """
-
- def __init__(self, wrapped):
- self._wrapped = wrapped
-
- def readline(self):
- return self._wrapped.readline()
-
- def readinto(self, buf):
- pos = 0
- size = len(buf)
-
- with memoryview(buf) as view:
- while pos < size:
- with view[pos:] as subview:
- ret = self._wrapped.readinto(subview)
- if not ret:
- break
- pos += ret
-
- return pos
-
- # issue multiple reads until size is fulfilled (or EOF is encountered)
- def read(self, size=-1):
- if size < 0:
- return self._wrapped.readall()
-
- buf = bytearray(size)
- n_read = self.readinto(buf)
- del buf[n_read:]
- return bytes(buf)
-
-
if pycompat.isposix or pycompat.iswindows:
_STARTUP_COST = 0.01
# The Windows worker is thread based. If tasks are CPU bound, threads
@@ -276,11 +237,26 @@
selector = selectors.DefaultSelector()
for rfd, wfd in pipes:
os.close(wfd)
- # The stream has to be unbuffered. Otherwise, if all data is read from
- # the raw file into the buffer, the selector thinks that the FD is not
- # ready to read while pickle.load() could read from the buffer. This
- # would delay the processing of readable items.
- selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)
+ # Buffering is needed for performance, but it also presents a problem:
+ # selector doesn't take the buffered data into account,
+ # so we have to arrange it so that the buffers are empty when select is called
+ # (see [peek_nonblock])
+ selector.register(os.fdopen(rfd, 'rb', 4096), selectors.EVENT_READ)
+
+ def peek_nonblock(f):
+ os.set_blocking(f.fileno(), False)
+ res = f.peek()
+ os.set_blocking(f.fileno(), True)
+ return res
+
+ def load_all(f):
+ # The pytype error likely goes away on a modern version of
+ # pytype having a modern typeshed snapshot.
+ # pytype: disable=wrong-arg-types
+ yield pickle.load(f)
+ while len(peek_nonblock(f)) > 0:
+ yield pickle.load(f)
+ # pytype: enable=wrong-arg-types
def cleanup():
signal.signal(signal.SIGINT, oldhandler)
@@ -294,15 +270,11 @@
while openpipes > 0:
for key, events in selector.select():
try:
- # The pytype error likely goes away on a modern version of
- # pytype having a modern typeshed snapshot.
- # pytype: disable=wrong-arg-types
- res = pickle.load(_blockingreader(key.fileobj))
- # pytype: enable=wrong-arg-types
- if hasretval and res[0]:
- retval.update(res[1])
- else:
- yield res
+ for res in load_all(key.fileobj):
+ if hasretval and res[0]:
+ retval.update(res[1])
+ else:
+ yield res
except EOFError:
selector.unregister(key.fileobj)
# pytype: disable=attribute-error