changeset 37614:a81d02ea65db

wireproto: move version 1 peer functionality to standalone module (API) wireproto.py contains code for both the client and the server. There *should* be a somewhat strong separation between the two. This commit extracts the client-side code from wireproto.py into a new module - wireprotov1peer. Differential Revision: https://phab.mercurial-scm.org/D3259
author Gregory Szorc <gregory.szorc@gmail.com>
date Wed, 11 Apr 2018 12:49:08 -0700
parents 96d735601ca1
children f3dc8239e3a9
files hgext/infinitepush/__init__.py hgext/largefiles/proto.py mercurial/httppeer.py mercurial/sshpeer.py mercurial/wireproto.py mercurial/wireprotov1peer.py tests/test-batching.py tests/test-wireproto.py
diffstat 8 files changed, 436 insertions(+), 411 deletions(-) [+]
line wrap: on
line diff
--- a/hgext/infinitepush/__init__.py	Wed Apr 11 10:51:38 2018 -0700
+++ b/hgext/infinitepush/__init__.py	Wed Apr 11 12:49:08 2018 -0700
@@ -128,6 +128,7 @@
     util,
     wireproto,
     wireprototypes,
+    wireprotov1peer,
 )
 
 from . import (
@@ -319,7 +320,7 @@
 
     extensions.wrapfunction(discovery, 'checkheads', _checkheads)
 
-    wireproto.wirepeer.listkeyspatterns = listkeyspatterns
+    wireprotov1peer.wirepeer.listkeyspatterns = listkeyspatterns
 
     partorder = exchange.b2partsgenorder
     index = partorder.index('changeset')
--- a/hgext/largefiles/proto.py	Wed Apr 11 10:51:38 2018 -0700
+++ b/hgext/largefiles/proto.py	Wed Apr 11 12:49:08 2018 -0700
@@ -13,8 +13,8 @@
     error,
     httppeer,
     util,
-    wireproto,
     wireprototypes,
+    wireprotov1peer,
 )
 
 from . import (
@@ -145,9 +145,9 @@
                     self._abort(error.ResponseError(_("unexpected response:"),
                                                     chunk))
 
-        @wireproto.batchable
+        @wireprotov1peer.batchable
         def statlfile(self, sha):
-            f = wireproto.future()
+            f = wireprotov1peer.future()
             result = {'sha': sha}
             yield result, f
             try:
--- a/mercurial/httppeer.py	Wed Apr 11 10:51:38 2018 -0700
+++ b/mercurial/httppeer.py	Wed Apr 11 12:49:08 2018 -0700
@@ -34,6 +34,7 @@
     wireproto,
     wireprotoframing,
     wireprototypes,
+    wireprotov1peer,
     wireprotov2server,
 )
 
@@ -382,7 +383,7 @@
 
     return respurl, proto, resp
 
-class httppeer(wireproto.wirepeer):
+class httppeer(wireprotov1peer.wirepeer):
     def __init__(self, ui, path, url, opener, requestbuilder, caps):
         self.ui = ui
         self._path = path
--- a/mercurial/sshpeer.py	Wed Apr 11 10:51:38 2018 -0700
+++ b/mercurial/sshpeer.py	Wed Apr 11 12:49:08 2018 -0700
@@ -18,6 +18,7 @@
     wireproto,
     wireprotoserver,
     wireprototypes,
