changeset 25406:be930f16a52a

util: introduce a bufferedinputpipe utility To restore real time server output through ssh, we need to using polling feature (like select) on the pipes used to communicate with the ssh client. However we cannot use select alongside python level buffering of these pipe (because we need to know if the buffer is non-empty before calling select). However, unbuffered performance are terrible, presumably because the 'readline' call is issuing 'read(1)' call until it find a '\n'. To work around that we introduces our own overlay that do buffering by hand, exposing the state of the buffer to the outside world. The usage of polling IO will be introduced later in the 'sshpeer' module. All its logic will be very specific to the way mercurial communicate over ssh and does not belong to the generic 'util' module.
author Pierre-Yves David <pierre-yves.david@fb.com>
date Sat, 30 May 2015 23:55:24 -0700
parents 220a220ed088
children e461230cc95b
files mercurial/util.py
diffstat 1 files changed, 97 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- a/mercurial/util.py	Wed May 27 22:11:37 2015 -0700
+++ b/mercurial/util.py	Sat May 30 23:55:24 2015 -0700
@@ -232,6 +232,103 @@
 import subprocess
 closefds = os.name == 'posix'
 
+_chunksize = 4096
+
+class bufferedinputpipe(object):
+    """a manually buffered input pipe
+
+    Python will not let us use buffered IO and lazy reading with 'polling' at
+    the same time. We cannot probe the buffer state and select will not detect
+    that data are ready to read if they are already buffered.
+
+    This class let us work around that by implementing its own buffering
+    (allowing efficient readline) while offering a way to know if the buffer is
+    empty from the output (allowing collaboration of the buffer with polling).
+
+    This class lives in the 'util' module because it makes use of the 'os'
+    module from the python stdlib.
+    """
+
+    def __init__(self, input):
+        self._input = input
+        self._buffer = []
+        self._eof = False
+
+    @property
+    def hasbuffer(self):
+        """True is any data is currently buffered
+
+        This will be used externally a pre-step for polling IO. If there is
+        already data then no polling should be set in place."""
+        return bool(self._buffer)
+
+    @property
+    def closed(self):
+        return self._input.closed
+
+    def fileno(self):
+        return self._input.fileno()
+
+    def close(self):
+        return self._input.close()
+
+    def read(self, size):
+        while (not self._eof) and (self._lenbuf < size):
+            self._fillbuffer()
+        return self._frombuffer(size)
+
+    def readline(self, *args, **kwargs):
+        if 1 < len(self._buffer):
+            # this should not happen because both read and readline end with a
+            # _frombuffer call that collapse it.
+            self._buffer = [''.join(self._buffer)]
+        lfi = -1
+        if self._buffer:
+            lfi = self._buffer[-1].find('\n')
+        while (not self._eof) and lfi < 0:
+            self._fillbuffer()
+            if self._buffer:
+                lfi = self._buffer[-1].find('\n')
+        size = lfi + 1
+        if lfi < 0: # end of file
+            size = self._lenbuf
+        elif 1 < len(self._buffer):
+            # we need to take previous chunks into account
+            size += self._lenbuf - len(self._buffer[-1])
+        return self._frombuffer(size)
+
+    @property
+    def _lenbuf(self):
+        """return the current lengh of buffered data"""
+        return sum(len(d) for d in self._buffer)
+
+    def _frombuffer(self, size):
+        """return at most 'size' data from the buffer
+
+        The data are removed from the buffer."""
+        if size == 0 or not self._buffer:
+            return ''
+        buf = self._buffer[0]
+        if 1 < len(self._buffer):
+            buf = ''.join(self._buffer)
+
+        data = buf[:size]
+        buf = buf[len(data):]
+        if buf:
+            self._buffer = [buf]
+        else:
+            self._buffer = []
+        return data
+
+    def _fillbuffer(self):
+        """read data to the buffer"""
+        data = os.read(self._input.fileno(), _chunksize)
+        if not data:
+            self._eof = True
+        else:
+            # inefficient add
+            self._buffer.append(data)
+
 def popen2(cmd, env=None, newlines=False):
     # Setting bufsize to -1 lets the system decide the buffer size.
     # The default for bufsize is 0, meaning unbuffered. This leads to