Mercurial > hg
changeset 37614:a81d02ea65db
wireproto: move version 1 peer functionality to standalone module (API)
wireproto.py contains code for both the client and the server. There
*should* be a somewhat strong separation between the two.
This commit extracts the client-side code from wireproto.py into a new
module - wireprotov1peer.
Differential Revision: https://phab.mercurial-scm.org/D3259
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Wed, 11 Apr 2018 12:49:08 -0700 |
parents | 96d735601ca1 |
children | f3dc8239e3a9 |
files | hgext/infinitepush/__init__.py hgext/largefiles/proto.py mercurial/httppeer.py mercurial/sshpeer.py mercurial/wireproto.py mercurial/wireprotov1peer.py tests/test-batching.py tests/test-wireproto.py |
diffstat | 8 files changed, 436 insertions(+), 411 deletions(-) [+] |
line wrap: on
line diff
--- a/hgext/infinitepush/__init__.py Wed Apr 11 10:51:38 2018 -0700 +++ b/hgext/infinitepush/__init__.py Wed Apr 11 12:49:08 2018 -0700 @@ -128,6 +128,7 @@ util, wireproto, wireprototypes, + wireprotov1peer, ) from . import ( @@ -319,7 +320,7 @@ extensions.wrapfunction(discovery, 'checkheads', _checkheads) - wireproto.wirepeer.listkeyspatterns = listkeyspatterns + wireprotov1peer.wirepeer.listkeyspatterns = listkeyspatterns partorder = exchange.b2partsgenorder index = partorder.index('changeset')
--- a/hgext/largefiles/proto.py Wed Apr 11 10:51:38 2018 -0700 +++ b/hgext/largefiles/proto.py Wed Apr 11 12:49:08 2018 -0700 @@ -13,8 +13,8 @@ error, httppeer, util, - wireproto, wireprototypes, + wireprotov1peer, ) from . import ( @@ -145,9 +145,9 @@ self._abort(error.ResponseError(_("unexpected response:"), chunk)) - @wireproto.batchable + @wireprotov1peer.batchable def statlfile(self, sha): - f = wireproto.future() + f = wireprotov1peer.future() result = {'sha': sha} yield result, f try:
--- a/mercurial/httppeer.py Wed Apr 11 10:51:38 2018 -0700 +++ b/mercurial/httppeer.py Wed Apr 11 12:49:08 2018 -0700 @@ -34,6 +34,7 @@ wireproto, wireprotoframing, wireprototypes, + wireprotov1peer, wireprotov2server, ) @@ -382,7 +383,7 @@ return respurl, proto, resp -class httppeer(wireproto.wirepeer): +class httppeer(wireprotov1peer.wirepeer): def __init__(self, ui, path, url, opener, requestbuilder, caps): self.ui = ui self._path = path
--- a/mercurial/sshpeer.py Wed Apr 11 10:51:38 2018 -0700 +++ b/mercurial/sshpeer.py Wed Apr 11 12:49:08 2018 -0700 @@ -18,6 +18,7 @@ wireproto, wireprotoserver, wireprototypes, + wireprotov1peer, ) from .utils import ( procutil, @@ -352,7 +353,7 @@ return protoname, caps -class sshv1peer(wireproto.wirepeer): +class sshv1peer(wireprotov1peer.wirepeer): def __init__(self, ui, url, proc, stdin, stdout, stderr, caps, autoreadstderr=True): """Create a peer from an existing SSH connection. @@ -589,7 +590,7 @@ def instance(ui, path, create): """Create an SSH peer. - The returned object conforms to the ``wireproto.wirepeer`` interface. + The returned object conforms to the ``wireprotov1peer.wirepeer`` interface. """ u = util.url(path, parsequery=False, parsefragment=False) if u.scheme != 'ssh' or not u.host or u.path is None:
--- a/mercurial/wireproto.py Wed Apr 11 10:51:38 2018 -0700 +++ b/mercurial/wireproto.py Wed Apr 11 12:49:08 2018 -0700 @@ -7,13 +7,11 @@ from __future__ import absolute_import -import hashlib import os import tempfile from .i18n import _ from .node import ( - bin, hex, nullid, ) @@ -25,10 +23,8 @@ encoding, error, exchange, - peer, pushkey as pushkeymod, pycompat, - repository, streamclone, util, wireprototypes, @@ -47,92 +43,6 @@ 'IncompatibleClient') bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint) -class remoteiterbatcher(peer.iterbatcher): - def __init__(self, remote): - super(remoteiterbatcher, self).__init__() - self._remote = remote - - def __getattr__(self, name): - # Validate this method is batchable, since submit() only supports - # batchable methods. - fn = getattr(self._remote, name) - if not getattr(fn, 'batchable', None): - raise error.ProgrammingError('Attempted to batch a non-batchable ' - 'call to %r' % name) - - return super(remoteiterbatcher, self).__getattr__(name) - - def submit(self): - """Break the batch request into many patch calls and pipeline them. - - This is mostly valuable over http where request sizes can be - limited, but can be used in other places as well. - """ - # 2-tuple of (command, arguments) that represents what will be - # sent over the wire. - requests = [] - - # 4-tuple of (command, final future, @batchable generator, remote - # future). - results = [] - - for command, args, opts, finalfuture in self.calls: - mtd = getattr(self._remote, command) - batchable = mtd.batchable(mtd.__self__, *args, **opts) - - commandargs, fremote = next(batchable) - assert fremote - requests.append((command, commandargs)) - results.append((command, finalfuture, batchable, fremote)) - - if requests: - self._resultiter = self._remote._submitbatch(requests) - - self._results = results - - def results(self): - for command, finalfuture, batchable, remotefuture in self._results: - # Get the raw result, set it in the remote future, feed it - # back into the @batchable generator so it can be decoded, and - # set the result on the final future to this value. - remoteresult = next(self._resultiter) - remotefuture.set(remoteresult) - finalfuture.set(next(batchable)) - - # Verify our @batchable generators only emit 2 values. - try: - next(batchable) - except StopIteration: - pass - else: - raise error.ProgrammingError('%s @batchable generator emitted ' - 'unexpected value count' % command) - - yield finalfuture.value - -# Forward a couple of names from peer to make wireproto interactions -# slightly more sensible. -batchable = peer.batchable -future = peer.future - - -def encodebatchcmds(req): - """Return a ``cmds`` argument value for the ``batch`` command.""" - escapearg = wireprototypes.escapebatcharg - - cmds = [] - for op, argsdict in req: - # Old servers didn't properly unescape argument names. So prevent - # the sending of argument names that may not be decoded properly by - # servers. - assert all(escapearg(k) == k for k in argsdict) - - args = ','.join('%s=%s' % (escapearg(k), escapearg(v)) - for k, v in argsdict.iteritems()) - cmds.append('%s %s' % (op, args)) - - return ';'.join(cmds) - def clientcompressionsupport(proto): """Returns a list of compression methods supported by the client. @@ -145,315 +55,6 @@ return cap[5:].split(',') return ['zlib', 'none'] -# client side - -class wirepeer(repository.legacypeer): - """Client-side interface for communicating with a peer repository. - - Methods commonly call wire protocol commands of the same name. - - See also httppeer.py and sshpeer.py for protocol-specific - implementations of this interface. - """ - # Begin of ipeercommands interface. - - def iterbatch(self): - return remoteiterbatcher(self) - - @batchable - def lookup(self, key): - self.requirecap('lookup', _('look up remote revision')) - f = future() - yield {'key': encoding.fromlocal(key)}, f - d = f.value - success, data = d[:-1].split(" ", 1) - if int(success): - yield bin(data) - else: - self._abort(error.RepoError(data)) - - @batchable - def heads(self): - f = future() - yield {}, f - d = f.value - try: - yield wireprototypes.decodelist(d[:-1]) - except ValueError: - self._abort(error.ResponseError(_("unexpected response:"), d)) - - @batchable - def known(self, nodes): - f = future() - yield {'nodes': wireprototypes.encodelist(nodes)}, f - d = f.value - try: - yield [bool(int(b)) for b in d] - except ValueError: - self._abort(error.ResponseError(_("unexpected response:"), d)) - - @batchable - def branchmap(self): - f = future() - yield {}, f - d = f.value - try: - branchmap = {} - for branchpart in d.splitlines(): - branchname, branchheads = branchpart.split(' ', 1) - branchname = encoding.tolocal(urlreq.unquote(branchname)) - branchheads = wireprototypes.decodelist(branchheads) - branchmap[branchname] = branchheads - yield branchmap - except TypeError: - self._abort(error.ResponseError(_("unexpected response:"), d)) - - @batchable - def listkeys(self, namespace): - if not self.capable('pushkey'): - yield {}, None - f = future() - self.ui.debug('preparing listkeys for "%s"\n' % namespace) - yield {'namespace': encoding.fromlocal(namespace)}, f - d = f.value - self.ui.debug('received listkey for "%s": %i bytes\n' - % (namespace, len(d))) - yield pushkeymod.decodekeys(d) - - @batchable - def pushkey(self, namespace, key, old, new): - if not self.capable('pushkey'): - yield False, None - f = future() - self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key)) - yield {'namespace': encoding.fromlocal(namespace), - 'key': encoding.fromlocal(key), - 'old': encoding.fromlocal(old), - 'new': encoding.fromlocal(new)}, f - d = f.value - d, output = d.split('\n', 1) - try: - d = bool(int(d)) - except ValueError: - raise error.ResponseError( - _('push failed (unexpected response):'), d) - for l in output.splitlines(True): - self.ui.status(_('remote: '), l) - yield d - - def stream_out(self): - return self._callstream('stream_out') - - def getbundle(self, source, **kwargs): - kwargs = pycompat.byteskwargs(kwargs) - self.requirecap('getbundle', _('look up remote changes')) - opts = {} - bundlecaps = kwargs.get('bundlecaps') or set() - for key, value in kwargs.iteritems(): - if value is None: - continue - keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key) - if keytype is None: - raise error.ProgrammingError( - 'Unexpectedly None keytype for key %s' % key) - elif keytype == 'nodes': - value = wireprototypes.encodelist(value) - elif keytype == 'csv': - value = ','.join(value) - elif keytype == 'scsv': - value = ','.join(sorted(value)) - elif keytype == 'boolean': - value = '%i' % bool(value) - elif keytype != 'plain': - raise KeyError('unknown getbundle option type %s' - % keytype) - opts[key] = value - f = self._callcompressable("getbundle", **pycompat.strkwargs(opts)) - if any((cap.startswith('HG2') for cap in bundlecaps)): - return bundle2.getunbundler(self.ui, f) - else: - return changegroupmod.cg1unpacker(f, 'UN') - - def unbundle(self, cg, heads, url): - '''Send cg (a readable file-like object representing the - changegroup to push, typically a chunkbuffer object) to the - remote server as a bundle. - - When pushing a bundle10 stream, return an integer indicating the - result of the push (see changegroup.apply()). - - When pushing a bundle20 stream, return a bundle20 stream. - - `url` is the url the client thinks it's pushing to, which is - visible to hooks. - ''' - - if heads != ['force'] and self.capable('unbundlehash'): - heads = wireprototypes.encodelist( - ['hashed', hashlib.sha1(''.join(sorted(heads))).digest()]) - else: - heads = wireprototypes.encodelist(heads) - - if util.safehasattr(cg, 'deltaheader'): - # this a bundle10, do the old style call sequence - ret, output = self._callpush("unbundle", cg, heads=heads) - if ret == "": - raise error.ResponseError( - _('push failed:'), output) - try: - ret = int(ret) - except ValueError: - raise error.ResponseError( - _('push failed (unexpected response):'), ret) - - for l in output.splitlines(True): - self.ui.status(_('remote: '), l) - else: - # bundle2 push. Send a stream, fetch a stream. - stream = self._calltwowaystream('unbundle', cg, heads=heads) - ret = bundle2.getunbundler(self.ui, stream) - return ret - - # End of ipeercommands interface. - - # Begin of ipeerlegacycommands interface. - - def branches(self, nodes): - n = wireprototypes.encodelist(nodes) - d = self._call("branches", nodes=n) - try: - br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()] - return br - except ValueError: - self._abort(error.ResponseError(_("unexpected response:"), d)) - - def between(self, pairs): - batch = 8 # avoid giant requests - r = [] - for i in xrange(0, len(pairs), batch): - n = " ".join([wireprototypes.encodelist(p, '-') - for p in pairs[i:i + batch]]) - d = self._call("between", pairs=n) - try: - r.extend(l and wireprototypes.decodelist(l) or [] - for l in d.splitlines()) - except ValueError: - self._abort(error.ResponseError(_("unexpected response:"), d)) - return r - - def changegroup(self, nodes, kind): - n = wireprototypes.encodelist(nodes) - f = self._callcompressable("changegroup", roots=n) - return changegroupmod.cg1unpacker(f, 'UN') - - def changegroupsubset(self, bases, heads, kind): - self.requirecap('changegroupsubset', _('look up remote changes')) - bases = wireprototypes.encodelist(bases) - heads = wireprototypes.encodelist(heads) - f = self._callcompressable("changegroupsubset", - bases=bases, heads=heads) - return changegroupmod.cg1unpacker(f, 'UN') - - # End of ipeerlegacycommands interface. - - def _submitbatch(self, req): - """run batch request <req> on the server - - Returns an iterator of the raw responses from the server. - """ - ui = self.ui - if ui.debugflag and ui.configbool('devel', 'debug.peer-request'): - ui.debug('devel-peer-request: batched-content\n') - for op, args in req: - msg = 'devel-peer-request: - %s (%d arguments)\n' - ui.debug(msg % (op, len(args))) - - unescapearg = wireprototypes.unescapebatcharg - - rsp = self._callstream("batch", cmds=encodebatchcmds(req)) - chunk = rsp.read(1024) - work = [chunk] - while chunk: - while ';' not in chunk and chunk: - chunk = rsp.read(1024) - work.append(chunk) - merged = ''.join(work) - while ';' in merged: - one, merged = merged.split(';', 1) - yield unescapearg(one) - chunk = rsp.read(1024) - work = [merged, chunk] - yield unescapearg(''.join(work)) - - def _submitone(self, op, args): - return self._call(op, **pycompat.strkwargs(args)) - - def debugwireargs(self, one, two, three=None, four=None, five=None): - # don't pass optional arguments left at their default value - opts = {} - if three is not None: - opts[r'three'] = three - if four is not None: - opts[r'four'] = four - return self._call('debugwireargs', one=one, two=two, **opts) - - def _call(self, cmd, **args): - """execute <cmd> on the server - - The command is expected to return a simple string. - - returns the server reply as a string.""" - raise NotImplementedError() - - def _callstream(self, cmd, **args): - """execute <cmd> on the server - - The command is expected to return a stream. Note that if the - command doesn't return a stream, _callstream behaves - differently for ssh and http peers. - - returns the server reply as a file like object. - """ - raise NotImplementedError() - - def _callcompressable(self, cmd, **args): - """execute <cmd> on the server - - The command is expected to return a stream. - - The stream may have been compressed in some implementations. This - function takes care of the decompression. This is the only difference - with _callstream. - - returns the server reply as a file like object. - """ - raise NotImplementedError() - - def _callpush(self, cmd, fp, **args): - """execute a <cmd> on server - - The command is expected to be related to a push. Push has a special - return method. - - returns the server reply as a (ret, output) tuple. ret is either - empty (error) or a stringified int. - """ - raise NotImplementedError() - - def _calltwowaystream(self, cmd, fp, **args): - """execute <cmd> on server - - The command will send a stream to the server and get a stream in reply. - """ - raise NotImplementedError() - - def _abort(self, exception): - """clearly abort the wire protocol connection and raise the exception - """ - raise NotImplementedError() - -# server side - # wire protocol command can either return a string or one of these classes. def getdispatchrepo(repo, proto, command):
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/mercurial/wireprotov1peer.py Wed Apr 11 12:49:08 2018 -0700 @@ -0,0 +1,420 @@ +# wireprotov1peer.py - Client-side functionality for wire protocol version 1. +# +# Copyright 2005-2010 Matt Mackall <mpm@selenic.com> +# +# This software may be used and distributed according to the terms of the +# GNU General Public License version 2 or any later version. + +from __future__ import absolute_import + +import hashlib + +from .i18n import _ +from .node import ( + bin, +) + +from . import ( + bundle2, + changegroup as changegroupmod, + encoding, + error, + peer, + pushkey as pushkeymod, + pycompat, + repository, + util, + wireprototypes, +) + +urlreq = util.urlreq + +class remoteiterbatcher(peer.iterbatcher): + def __init__(self, remote): + super(remoteiterbatcher, self).__init__() + self._remote = remote + + def __getattr__(self, name): + # Validate this method is batchable, since submit() only supports + # batchable methods. + fn = getattr(self._remote, name) + if not getattr(fn, 'batchable', None): + raise error.ProgrammingError('Attempted to batch a non-batchable ' + 'call to %r' % name) + + return super(remoteiterbatcher, self).__getattr__(name) + + def submit(self): + """Break the batch request into many patch calls and pipeline them. + + This is mostly valuable over http where request sizes can be + limited, but can be used in other places as well. + """ + # 2-tuple of (command, arguments) that represents what will be + # sent over the wire. + requests = [] + + # 4-tuple of (command, final future, @batchable generator, remote + # future). + results = [] + + for command, args, opts, finalfuture in self.calls: + mtd = getattr(self._remote, command) + batchable = mtd.batchable(mtd.__self__, *args, **opts) + + commandargs, fremote = next(batchable) + assert fremote + requests.append((command, commandargs)) + results.append((command, finalfuture, batchable, fremote)) + + if requests: + self._resultiter = self._remote._submitbatch(requests) + + self._results = results + + def results(self): + for command, finalfuture, batchable, remotefuture in self._results: + # Get the raw result, set it in the remote future, feed it + # back into the @batchable generator so it can be decoded, and + # set the result on the final future to this value. + remoteresult = next(self._resultiter) + remotefuture.set(remoteresult) + finalfuture.set(next(batchable)) + + # Verify our @batchable generators only emit 2 values. + try: + next(batchable) + except StopIteration: + pass + else: + raise error.ProgrammingError('%s @batchable generator emitted ' + 'unexpected value count' % command) + + yield finalfuture.value + +# Forward a couple of names from peer to make wireproto interactions +# slightly more sensible. +batchable = peer.batchable +future = peer.future + +def encodebatchcmds(req): + """Return a ``cmds`` argument value for the ``batch`` command.""" + escapearg = wireprototypes.escapebatcharg + + cmds = [] + for op, argsdict in req: + # Old servers didn't properly unescape argument names. So prevent + # the sending of argument names that may not be decoded properly by + # servers. + assert all(escapearg(k) == k for k in argsdict) + + args = ','.join('%s=%s' % (escapearg(k), escapearg(v)) + for k, v in argsdict.iteritems()) + cmds.append('%s %s' % (op, args)) + + return ';'.join(cmds) + +class wirepeer(repository.legacypeer): + """Client-side interface for communicating with a peer repository. + + Methods commonly call wire protocol commands of the same name. + + See also httppeer.py and sshpeer.py for protocol-specific + implementations of this interface. + """ + # Begin of ipeercommands interface. + + def iterbatch(self): + return remoteiterbatcher(self) + + @batchable + def lookup(self, key): + self.requirecap('lookup', _('look up remote revision')) + f = future() + yield {'key': encoding.fromlocal(key)}, f + d = f.value + success, data = d[:-1].split(" ", 1) + if int(success): + yield bin(data) + else: + self._abort(error.RepoError(data)) + + @batchable + def heads(self): + f = future() + yield {}, f + d = f.value + try: + yield wireprototypes.decodelist(d[:-1]) + except ValueError: + self._abort(error.ResponseError(_("unexpected response:"), d)) + + @batchable + def known(self, nodes): + f = future() + yield {'nodes': wireprototypes.encodelist(nodes)}, f + d = f.value + try: + yield [bool(int(b)) for b in d] + except ValueError: + self._abort(error.ResponseError(_("unexpected response:"), d)) + + @batchable + def branchmap(self): + f = future() + yield {}, f + d = f.value + try: + branchmap = {} + for branchpart in d.splitlines(): + branchname, branchheads = branchpart.split(' ', 1) + branchname = encoding.tolocal(urlreq.unquote(branchname)) + branchheads = wireprototypes.decodelist(branchheads) + branchmap[branchname] = branchheads + yield branchmap + except TypeError: + self._abort(error.ResponseError(_("unexpected response:"), d)) + + @batchable + def listkeys(self, namespace): + if not self.capable('pushkey'): + yield {}, None + f = future() + self.ui.debug('preparing listkeys for "%s"\n' % namespace) + yield {'namespace': encoding.fromlocal(namespace)}, f + d = f.value + self.ui.debug('received listkey for "%s": %i bytes\n' + % (namespace, len(d))) + yield pushkeymod.decodekeys(d) + + @batchable + def pushkey(self, namespace, key, old, new): + if not self.capable('pushkey'): + yield False, None + f = future() + self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key)) + yield {'namespace': encoding.fromlocal(namespace), + 'key': encoding.fromlocal(key), + 'old': encoding.fromlocal(old), + 'new': encoding.fromlocal(new)}, f + d = f.value + d, output = d.split('\n', 1) + try: + d = bool(int(d)) + except ValueError: + raise error.ResponseError( + _('push failed (unexpected response):'), d) + for l in output.splitlines(True): + self.ui.status(_('remote: '), l) + yield d + + def stream_out(self): + return self._callstream('stream_out') + + def getbundle(self, source, **kwargs): + kwargs = pycompat.byteskwargs(kwargs) + self.requirecap('getbundle', _('look up remote changes')) + opts = {} + bundlecaps = kwargs.get('bundlecaps') or set() + for key, value in kwargs.iteritems(): + if value is None: + continue + keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key) + if keytype is None: + raise error.ProgrammingError( + 'Unexpectedly None keytype for key %s' % key) + elif keytype == 'nodes': + value = wireprototypes.encodelist(value) + elif keytype == 'csv': + value = ','.join(value) + elif keytype == 'scsv': + value = ','.join(sorted(value)) + elif keytype == 'boolean': + value = '%i' % bool(value) + elif keytype != 'plain': + raise KeyError('unknown getbundle option type %s' + % keytype) + opts[key] = value + f = self._callcompressable("getbundle", **pycompat.strkwargs(opts)) + if any((cap.startswith('HG2') for cap in bundlecaps)): + return bundle2.getunbundler(self.ui, f) + else: + return changegroupmod.cg1unpacker(f, 'UN') + + def unbundle(self, cg, heads, url): + '''Send cg (a readable file-like object representing the + changegroup to push, typically a chunkbuffer object) to the + remote server as a bundle. + + When pushing a bundle10 stream, return an integer indicating the + result of the push (see changegroup.apply()). + + When pushing a bundle20 stream, return a bundle20 stream. + + `url` is the url the client thinks it's pushing to, which is + visible to hooks. + ''' + + if heads != ['force'] and self.capable('unbundlehash'): + heads = wireprototypes.encodelist( + ['hashed', hashlib.sha1(''.join(sorted(heads))).digest()]) + else: + heads = wireprototypes.encodelist(heads) + + if util.safehasattr(cg, 'deltaheader'): + # this a bundle10, do the old style call sequence + ret, output = self._callpush("unbundle", cg, heads=heads) + if ret == "": + raise error.ResponseError( + _('push failed:'), output) + try: + ret = int(ret) + except ValueError: + raise error.ResponseError( + _('push failed (unexpected response):'), ret) + + for l in output.splitlines(True): + self.ui.status(_('remote: '), l) + else: + # bundle2 push. Send a stream, fetch a stream. + stream = self._calltwowaystream('unbundle', cg, heads=heads) + ret = bundle2.getunbundler(self.ui, stream) + return ret + + # End of ipeercommands interface. + + # Begin of ipeerlegacycommands interface. + + def branches(self, nodes): + n = wireprototypes.encodelist(nodes) + d = self._call("branches", nodes=n) + try: + br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()] + return br + except ValueError: + self._abort(error.ResponseError(_("unexpected response:"), d)) + + def between(self, pairs): + batch = 8 # avoid giant requests + r = [] + for i in xrange(0, len(pairs), batch): + n = " ".join([wireprototypes.encodelist(p, '-') + for p in pairs[i:i + batch]]) + d = self._call("between", pairs=n) + try: + r.extend(l and wireprototypes.decodelist(l) or [] + for l in d.splitlines()) + except ValueError: + self._abort(error.ResponseError(_("unexpected response:"), d)) + return r + + def changegroup(self, nodes, kind): + n = wireprototypes.encodelist(nodes) + f = self._callcompressable("changegroup", roots=n) + return changegroupmod.cg1unpacker(f, 'UN') + + def changegroupsubset(self, bases, heads, kind): + self.requirecap('changegroupsubset', _('look up remote changes')) + bases = wireprototypes.encodelist(bases) + heads = wireprototypes.encodelist(heads) + f = self._callcompressable("changegroupsubset", + bases=bases, heads=heads) + return changegroupmod.cg1unpacker(f, 'UN') + + # End of ipeerlegacycommands interface. + + def _submitbatch(self, req): + """run batch request <req> on the server + + Returns an iterator of the raw responses from the server. + """ + ui = self.ui + if ui.debugflag and ui.configbool('devel', 'debug.peer-request'): + ui.debug('devel-peer-request: batched-content\n') + for op, args in req: + msg = 'devel-peer-request: - %s (%d arguments)\n' + ui.debug(msg % (op, len(args))) + + unescapearg = wireprototypes.unescapebatcharg + + rsp = self._callstream("batch", cmds=encodebatchcmds(req)) + chunk = rsp.read(1024) + work = [chunk] + while chunk: + while ';' not in chunk and chunk: + chunk = rsp.read(1024) + work.append(chunk) + merged = ''.join(work) + while ';' in merged: + one, merged = merged.split(';', 1) + yield unescapearg(one) + chunk = rsp.read(1024) + work = [merged, chunk] + yield unescapearg(''.join(work)) + + def _submitone(self, op, args): + return self._call(op, **pycompat.strkwargs(args)) + + def debugwireargs(self, one, two, three=None, four=None, five=None): + # don't pass optional arguments left at their default value + opts = {} + if three is not None: + opts[r'three'] = three + if four is not None: + opts[r'four'] = four + return self._call('debugwireargs', one=one, two=two, **opts) + + def _call(self, cmd, **args): + """execute <cmd> on the server + + The command is expected to return a simple string. + + returns the server reply as a string.""" + raise NotImplementedError() + + def _callstream(self, cmd, **args): + """execute <cmd> on the server + + The command is expected to return a stream. Note that if the + command doesn't return a stream, _callstream behaves + differently for ssh and http peers. + + returns the server reply as a file like object. + """ + raise NotImplementedError() + + def _callcompressable(self, cmd, **args): + """execute <cmd> on the server + + The command is expected to return a stream. + + The stream may have been compressed in some implementations. This + function takes care of the decompression. This is the only difference + with _callstream. + + returns the server reply as a file like object. + """ + raise NotImplementedError() + + def _callpush(self, cmd, fp, **args): + """execute a <cmd> on server + + The command is expected to be related to a push. Push has a special + return method. + + returns the server reply as a (ret, output) tuple. ret is either + empty (error) or a stringified int. + """ + raise NotImplementedError() + + def _calltwowaystream(self, cmd, fp, **args): + """execute <cmd> on server + + The command will send a stream to the server and get a stream in reply. + """ + raise NotImplementedError() + + def _abort(self, exception): + """clearly abort the wire protocol connection and raise the exception + """ + raise NotImplementedError()
--- a/tests/test-batching.py Wed Apr 11 10:51:38 2018 -0700 +++ b/tests/test-batching.py Wed Apr 11 12:49:08 2018 -0700 @@ -11,7 +11,7 @@ error, peer, util, - wireproto, + wireprotov1peer, ) # equivalent of repo.repository @@ -177,7 +177,7 @@ yield r def batchiter(self): - return wireproto.remoteiterbatcher(self) + return wireprotov1peer.remoteiterbatcher(self) @peer.batchable def foo(self, one, two=None):
--- a/tests/test-wireproto.py Wed Apr 11 10:51:38 2018 -0700 +++ b/tests/test-wireproto.py Wed Apr 11 12:49:08 2018 -0700 @@ -7,6 +7,7 @@ util, wireproto, wireprototypes, + wireprotov1peer, ) stringio = util.stringio @@ -29,7 +30,7 @@ 'version': 1, } -class clientpeer(wireproto.wirepeer): +class clientpeer(wireprotov1peer.wirepeer): def __init__(self, serverrepo, ui): self.serverrepo = serverrepo self.ui = ui @@ -65,9 +66,9 @@ def _callstream(self, cmd, **args): return stringio(self._call(cmd, **args)) - @wireproto.batchable + @wireprotov1peer.batchable def greet(self, name): - f = wireproto.future() + f = wireprotov1peer.future() yield {b'name': mangle(name)}, f yield unmangle(f.value)