+    wireprotov1peer,
 )
 from .utils import (
     procutil,
@@ -352,7 +353,7 @@
 
     return protoname, caps
 
-class sshv1peer(wireproto.wirepeer):
+class sshv1peer(wireprotov1peer.wirepeer):
     def __init__(self, ui, url, proc, stdin, stdout, stderr, caps,
                  autoreadstderr=True):
         """Create a peer from an existing SSH connection.
@@ -589,7 +590,7 @@
 def instance(ui, path, create):
     """Create an SSH peer.
 
-    The returned object conforms to the ``wireproto.wirepeer`` interface.
+    The returned object conforms to the ``wireprotov1peer.wirepeer`` interface.
     """
     u = util.url(path, parsequery=False, parsefragment=False)
     if u.scheme != 'ssh' or not u.host or u.path is None:
--- a/mercurial/wireproto.py	Wed Apr 11 10:51:38 2018 -0700
+++ b/mercurial/wireproto.py	Wed Apr 11 12:49:08 2018 -0700
@@ -7,13 +7,11 @@
 
 from __future__ import absolute_import
 
-import hashlib
 import os
 import tempfile
 
 from .i18n import _
 from .node import (
-    bin,
     hex,
     nullid,
 )
@@ -25,10 +23,8 @@
     encoding,
     error,
     exchange,
-    peer,
     pushkey as pushkeymod,
     pycompat,
-    repository,
     streamclone,
     util,
     wireprototypes,
@@ -47,92 +43,6 @@
                         'IncompatibleClient')
 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
 
-class remoteiterbatcher(peer.iterbatcher):
-    def __init__(self, remote):
-        super(remoteiterbatcher, self).__init__()
-        self._remote = remote
-
-    def __getattr__(self, name):
-        # Validate this method is batchable, since submit() only supports
-        # batchable methods.
-        fn = getattr(self._remote, name)
-        if not getattr(fn, 'batchable', None):
-            raise error.ProgrammingError('Attempted to batch a non-batchable '
-                                         'call to %r' % name)
-
-        return super(remoteiterbatcher, self).__getattr__(name)
-
-    def submit(self):
-        """Break the batch request into many patch calls and pipeline them.
-
-        This is mostly valuable over http where request sizes can be
-        limited, but can be used in other places as well.
-        """
-        # 2-tuple of (command, arguments) that represents what will be
-        # sent over the wire.
-        requests = []
-
-        # 4-tuple of (command, final future, @batchable generator, remote
-        # future).
-        results = []
-
-        for command, args, opts, finalfuture in self.calls:
-            mtd = getattr(self._remote, command)
-            batchable = mtd.batchable(mtd.__self__, *args, **opts)
-
-            commandargs, fremote = next(batchable)
-            assert fremote
-            requests.append((command, commandargs))
-            results.append((command, finalfuture, batchable, fremote))
-
-        if requests:
-            self._resultiter = self._remote._submitbatch(requests)
-
-        self._results = results
-
-    def results(self):
-        for command, finalfuture, batchable, remotefuture in self._results:
-            # Get the raw result, set it in the remote future, feed it
-            # back into the @batchable generator so it can be decoded, and
-            # set the result on the final future to this value.
-            remoteresult = next(self._resultiter)
-            remotefuture.set(remoteresult)
-            finalfuture.set(next(batchable))
-
-            # Verify our @batchable generators only emit 2 values.
-            try:
-                next(batchable)
-            except StopIteration:
-                pass
-            else:
-                raise error.ProgrammingError('%s @batchable generator emitted '
-                                             'unexpected value count' % command)
-
-            yield finalfuture.value
-
-# Forward a couple of names from peer to make wireproto interactions
-# slightly more sensible.
-batchable = peer.batchable
-future = peer.future
-
-
-def encodebatchcmds(req):
-    """Return a ``cmds`` argument value for the ``batch`` command."""
-    escapearg = wireprototypes.escapebatcharg
-
-    cmds = []
-    for op, argsdict in req:
-        # Old servers didn't properly unescape argument names. So prevent
-        # the sending of argument names that may not be decoded properly by
-        # servers.
-        assert all(escapearg(k) == k for k in argsdict)
-
-        args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
-                        for k, v in argsdict.iteritems())
-        cmds.append('%s %s' % (op, args))
-
-    return ';'.join(cmds)
-
 def clientcompressionsupport(proto):
     """Returns a list of compression methods supported by the client.
 
@@ -145,315 +55,6 @@
             return cap[5:].split(',')
     return ['zlib', 'none']
 
-# client side
-
-class wirepeer(repository.legacypeer):
-    """Client-side interface for communicating with a peer repository.
-
-    Methods commonly call wire protocol commands of the same name.
-
-    See also httppeer.py and sshpeer.py for protocol-specific
-    implementations of this interface.
-    """
-    # Begin of ipeercommands interface.
-
-    def iterbatch(self):
-        return remoteiterbatcher(self)
-
-    @batchable
-    def lookup(self, key):
-        self.requirecap('lookup', _('look up remote revision'))
-        f = future()
-        yield {'key': encoding.fromlocal(key)}, f
-        d = f.value
-        success, data = d[:-1].split(" ", 1)
-        if int(success):
-            yield bin(data)
-        else:
-            self._abort(error.RepoError(data))
-
-    @batchable
-    def heads(self):
-        f = future()
-        yield {}, f
-        d = f.value
-        try:
-            yield wireprototypes.decodelist(d[:-1])
-        except ValueError:
-            self._abort(error.ResponseError(_("unexpected response:"), d))
-
-    @batchable
-    def known(self, nodes):
-        f = future()
-        yield {'nodes': wireprototypes.encodelist(nodes)}, f
-        d = f.value
-        try:
-            yield [bool(int(b)) for b in d]
-        except ValueError:
-            self._abort(error.ResponseError(_("unexpected response:"), d))
-
-    @batchable
-    def branchmap(self):
-        f = future()
-        yield {}, f
-        d = f.value
-        try:
-            branchmap = {}
-            for branchpart in d.splitlines():
-                branchname, branchheads = branchpart.split(' ', 1)
-                branchname = encoding.tolocal(urlreq.unquote(branchname))
-                branchheads = wireprototypes.decodelist(branchheads)
-                branchmap[branchname] = branchheads
-            yield branchmap
-        except TypeError:
-            self._abort(error.ResponseError(_("unexpected response:"), d))
-
-    @batchable
-    def listkeys(self, namespace):
-        if not self.capable('pushkey'):
-            yield {}, None
-        f = future()
-        self.ui.debug('preparing listkeys for "%s"\n' % namespace)
-        yield {'namespace': encoding.fromlocal(namespace)}, f
-        d = f.value
-        self.ui.debug('received listkey for "%s": %i bytes\n'
-                      % (namespace, len(d)))
-        yield pushkeymod.decodekeys(d)
-
-    @batchable
-    def pushkey(self, namespace, key, old, new):
-        if not self.capable('pushkey'):
-            yield False, None
-        f = future()
-        self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
-        yield {'namespace': encoding.fromlocal(namespace),
-               'key': encoding.fromlocal(key),
-               'old': encoding.fromlocal(old),
-               'new': encoding.fromlocal(new)}, f
-        d = f.value
-        d, output = d.split('\n', 1)
-        try:
-            d = bool(int(d))
-        except ValueError:
-            raise error.ResponseError(
-                _('push failed (unexpected response):'), d)
-        for l in output.splitlines(True):
-            self.ui.status(_('remote: '), l)
-        yield d
-
-    def stream_out(self):
-        return self._callstream('stream_out')
-
-    def getbundle(self, source, **kwargs):
-        kwargs = pycompat.byteskwargs(kwargs)
-        self.requirecap('getbundle', _('look up remote changes'))
-        opts = {}
-        bundlecaps = kwargs.get('bundlecaps') or set()
-        for key, value in kwargs.iteritems():
-            if value is None:
-                continue
-            keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key)
-            if keytype is None:
-                raise error.ProgrammingError(
-                    'Unexpectedly None keytype for key %s' % key)
-            elif keytype == 'nodes':
-                value = wireprototypes.encodelist(value)
-            elif keytype == 'csv':
-                value = ','.join(value)
-            elif keytype == 'scsv':
-                value = ','.join(sorted(value))
-            elif keytype == 'boolean':
-                value = '%i' % bool(value)
-            elif keytype != 'plain':
-                raise KeyError('unknown getbundle option type %s'
-                               % keytype)
-            opts[key] = value
-        f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
-        if any((cap.startswith('HG2') for cap in bundlecaps)):
-            return bundle2.getunbundler(self.ui, f)
-        else:
-            return changegroupmod.cg1unpacker(f, 'UN')
-
-    def unbundle(self, cg, heads, url):
-        '''Send cg (a readable file-like object representing the
-        changegroup to push, typically a chunkbuffer object) to the
-        remote server as a bundle.
-
-        When pushing a bundle10 stream, return an integer indicating the
-        result of the push (see changegroup.apply()).
-
-        When pushing a bundle20 stream, return a bundle20 stream.
-
-        `url` is the url the client thinks it's pushing to, which is
-        visible to hooks.
-        '''
-
-        if heads != ['force'] and self.capable('unbundlehash'):
-            heads = wireprototypes.encodelist(
-                ['hashed', hashlib.sha1(''.join(sorted(heads))).digest()])
-        else:
-            heads = wireprototypes.encodelist(heads)
-
-        if util.safehasattr(cg, 'deltaheader'):
-            # this a bundle10, do the old style call sequence
-            ret, output = self._callpush("unbundle", cg, heads=heads)
-            if ret == "":
-                raise error.ResponseError(
-                    _('push failed:'), output)
-            try:
-                ret = int(ret)
-            except ValueError:
-                raise error.ResponseError(
-                    _('push failed (unexpected response):'), ret)
-
-            for l in output.splitlines(True):
-                self.ui.status(_('remote: '), l)
-        else:
-            # bundle2 push. Send a stream, fetch a stream.
-            stream = self._calltwowaystream('unbundle', cg, heads=heads)
-            ret = bundle2.getunbundler(self.ui, stream)
-        return ret
-
-    # End of ipeercommands interface.
-
-    # Begin of ipeerlegacycommands interface.
-
-    def branches(self, nodes):
-        n = wireprototypes.encodelist(nodes)
-        d = self._call("branches", nodes=n)
-        try:
-            br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()]
-            return br
-        except ValueError:
-            self._abort(error.ResponseError(_("unexpected response:"), d))
-
-    def between(self, pairs):
-        batch = 8 # avoid giant requests
-        r = []
-        for i in xrange(0, len(pairs), batch):
-            n = " ".join([wireprototypes.encodelist(p, '-')
-                          for p in pairs[i:i + batch]])
-            d = self._call("between", pairs=n)
-            try:
-                r.extend(l and wireprototypes.decodelist(l) or []
-                         for l in d.splitlines())
-            except ValueError:
-                self._abort(error.ResponseError(_("unexpected response:"), d))
-        return r
-
-    def changegroup(self, nodes, kind):
-        n = wireprototypes.encodelist(nodes)
-        f = self._callcompressable("changegroup", roots=n)
-        return changegroupmod.cg1unpacker(f, 'UN')
-
-    def changegroupsubset(self, bases, heads, kind):
-        self.requirecap('changegroupsubset', _('look up remote changes'))
-        bases = wireprototypes.encodelist(bases)
-        heads = wireprototypes.encodelist(heads)
-        f = self._callcompressable("changegroupsubset",
-                                   bases=bases, heads=heads)
-        return changegroupmod.cg1unpacker(f, 'UN')
-
-    # End of ipeerlegacycommands interface.
-
-    def _submitbatch(self, req):
-        """run batch request <req> on the server
-
-        Returns an iterator of the raw responses from the server.
-        """
-        ui = self.ui
-        if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
-            ui.debug('devel-peer-request: batched-content\n')
-            for op, args in req:
-                msg = 'devel-peer-request:    - %s (%d arguments)\n'
-                ui.debug(msg % (op, len(args)))
-
-        unescapearg = wireprototypes.unescapebatcharg
-
-        rsp = self._callstream("batch", cmds=encodebatchcmds(req))
-        chunk = rsp.read(1024)
-        work = [chunk]
-        while chunk:
-            while ';' not in chunk and chunk:
-                chunk = rsp.read(1024)
-                work.append(chunk)
-            merged = ''.join(work)
-            while ';' in merged:
-                one, merged = merged.split(';', 1)
-                yield unescapearg(one)
-            chunk = rsp.read(1024)
-            work = [merged, chunk]
-        yield unescapearg(''.join(work))
-
-    def _submitone(self, op, args):
-        return self._call(op, **pycompat.strkwargs(args))
-
-    def debugwireargs(self, one, two, three=None, four=None, five=None):
-        # don't pass optional arguments left at their default value
-        opts = {}
-        if three is not None:
-            opts[r'three'] = three
-        if four is not None:
-            opts[r'four'] = four
-        return self._call('debugwireargs', one=one, two=two, **opts)
-
-    def _call(self, cmd, **args):
-        """execute <cmd> on the server
-
-        The command is expected to return a simple string.
-
-        returns the server reply as a string."""
-        raise NotImplementedError()
-
-    def _callstream(self, cmd, **args):
-        """execute <cmd> on the server
-
-        The command is expected to return a stream. Note that if the
-        command doesn't return a stream, _callstream behaves
-        differently for ssh and http peers.
-
-        returns the server reply as a file like object.
-        """
-        raise NotImplementedError()
-
-    def _callcompressable(self, cmd, **args):
-        """execute <cmd> on the server
-
-        The command is expected to return a stream.
-
-        The stream may have been compressed in some implementations. This
-        function takes care of the decompression. This is the only difference
-        with _callstream.
-
-        returns the server reply as a file like object.
-        """
-        raise NotImplementedError()
-
-    def _callpush(self, cmd, fp, **args):
-        """execute a <cmd> on server
-
-        The command is expected to be related to a push. Push has a special
-        return method.
-
-        returns the server reply as a (ret, output) tuple. ret is either
-        empty (error) or a stringified int.
-        """
-        raise NotImplementedError()
-
-    def _calltwowaystream(self, cmd, fp, **args):
-        """execute <cmd> on server
-
-        The command will send a stream to the server and get a stream in reply.
-        """
-        raise NotImplementedError()
-
-    def _abort(self, exception):
-        """clearly abort the wire protocol connection and raise the exception
-        """
-        raise NotImplementedError()
-
-# server side
-
 # wire protocol command can either return a string or one of these classes.
 
 def getdispatchrepo(repo, proto, command):
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/mercurial/wireprotov1peer.py	Wed Apr 11 12:49:08 2018 -0700
@@ -0,0 +1,420 @@
+# wireprotov1peer.py - Client-side functionality for wire protocol version 1.
+#
+# Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
+#
+# This software may be used and distributed according to the terms of the
+# GNU General Public License version 2 or any later version.
+
+from __future__ import absolute_import
+
+import hashlib
+
+from .i18n import _
+from .node import (
+    bin,
+)
+
+from . import (
+    bundle2,
+    changegroup as changegroupmod,
+    encoding,
+    error,
+    peer,
+    pushkey as pushkeymod,
+    pycompat,
+    repository,
+    util,
+    wireprototypes,
+)
+
+urlreq = util.urlreq
+
+class remoteiterbatcher(peer.iterbatcher):
+    def __init__(self, remote):
+        super(remoteiterbatcher, self).__init__()
+        self._remote = remote
+
+    def __getattr__(self, name):
+        # Validate this method is batchable, since submit() only supports
+        # batchable methods.
+        fn = getattr(self._remote, name)
+        if not getattr(fn, 'batchable', None):
+            raise error.ProgrammingError('Attempted to batch a non-batchable '
+                                         'call to %r' % name)
+
+        return super(remoteiterbatcher, self).__getattr__(name)
+
+    def submit(self):
+        """Break the batch request into many patch calls and pipeline them.
+
+        This is mostly valuable over http where request sizes can be
+        limited, but can be used in other places as well.
+        """
+        # 2-tuple of (command, arguments) that represents what will be
+        # sent over the wire.
+        requests = []
+
+        # 4-tuple of (command, final future, @batchable generator, remote
+        # future).
+        results = []
+
+        for command, args, opts, finalfuture in self.calls:
+            mtd = getattr(self._remote, command)
+            batchable = mtd.batchable(mtd.__self__, *args, **opts)
+
+            commandargs, fremote = next(batchable)
+            assert fremote
+            requests.append((command, commandargs))
+            results.append((command, finalfuture, batchable, fremote))
+
+        if requests:
+            self._resultiter = self._remote._submitbatch(requests)
+
+        self._results = results
+
+    def results(self):
+        for command, finalfuture, batchable, remotefuture in self._results:
+            # Get the raw result, set it in the remote future, feed it
+            # back into the @batchable generator so it can be decoded, and
+            # set the result on the final future to this value.
+            remoteresult = next(self._resultiter)
+            remotefuture.set(remoteresult)
+            finalfuture.set(next(batchable))
+
+            # Verify our @batchable generators only emit 2 values.
+            try:
+                next(batchable)
+            except StopIteration:
+                pass
+            else:
+                raise error.ProgrammingError('%s @batchable generator emitted '
+                                             'unexpected value count' % command)
+
+            yield finalfuture.value
+
+# Forward a couple of names from peer to make wireproto interactions
+# slightly more sensible.
+batchable = peer.batchable
+future = peer.future
+
+def encodebatchcmds(req):
+    """Return a ``cmds`` argument value for the ``batch`` command."""
+    escapearg = wireprototypes.escapebatcharg
+
+    cmds = []
+    for op, argsdict in req:
+        # Old servers didn't properly unescape argument names. So prevent
+        # the sending of argument names that may not be decoded properly by
+        # servers.
+        assert all(escapearg(k) == k for k in argsdict)
+
+        args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
+                        for k, v in argsdict.iteritems())
+        cmds.append('%s %s' % (op, args))
+
+    return ';'.join(cmds)
+
+class wirepeer(repository.legacypeer):
+    """Client-side interface for communicating with a peer repository.
+
+    Methods commonly call wire protocol commands of the same name.
+
+    See also httppeer.py and sshpeer.py for protocol-specific
+    implementations of this interface.
+    """
+    # Begin of ipeercommands interface.
+
+    def iterbatch(self):
+        return remoteiterbatcher(self)
+
+    @batchable
+    def lookup(self, key):
+        self.requirecap('lookup', _('look up remote revision'))
+        f = future()
+        yield {'key': encoding.fromlocal(key)}, f
+        d = f.value
+        success, data = d[:-1].split(" ", 1)
+        if int(success):
+            yield bin(data)
+        else:
+            self._abort(error.RepoError(data))
+
+    @batchable
+    def heads(self):
+        f = future()
+        yield {}, f
+        d = f.value
+        try:
+            yield wireprototypes.decodelist(d[:-1])
+        except ValueError:
+            self._abort(error.ResponseError(_("unexpected response:"), d))
+
+    @batchable
+    def known(self, nodes):
+        f = future()
+        yield {'nodes': wireprototypes.encodelist(nodes)}, f
+        d = f.value
+        try:
+            yield [bool(int(b)) for b in d]
+        except ValueError:
+            self._abort(error.ResponseError(_("unexpected response:"), d))
+
+    @batchable
+    def branchmap(self):
+        f = future()
+        yield {}, f
+        d = f.value
+        try:
+            branchmap = {}
+            for branchpart in d.splitlines():
+                branchname, branchheads = branchpart.split(' ', 1)
+                branchname = encoding.tolocal(urlreq.unquote(branchname))
+                branchheads = wireprototypes.decodelist(branchheads)
+                branchmap[branchname] = branchheads
+            yield branchmap
+        except TypeError:
+            self._abort(error.ResponseError(_("unexpected response:"), d))
+
+    @batchable
+    def listkeys(self, namespace):
+        if not self.capable('pushkey'):
+            yield {}, None
+        f = future()
+        self.ui.debug('preparing listkeys for "%s"\n' % namespace)
+        yield {'namespace': encoding.fromlocal(namespace)}, f
+        d = f.value
+        self.ui.debug('received listkey for "%s": %i bytes\n'
+                      % (namespace, len(d)))
+        yield pushkeymod.decodekeys(d)
+
+    @batchable
+    def pushkey(self, namespace, key, old, new):
+        if not self.capable('pushkey'):
+            yield False, None
+        f = future()
+        self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
+        yield {'namespace': encoding.fromlocal(namespace),
+               'key': encoding.fromlocal(key),
+               'old': encoding.fromlocal(old),
+               'new': encoding.fromlocal(new)}, f
+        d = f.value
+        d, output = d.split('\n', 1)
+        try:
+            d = bool(int(d))
+        except ValueError:
+            raise error.ResponseError(
+                _('push failed (unexpected response):'), d)
+        for l in output.splitlines(True):
+            self.ui.status(_('remote: '), l)
+        yield d
+
+    def stream_out(self):
+        return self._callstream('stream_out')
+
+    def getbundle(self, source, **kwargs):
+        kwargs = pycompat.byteskwargs(kwargs)
+        self.requirecap('getbundle', _('look up remote changes'))
+        opts = {}
+        bundlecaps = kwargs.get('bundlecaps') or set()
+        for key, value in kwargs.iteritems():
+            if value is None:
+                continue
+            keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key)
+            if keytype is None:
+                raise error.ProgrammingError(
+                    'Unexpectedly None keytype for key %s' % key)
+            elif keytype == 'nodes':
+                value = wireprototypes.encodelist(value)
+            elif keytype == 'csv':
+                value = ','.join(value)
+            elif keytype == 'scsv':
+                value = ','.join(sorted(value))
+            elif keytype == 'boolean':
+                value = '%i' % bool(value)
+            elif keytype != 'plain':
+                raise KeyError('unknown getbundle option type %s'
+                               % keytype)
+            opts[key] = value
+        f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
+        if any((cap.startswith('HG2') for cap in bundlecaps)):
+            return bundle2.getunbundler(self.ui, f)
+        else:
+            return changegroupmod.cg1unpacker(f, 'UN')
+
+    def unbundle(self, cg, heads, url):
+        '''Send cg (a readable file-like object representing the
+        changegroup to push, typically a chunkbuffer object) to the
+        remote server as a bundle.
+
+        When pushing a bundle10 stream, return an integer indicating the
+        result of the push (see changegroup.apply()).
+
+        When pushing a bundle20 stream, return a bundle20 stream.
+
+        `url` is the url the client thinks it's pushing to, which is
+        visible to hooks.
+        '''
+
+        if heads != ['force'] and self.capable('unbundlehash'):
+            heads = wireprototypes.encodelist(
+                ['hashed', hashlib.sha1(''.join(sorted(heads))).digest()])
+        else:
+            heads = wireprototypes.encodelist(heads)
+
+        if util.safehasattr(cg, 'deltaheader'):
+            # this a bundle10, do the old style call sequence
+            ret, output = self._callpush("unbundle", cg, heads=heads)
+            if ret == "":
+                raise error.ResponseError(
+                    _('push failed:'), output)
+            try:
+                ret = int(ret)
+            except ValueError:
+                raise error.ResponseError(
+                    _('push failed (unexpected response):'), ret)
+
+            for l in output.splitlines(True):
+                self.ui.status(_('remote: '), l)
+        else:
+            # bundle2 push. Send a stream, fetch a stream.
+            stream = self._calltwowaystream('unbundle', cg, heads=heads)
+            ret = bundle2.getunbundler(self.ui, stream)
+        return ret
+
+    # End of ipeercommands interface.
+
+    # Begin of ipeerlegacycommands interface.
+
+    def branches(self, nodes):
+        n = wireprototypes.encodelist(nodes)
+        d = self._call("branches", nodes=n)
+        try:
+            br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()]
+            return br
+        except ValueError:
+            self._abort(error.ResponseError(_("unexpected response:"), d))
+
+    def between(self, pairs):
+        batch = 8 # avoid giant requests
+        r = []
+        for i in xrange(0, len(pairs), batch):
+            n = " ".join([wireprototypes.encodelist(p, '-')
+                          for p in pairs[i:i + batch]])
+            d = self._call("between", pairs=n)
+            try:
+                r.extend(l and wireprototypes.decodelist(l) or []
+                         for l in d.splitlines())
+            except ValueError:
+                self._abort(error.ResponseError(_("unexpected response:"), d))
+        return r
+
+    def changegroup(self, nodes, kind):
+        n = wireprototypes.encodelist(nodes)
+        f = self._callcompressable("changegroup", roots=n)
+        return changegroupmod.cg1unpacker(f, 'UN')
+
+    def changegroupsubset(self, bases, heads, kind):
+        self.requirecap('changegroupsubset', _('look up remote changes'))
+        bases = wireprototypes.encodelist(bases)
+        heads = wireprototypes.encodelist(heads)
+        f = self._callcompressable("changegroupsubset",
+                                   bases=bases, heads=heads)
+        return changegroupmod.cg1unpacker(f, 'UN')
+
+    # End of ipeerlegacycommands interface.
+
+    def _submitbatch(self, req):
+        """run batch request <req> on the server
+
+        Returns an iterator of the raw responses from the server.
+        """
+        ui = self.ui
+        if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
+            ui.debug('devel-peer-request: batched-content\n')
+            for op, args in req:
+                msg = 'devel-peer-request:    - %s (%d arguments)\n'
+                ui.debug(msg % (op, len(args)))
+
+        unescapearg = wireprototypes.unescapebatcharg
+
+        rsp = self._callstream("batch", cmds=encodebatchcmds(req))
+        chunk = rsp.read(1024)
+        work = [chunk]
+        while chunk:
+            while ';' not in chunk and chunk:
+                chunk = rsp.read(1024)
+                work.append(chunk)
+            merged = ''.join(work)
+            while ';' in merged:
+                one, merged = merged.split(';', 1)
+                yield unescapearg(one)
+            chunk = rsp.read(1024)
+            work = [merged, chunk]
+        yield unescapearg(''.join(work))
+
+    def _submitone(self, op, args):
+        return self._call(op, **pycompat.strkwargs(args))
+
+    def debugwireargs(self, one, two, three=None, four=None, five=None):
+        # don't pass optional arguments left at their default value
+        opts = {}
+        if three is not None:
+            opts[r'three'] = three
+        if four is not None:
+            opts[r'four'] = four
+        return self._call('debugwireargs', one=one, two=two, **opts)
+
+    def _call(self, cmd, **args):
+        """execute <cmd> on the server
+
+        The command is expected to return a simple string.
+
+        returns the server reply as a string."""
+        raise NotImplementedError()
+
+    def _callstream(self, cmd, **args):
+        """execute <cmd> on the server
+
+        The command is expected to return a stream. Note that if the
+        command doesn't return a stream, _callstream behaves
+        differently for ssh and http peers.
+
+        returns the server reply as a file like object.
+        """
+        raise NotImplementedError()
+
+    def _callcompressable(self, cmd, **args):
+        """execute <cmd> on the server
+
+        The command is expected to return a stream.
+
+        The stream may have been compressed in some implementations. This
+        function takes care of the decompression. This is the only difference
+        with _callstream.
+
+        returns the server reply as a file like object.
+        """
+        raise NotImplementedError()
+
+    def _callpush(self, cmd, fp, **args):
+        """execute a <cmd> on server
+
+        The command is expected to be related to a push. Push has a special
+        return method.
+
+        returns the server reply as a (ret, output) tuple. ret is either
+        empty (error) or a stringified int.
+        """
+        raise NotImplementedError()
+
+    def _calltwowaystream(self, cmd, fp, **args):
+        """execute <cmd> on server
+
+        The command will send a stream to the server and get a stream in reply.
+        """
+        raise NotImplementedError()
+
+    def _abort(self, exception):
+        """clearly abort the wire protocol connection and raise the exception
+        """
+        raise NotImplementedError()
--- a/tests/test-batching.py	Wed Apr 11 10:51:38 2018 -0700
+++ b/tests/test-batching.py	Wed Apr 11 12:49:08 2018 -0700
@@ -11,7 +11,7 @@
     error,
     peer,
     util,
