# HG changeset patch # User Augie Fackler # Date 1456875703 18000 # Node ID 48fd02dac1d4a170b10e1d5216f4ef456e8909e8 # Parent c3eacee01c7ebc8808e3ca596acf26912baf446d wireproto: make iterbatcher behave streamily over http(s) Unfortunately, the ssh and http implementations are slightly different due to differences in their _callstream implementations, which prevents ssh from behaving streamily. We should probably introduce a new batch command that can stream results over ssh at some point in the near future. The streamy behavior of batch over http(s) is an enormous win for remotefilelog over http: in my testing, it's saving about 40% on file fetches with a cold cache against a server on localhost. diff -r c3eacee01c7e -r 48fd02dac1d4 mercurial/sshpeer.py --- a/mercurial/sshpeer.py Tue Mar 01 17:44:41 2016 -0500 +++ b/mercurial/sshpeer.py Tue Mar 01 18:41:43 2016 -0500 @@ -231,6 +231,31 @@ __del__ = cleanup + def _submitbatch(self, req): + cmds = [] + for op, argsdict in req: + args = ','.join('%s=%s' % (wireproto.escapearg(k), + wireproto.escapearg(v)) + for k, v in argsdict.iteritems()) + cmds.append('%s %s' % (op, args)) + rsp = self._callstream("batch", cmds=';'.join(cmds)) + available = self._getamount() + # TODO this response parsing is probably suboptimal for large + # batches with large responses. + toread = min(available, 1024) + work = rsp.read(toread) + available -= toread + chunk = work + while chunk: + while ';' in work: + one, work = work.split(';', 1) + yield wireproto.unescapearg(one) + toread = min(available, 1024) + chunk = rsp.read(toread) + available -= toread + work += chunk + yield wireproto.unescapearg(work) + def _callstream(self, cmd, **args): self.ui.debug("sending %s command\n" % cmd) self.pipeo.write("%s\n" % cmd) @@ -291,7 +316,7 @@ self._send("", flush=True) return self.pipei - def _recv(self): + def _getamount(self): l = self.pipei.readline() if l == '\n': self.readerr() @@ -299,10 +324,12 @@ self._abort(error.OutOfBandError(hint=msg)) self.readerr() try: - l = int(l) + return int(l) except ValueError: self._abort(error.ResponseError(_("unexpected response:"), l)) - return self.pipei.read(l) + + def _recv(self): + return self.pipei.read(self._getamount()) def _send(self, data, flush=False): self.pipeo.write("%d\n" % len(data)) diff -r c3eacee01c7e -r 48fd02dac1d4 mercurial/wireproto.py --- a/mercurial/wireproto.py Tue Mar 01 17:44:41 2016 -0500 +++ b/mercurial/wireproto.py Tue Mar 01 18:41:43 2016 -0500 @@ -7,6 +7,7 @@ from __future__ import absolute_import +import itertools import os import sys import tempfile @@ -119,19 +120,35 @@ super(remoteiterbatcher, self).__init__() self._remote = remote + def __getattr__(self, name): + if not getattr(self._remote, name, False): + raise AttributeError( + 'Attempted to iterbatch non-batchable call to %r' % name) + return super(remoteiterbatcher, self).__getattr__(name) + def submit(self): """Break the batch request into many patch calls and pipeline them. This is mostly valuable over http where request sizes can be limited, but can be used in other places as well. """ - rb = self._remote.batch() - rb.calls = self.calls - rb.submit() + req, rsp = [], [] + for name, args, opts, resref in self.calls: + mtd = getattr(self._remote, name) + batchable = mtd.batchable(mtd.im_self, *args, **opts) + encargsorres, encresref = batchable.next() + assert encresref + req.append((name, encargsorres)) + rsp.append((batchable, encresref)) + if req: + self._resultiter = self._remote._submitbatch(req) + self._rsp = rsp def results(self): - for name, args, opts, resref in self.calls: - yield resref.value + for (batchable, encresref), encres in itertools.izip( + self._rsp, self._resultiter): + encresref.set(encres) + yield batchable.next() # Forward a couple of names from peer to make wireproto interactions # slightly more sensible. @@ -202,13 +219,28 @@ else: return peer.localbatch(self) def _submitbatch(self, req): + """run batch request on the server + + Returns an iterator of the raw responses from the server. + """ cmds = [] for op, argsdict in req: args = ','.join('%s=%s' % (escapearg(k), escapearg(v)) for k, v in argsdict.iteritems()) cmds.append('%s %s' % (op, args)) - rsp = self._call("batch", cmds=';'.join(cmds)) - return [unescapearg(r) for r in rsp.split(';')] + rsp = self._callstream("batch", cmds=';'.join(cmds)) + # TODO this response parsing is probably suboptimal for large + # batches with large responses. + work = rsp.read(1024) + chunk = work + while chunk: + while ';' in work: + one, work = work.split(';', 1) + yield unescapearg(one) + chunk = rsp.read(1024) + work += chunk + yield unescapearg(work) + def _submitone(self, op, args): return self._call(op, **args) diff -r c3eacee01c7e -r 48fd02dac1d4 tests/test-wireproto.py --- a/tests/test-wireproto.py Tue Mar 01 17:44:41 2016 -0500 +++ b/tests/test-wireproto.py Tue Mar 01 18:41:43 2016 -0500 @@ -1,5 +1,7 @@ from __future__ import absolute_import +import StringIO + from mercurial import wireproto class proto(object): @@ -21,6 +23,9 @@ def _call(self, cmd, **args): return wireproto.dispatch(self.serverrepo, proto(args), cmd) + def _callstream(self, cmd, **args): + return StringIO.StringIO(self._call(cmd, **args)) + @wireproto.batchable def greet(self, name): f = wireproto.future()