sshpeer: introduce a "doublepipe" class
This class is responsible for ensuring we still process the server output
streamed through the ssh's 'stderr' pipe during the initial wait for other
protocol streams.
It currently only works on posix system because of its use of 'select.select'.
--- a/mercurial/sshpeer.py Wed May 20 18:00:05 2015 -0500
+++ b/mercurial/sshpeer.py Fri May 22 10:48:11 2015 -0500
@@ -36,6 +36,71 @@
for l in s.splitlines():
ui.status(_("remote: "), l, '\n')
+class doublepipe(object):
+ """Operate a side-channel pipe in addition of a main one
+
+ The side-channel pipe contains server output to be forwarded to the user
+ input. The double pipe will behave as the "main" pipe, but will ensure the
+ content of the "side" pipe is properly processed while we wait for blocking
+ call on the "main" pipe.
+
+ If large amounts of data are read from "main", the forward will cease after
+ the first bytes start to appear. This simplifies the implementation
+ without affecting actual output of sshpeer too much as we rarely issue
+ large read for data not yet emitted by the server.
+
+ The main pipe is expected to be a 'bufferedinputpipe' from the util module
+ that handle all the os specific bites. This class lives in this module
+ because it focus on behavior specifig to the ssh protocol."""
+
+ def __init__(self, ui, main, side):
+ self._ui = ui
+ self._main = main
+ self._side = side
+
+ def _wait(self):
+ """wait until some data are available on main or side
+
+ return a pair of boolean (ismainready, issideready)
+
+ (This will only wait for data if the setup is supported by `util.poll`)
+ """
+ if self._main.hasbuffer:
+ return (True, True) # main has data, assume side is worth poking at.
+ fds = [self._main.fileno(), self._side.fileno()]
+ try:
+ act = util.poll(fds)
+ except NotImplementedError:
+ # non supported yet case, assume all have data.
+ act = fds
+ return (self._main.fileno() in act, self._side.fileno() in act)
+
+ def read(self, size):
+ return self._call('read', size)
+
+ def readline(self):
+ return self._call('readline')
+
+ def _call(self, methname, size=None):
+ """call <methname> on "main", forward output of "side" while blocking
+ """
+ if size == 0 or self._main.closed:
+ _forwardoutput(self._ui, self._side)
+ return ''
+ while True:
+ mainready, sideready = self._wait()
+ if sideready:
+ _forwardoutput(self._ui, self._side)
+ if mainready:
+ meth = getattr(self._main, methname)
+ if size is None:
+ return meth()
+ else:
+ return meth(size)
+
+ def close(self):
+ return self._main.close()
+
class sshpeer(wireproto.wirepeer):
def __init__(self, ui, path, create=False):
self._url = path