-    wireproto,
+    wireprotov1peer,
 )
 
 # equivalent of repo.repository
@@ -177,7 +177,7 @@
             yield r
 
     def batchiter(self):
-        return wireproto.remoteiterbatcher(self)
+        return wireprotov1peer.remoteiterbatcher(self)
 
     @peer.batchable
     def foo(self, one, two=None):
--- a/tests/test-wireproto.py	Wed Apr 11 10:51:38 2018 -0700
+++ b/tests/test-wireproto.py	Wed Apr 11 12:49:08 2018 -0700
@@ -7,6 +7,7 @@
     util,
     wireproto,
     wireprototypes,
+    wireprotov1peer,
 )
 stringio = util.stringio
 
@@ -29,7 +30,7 @@
     'version': 1,
 }
 
-class clientpeer(wireproto.wirepeer):
+class clientpeer(wireprotov1peer.wirepeer):
     def __init__(self, serverrepo, ui):
         self.serverrepo = serverrepo
         self.ui = ui
@@ -65,9 +66,9 @@
     def _callstream(self, cmd, **args):
         return stringio(self._call(cmd, **args))
 
-    @wireproto.batchable
+    @wireprotov1peer.batchable
     def greet(self, name):
-        f = wireproto.future()
+        f = wireprotov1peer.future()
         yield {b'name': mangle(name)}, f
         yield unmangle(f.value)