wireproto: move version 1 peer functionality to standalone module (API)
wireproto.py contains code for both the client and the server. There
*should* be a somewhat strong separation between the two.
This commit extracts the client-side code from wireproto.py into a new
module - wireprotov1peer.
Differential Revision: https://phab.mercurial-scm.org/D3259
--- a/hgext/infinitepush/__init__.py Wed Apr 11 10:51:38 2018 -0700
+++ b/hgext/infinitepush/__init__.py Wed Apr 11 12:49:08 2018 -0700
@@ -128,6 +128,7 @@
util,
wireproto,
wireprototypes,
+ wireprotov1peer,
)
from . import (
@@ -319,7 +320,7 @@
extensions.wrapfunction(discovery, 'checkheads', _checkheads)
- wireproto.wirepeer.listkeyspatterns = listkeyspatterns
+ wireprotov1peer.wirepeer.listkeyspatterns = listkeyspatterns
partorder = exchange.b2partsgenorder
index = partorder.index('changeset')
--- a/hgext/largefiles/proto.py Wed Apr 11 10:51:38 2018 -0700
+++ b/hgext/largefiles/proto.py Wed Apr 11 12:49:08 2018 -0700
@@ -13,8 +13,8 @@
error,
httppeer,
util,
- wireproto,
wireprototypes,
+ wireprotov1peer,
)
from . import (
@@ -145,9 +145,9 @@
self._abort(error.ResponseError(_("unexpected response:"),
chunk))
- @wireproto.batchable
+ @wireprotov1peer.batchable
def statlfile(self, sha):
- f = wireproto.future()
+ f = wireprotov1peer.future()
result = {'sha': sha}
yield result, f
try:
--- a/mercurial/httppeer.py Wed Apr 11 10:51:38 2018 -0700
+++ b/mercurial/httppeer.py Wed Apr 11 12:49:08 2018 -0700
@@ -34,6 +34,7 @@
wireproto,
wireprotoframing,
wireprototypes,
+ wireprotov1peer,
wireprotov2server,
)
@@ -382,7 +383,7 @@
return respurl, proto, resp
-class httppeer(wireproto.wirepeer):
+class httppeer(wireprotov1peer.wirepeer):
def __init__(self, ui, path, url, opener, requestbuilder, caps):
self.ui = ui
self._path = path
--- a/mercurial/sshpeer.py Wed Apr 11 10:51:38 2018 -0700
+++ b/mercurial/sshpeer.py Wed Apr 11 12:49:08 2018 -0700
@@ -18,6 +18,7 @@
wireproto,
wireprotoserver,
wireprototypes,
+ wireprotov1peer,
)
from .utils import (
procutil,
@@ -352,7 +353,7 @@
return protoname, caps
-class sshv1peer(wireproto.wirepeer):
+class sshv1peer(wireprotov1peer.wirepeer):
def __init__(self, ui, url, proc, stdin, stdout, stderr, caps,
autoreadstderr=True):
"""Create a peer from an existing SSH connection.
@@ -589,7 +590,7 @@
def instance(ui, path, create):
"""Create an SSH peer.
- The returned object conforms to the ``wireproto.wirepeer`` interface.
+ The returned object conforms to the ``wireprotov1peer.wirepeer`` interface.
"""
u = util.url(path, parsequery=False, parsefragment=False)
if u.scheme != 'ssh' or not u.host or u.path is None:
--- a/mercurial/wireproto.py Wed Apr 11 10:51:38 2018 -0700
+++ b/mercurial/wireproto.py Wed Apr 11 12:49:08 2018 -0700
@@ -7,13 +7,11 @@
from __future__ import absolute_import
-import hashlib
import os
import tempfile
from .i18n import _
from .node import (
- bin,
hex,
nullid,
)
@@ -25,10 +23,8 @@
encoding,
error,
exchange,
- peer,
pushkey as pushkeymod,
pycompat,
- repository,
streamclone,
util,
wireprototypes,
@@ -47,92 +43,6 @@
'IncompatibleClient')
bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
-class remoteiterbatcher(peer.iterbatcher):
- def __init__(self, remote):
- super(remoteiterbatcher, self).__init__()
- self._remote = remote
-
- def __getattr__(self, name):
- # Validate this method is batchable, since submit() only supports
- # batchable methods.
- fn = getattr(self._remote, name)
- if not getattr(fn, 'batchable', None):
- raise error.ProgrammingError('Attempted to batch a non-batchable '
- 'call to %r' % name)
-
- return super(remoteiterbatcher, self).__getattr__(name)
-
- def submit(self):
- """Break the batch request into many patch calls and pipeline them.
-
- This is mostly valuable over http where request sizes can be
- limited, but can be used in other places as well.
- """
- # 2-tuple of (command, arguments) that represents what will be
- # sent over the wire.
- requests = []
-
- # 4-tuple of (command, final future, @batchable generator, remote
- # future).
- results = []
-
- for command, args, opts, finalfuture in self.calls:
- mtd = getattr(self._remote, command)
- batchable = mtd.batchable(mtd.__self__, *args, **opts)
-
- commandargs, fremote = next(batchable)
- assert fremote
- requests.append((command, commandargs))
- results.append((command, finalfuture, batchable, fremote))
-
- if requests:
- self._resultiter = self._remote._submitbatch(requests)
-
- self._results = results
-
- def results(self):
- for command, finalfuture, batchable, remotefuture in self._results:
- # Get the raw result, set it in the remote future, feed it
- # back into the @batchable generator so it can be decoded, and
- # set the result on the final future to this value.
- remoteresult = next(self._resultiter)
- remotefuture.set(remoteresult)
- finalfuture.set(next(batchable))
-
- # Verify our @batchable generators only emit 2 values.
- try:
- next(batchable)
- except StopIteration:
- pass
- else:
- raise error.ProgrammingError('%s @batchable generator emitted '
- 'unexpected value count' % command)
-
- yield finalfuture.value
-
-# Forward a couple of names from peer to make wireproto interactions
-# slightly more sensible.
-batchable = peer.batchable
-future = peer.future
-
-
-def encodebatchcmds(req):
- """Return a ``cmds`` argument value for the ``batch`` command."""
- escapearg = wireprototypes.escapebatcharg
-
- cmds = []
- for op, argsdict in req:
- # Old servers didn't properly unescape argument names. So prevent
- # the sending of argument names that may not be decoded properly by
- # servers.
- assert all(escapearg(k) == k for k in argsdict)
-
- args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
- for k, v in argsdict.iteritems())
- cmds.append('%s %s' % (op, args))
-
- return ';'.join(cmds)
-
def clientcompressionsupport(proto):
"""Returns a list of compression methods supported by the client.
@@ -145,315 +55,6 @@
return cap[5:].split(',')
return ['zlib', 'none']
-# client side
-
-class wirepeer(repository.legacypeer):
- """Client-side interface for communicating with a peer repository.
-
- Methods commonly call wire protocol commands of the same name.
-
- See also httppeer.py and sshpeer.py for protocol-specific
- implementations of this interface.
- """
- # Begin of ipeercommands interface.
-
- def iterbatch(self):
- return remoteiterbatcher(self)
-
- @batchable
- def lookup(self, key):
- self.requirecap('lookup', _('look up remote revision'))
- f = future()
- yield {'key': encoding.fromlocal(key)}, f
- d = f.value
- success, data = d[:-1].split(" ", 1)
- if int(success):
- yield bin(data)
- else:
- self._abort(error.RepoError(data))
-
- @batchable
- def heads(self):
- f = future()
- yield {}, f
- d = f.value
- try:
- yield wireprototypes.decodelist(d[:-1])
- except ValueError:
- self._abort(error.ResponseError(_("unexpected response:"), d))
-
- @batchable
- def known(self, nodes):
- f = future()
- yield {'nodes': wireprototypes.encodelist(nodes)}, f
- d = f.value
- try:
- yield [bool(int(b)) for b in d]
- except ValueError:
- self._abort(error.ResponseError(_("unexpected response:"), d))
-
- @batchable
- def branchmap(self):
- f = future()
- yield {}, f
- d = f.value
- try:
- branchmap = {}
- for branchpart in d.splitlines():
- branchname, branchheads = branchpart.split(' ', 1)
- branchname = encoding.tolocal(urlreq.unquote(branchname))
- branchheads = wireprototypes.decodelist(branchheads)
- branchmap[branchname] = branchheads
- yield branchmap
- except TypeError:
- self._abort(error.ResponseError(_("unexpected response:"), d))
-
- @batchable
- def listkeys(self, namespace):
- if not self.capable('pushkey'):
- yield {}, None
- f = future()
- self.ui.debug('preparing listkeys for "%s"\n' % namespace)
- yield {'namespace': encoding.fromlocal(namespace)}, f
- d = f.value
- self.ui.debug('received listkey for "%s": %i bytes\n'
- % (namespace, len(d)))
- yield pushkeymod.decodekeys(d)
-
- @batchable
- def pushkey(self, namespace, key, old, new):
- if not self.capable('pushkey'):
- yield False, None
- f = future()
- self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
- yield {'namespace': encoding.fromlocal(namespace),
- 'key': encoding.fromlocal(key),
- 'old': encoding.fromlocal(old),
- 'new': encoding.fromlocal(new)}, f
- d = f.value
- d, output = d.split('\n', 1)
- try:
- d = bool(int(d))
- except ValueError:
- raise error.ResponseError(
- _('push failed (unexpected response):'), d)
- for l in output.splitlines(True):
- self.ui.status(_('remote: '), l)
- yield d
-
- def stream_out(self):
- return self._callstream('stream_out')
-
- def getbundle(self, source, **kwargs):
- kwargs = pycompat.byteskwargs(kwargs)
- self.requirecap('getbundle', _('look up remote changes'))
- opts = {}
- bundlecaps = kwargs.get('bundlecaps') or set()
- for key, value in kwargs.iteritems():
- if value is None:
- continue
- keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key)
- if keytype is None:
- raise error.ProgrammingError(
- 'Unexpectedly None keytype for key %s' % key)
- elif keytype == 'nodes':
- value = wireprototypes.encodelist(value)
- elif keytype == 'csv':
- value = ','.join(value)
- elif keytype == 'scsv':
- value = ','.join(sorted(value))
- elif keytype == 'boolean':
- value = '%i' % bool(value)
- elif keytype != 'plain':
- raise KeyError('unknown getbundle option type %s'
- % keytype)
- opts[key] = value
- f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
- if any((cap.startswith('HG2') for cap in bundlecaps)):
- return bundle2.getunbundler(self.ui, f)
- else:
- return changegroupmod.cg1unpacker(f, 'UN')
-
- def unbundle(self, cg, heads, url):
- '''Send cg (a readable file-like object representing the
- changegroup to push, typically a chunkbuffer object) to the
- remote server as a bundle.
-
- When pushing a bundle10 stream, return an integer indicating the
- result of the push (see changegroup.apply()).
-
- When pushing a bundle20 stream, return a bundle20 stream.
-
- `url` is the url the client thinks it's pushing to, which is
- visible to hooks.
- '''
-
- if heads != ['force'] and self.capable('unbundlehash'):
- heads = wireprototypes.encodelist(
- ['hashed', hashlib.sha1(''.join(sorted(heads))).digest()])
- else:
- heads = wireprototypes.encodelist(heads)
-
- if util.safehasattr(cg, 'deltaheader'):
- # this a bundle10, do the old style call sequence
- ret, output = self._callpush("unbundle", cg, heads=heads)
- if ret == "":
- raise error.ResponseError(
- _('push failed:'), output)
- try:
- ret = int(ret)
- except ValueError:
- raise error.ResponseError(
- _('push failed (unexpected response):'), ret)
-
- for l in output.splitlines(True):
- self.ui.status(_('remote: '), l)
- else:
- # bundle2 push. Send a stream, fetch a stream.
- stream = self._calltwowaystream('unbundle', cg, heads=heads)
- ret = bundle2.getunbundler(self.ui, stream)
- return ret
-
- # End of ipeercommands interface.
-
- # Begin of ipeerlegacycommands interface.
-
- def branches(self, nodes):
- n = wireprototypes.encodelist(nodes)
- d = self._call("branches", nodes=n)
- try:
- br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()]
- return br
- except ValueError:
- self._abort(error.ResponseError(_("unexpected response:"), d))
-
- def between(self, pairs):
- batch = 8 # avoid giant requests
- r = []
- for i in xrange(0, len(pairs), batch):
- n = " ".join([wireprototypes.encodelist(p, '-')
- for p in pairs[i:i + batch]])
- d = self._call("between", pairs=n)
- try:
- r.extend(l and wireprototypes.decodelist(l) or []
- for l in d.splitlines())
- except ValueError:
- self._abort(error.ResponseError(_("unexpected response:"), d))
- return r
-
- def changegroup(self, nodes, kind):
- n = wireprototypes.encodelist(nodes)
- f = self._callcompressable("changegroup", roots=n)
- return changegroupmod.cg1unpacker(f, 'UN')
-
- def changegroupsubset(self, bases, heads, kind):
- self.requirecap('changegroupsubset', _('look up remote changes'))
- bases = wireprototypes.encodelist(bases)
- heads = wireprototypes.encodelist(heads)
- f = self._callcompressable("changegroupsubset",
- bases=bases, heads=heads)
- return changegroupmod.cg1unpacker(f, 'UN')
-
- # End of ipeerlegacycommands interface.
-
- def _submitbatch(self, req):
- """run batch request <req> on the server
-
- Returns an iterator of the raw responses from the server.
- """
- ui = self.ui
- if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
- ui.debug('devel-peer-request: batched-content\n')
- for op, args in req:
- msg = 'devel-peer-request: - %s (%d arguments)\n'
- ui.debug(msg % (op, len(args)))
-
- unescapearg = wireprototypes.unescapebatcharg
-
- rsp = self._callstream("batch", cmds=encodebatchcmds(req))
- chunk = rsp.read(1024)
- work = [chunk]
- while chunk:
- while ';' not in chunk and chunk:
- chunk = rsp.read(1024)
- work.append(chunk)
- merged = ''.join(work)
- while ';' in merged:
- one, merged = merged.split(';', 1)
- yield unescapearg(one)
- chunk = rsp.read(1024)
- work = [merged, chunk]
- yield unescapearg(''.join(work))
-
- def _submitone(self, op, args):
- return self._call(op, **pycompat.strkwargs(args))
-
- def debugwireargs(self, one, two, three=None, four=None, five=None):
- # don't pass optional arguments left at their default value
- opts = {}
- if three is not None:
- opts[r'three'] = three
- if four is not None:
- opts[r'four'] = four
- return self._call('debugwireargs', one=one, two=two, **opts)
-
- def _call(self, cmd, **args):
- """execute <cmd> on the server
-
- The command is expected to return a simple string.
-
- returns the server reply as a string."""
- raise NotImplementedError()
-
- def _callstream(self, cmd, **args):
- """execute <cmd> on the server
-
- The command is expected to return a stream. Note that if the
- command doesn't return a stream, _callstream behaves
- differently for ssh and http peers.
-
- returns the server reply as a file like object.
- """
- raise NotImplementedError()
-
- def _callcompressable(self, cmd, **args):
- """execute <cmd> on the server
-
- The command is expected to return a stream.
-
- The stream may have been compressed in some implementations. This
- function takes care of the decompression. This is the only difference
- with _callstream.
-
- returns the server reply as a file like object.
- """
- raise NotImplementedError()
-
- def _callpush(self, cmd, fp, **args):
- """execute a <cmd> on server
-
- The command is expected to be related to a push. Push has a special
- return method.
-
- returns the server reply as a (ret, output) tuple. ret is either
- empty (error) or a stringified int.
- """
- raise NotImplementedError()
-
- def _calltwowaystream(self, cmd, fp, **args):
- """execute <cmd> on server
-
- The command will send a stream to the server and get a stream in reply.
- """
- raise NotImplementedError()
-
- def _abort(self, exception):
- """clearly abort the wire protocol connection and raise the exception
- """
- raise NotImplementedError()
-
-# server side
-
# wire protocol command can either return a string or one of these classes.
def getdispatchrepo(repo, proto, command):
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/mercurial/wireprotov1peer.py Wed Apr 11 12:49:08 2018 -0700
@@ -0,0 +1,420 @@
+# wireprotov1peer.py - Client-side functionality for wire protocol version 1.
+#
+# Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
+#
+# This software may be used and distributed according to the terms of the
+# GNU General Public License version 2 or any later version.
+
+from __future__ import absolute_import
+
+import hashlib
+
+from .i18n import _
+from .node import (
+ bin,
+)
+
+from . import (
+ bundle2,
+ changegroup as changegroupmod,
+ encoding,
+ error,
+ peer,
+ pushkey as pushkeymod,
+ pycompat,
+ repository,
+ util,
+ wireprototypes,
+)
+
+urlreq = util.urlreq
+
+class remoteiterbatcher(peer.iterbatcher):
+ def __init__(self, remote):
+ super(remoteiterbatcher, self).__init__()
+ self._remote = remote
+
+ def __getattr__(self, name):
+ # Validate this method is batchable, since submit() only supports
+ # batchable methods.
+ fn = getattr(self._remote, name)
+ if not getattr(fn, 'batchable', None):
+ raise error.ProgrammingError('Attempted to batch a non-batchable '
+ 'call to %r' % name)
+
+ return super(remoteiterbatcher, self).__getattr__(name)
+
+ def submit(self):
+ """Break the batch request into many patch calls and pipeline them.
+
+ This is mostly valuable over http where request sizes can be
+ limited, but can be used in other places as well.
+ """
+ # 2-tuple of (command, arguments) that represents what will be
+ # sent over the wire.
+ requests = []
+
+ # 4-tuple of (command, final future, @batchable generator, remote
+ # future).
+ results = []
+
+ for command, args, opts, finalfuture in self.calls:
+ mtd = getattr(self._remote, command)
+ batchable = mtd.batchable(mtd.__self__, *args, **opts)
+
+ commandargs, fremote = next(batchable)
+ assert fremote
+ requests.append((command, commandargs))
+ results.append((command, finalfuture, batchable, fremote))
+
+ if requests:
+ self._resultiter = self._remote._submitbatch(requests)
+
+ self._results = results
+
+ def results(self):
+ for command, finalfuture, batchable, remotefuture in self._results:
+ # Get the raw result, set it in the remote future, feed it
+ # back into the @batchable generator so it can be decoded, and
+ # set the result on the final future to this value.
+ remoteresult = next(self._resultiter)
+ remotefuture.set(remoteresult)
+ finalfuture.set(next(batchable))
+
+ # Verify our @batchable generators only emit 2 values.
+ try:
+ next(batchable)
+ except StopIteration:
+ pass
+ else:
+ raise error.ProgrammingError('%s @batchable generator emitted '
+ 'unexpected value count' % command)
+
+ yield finalfuture.value
+
+# Forward a couple of names from peer to make wireproto interactions
+# slightly more sensible.
+batchable = peer.batchable
+future = peer.future
+
+def encodebatchcmds(req):
+ """Return a ``cmds`` argument value for the ``batch`` command."""
+ escapearg = wireprototypes.escapebatcharg
+
+ cmds = []
+ for op, argsdict in req:
+ # Old servers didn't properly unescape argument names. So prevent
+ # the sending of argument names that may not be decoded properly by
+ # servers.
+ assert all(escapearg(k) == k for k in argsdict)
+
+ args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
+ for k, v in argsdict.iteritems())
+ cmds.append('%s %s' % (op, args))
+
+ return ';'.join(cmds)
+
+class wirepeer(repository.legacypeer):
+ """Client-side interface for communicating with a peer repository.
+
+ Methods commonly call wire protocol commands of the same name.
+
+ See also httppeer.py and sshpeer.py for protocol-specific
+ implementations of this interface.
+ """
+ # Begin of ipeercommands interface.
+
+ def iterbatch(self):
+ return remoteiterbatcher(self)
+
+ @batchable
+ def lookup(self, key):
+ self.requirecap('lookup', _('look up remote revision'))
+ f = future()
+ yield {'key': encoding.fromlocal(key)}, f
+ d = f.value
+ success, data = d[:-1].split(" ", 1)
+ if int(success):
+ yield bin(data)
+ else:
+ self._abort(error.RepoError(data))
+
+ @batchable
+ def heads(self):
+ f = future()
+ yield {}, f
+ d = f.value
+ try:
+ yield wireprototypes.decodelist(d[:-1])
+ except ValueError:
+ self._abort(error.ResponseError(_("unexpected response:"), d))
+
+ @batchable
+ def known(self, nodes):
+ f = future()
+ yield {'nodes': wireprototypes.encodelist(nodes)}, f
+ d = f.value
+ try:
+ yield [bool(int(b)) for b in d]
+ except ValueError:
+ self._abort(error.ResponseError(_("unexpected response:"), d))
+
+ @batchable
+ def branchmap(self):
+ f = future()
+ yield {}, f
+ d = f.value
+ try:
+ branchmap = {}
+ for branchpart in d.splitlines():
+ branchname, branchheads = branchpart.split(' ', 1)
+ branchname = encoding.tolocal(urlreq.unquote(branchname))
+ branchheads = wireprototypes.decodelist(branchheads)
+ branchmap[branchname] = branchheads
+ yield branchmap
+ except TypeError:
+ self._abort(error.ResponseError(_("unexpected response:"), d))
+
+ @batchable
+ def listkeys(self, namespace):
+ if not self.capable('pushkey'):
+ yield {}, None
+ f = future()
+ self.ui.debug('preparing listkeys for "%s"\n' % namespace)
+ yield {'namespace': encoding.fromlocal(namespace)}, f
+ d = f.value
+ self.ui.debug('received listkey for "%s": %i bytes\n'
+ % (namespace, len(d)))
+ yield pushkeymod.decodekeys(d)
+
+ @batchable
+ def pushkey(self, namespace, key, old, new):
+ if not self.capable('pushkey'):
+ yield False, None
+ f = future()
+ self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
+ yield {'namespace': encoding.fromlocal(namespace),
+ 'key': encoding.fromlocal(key),
+ 'old': encoding.fromlocal(old),
+ 'new': encoding.fromlocal(new)}, f
+ d = f.value
+ d, output = d.split('\n', 1)
+ try:
+ d = bool(int(d))
+ except ValueError:
+ raise error.ResponseError(
+ _('push failed (unexpected response):'), d)
+ for l in output.splitlines(True):
+ self.ui.status(_('remote: '), l)
+ yield d
+
+ def stream_out(self):
+ return self._callstream('stream_out')
+
+ def getbundle(self, source, **kwargs):
+ kwargs = pycompat.byteskwargs(kwargs)
+ self.requirecap('getbundle', _('look up remote changes'))
+ opts = {}
+ bundlecaps = kwargs.get('bundlecaps') or set()
+ for key, value in kwargs.iteritems():
+ if value is None:
+ continue
+ keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key)
+ if keytype is None:
+ raise error.ProgrammingError(
+ 'Unexpectedly None keytype for key %s' % key)
+ elif keytype == 'nodes':
+ value = wireprototypes.encodelist(value)
+ elif keytype == 'csv':
+ value = ','.join(value)
+ elif keytype == 'scsv':
+ value = ','.join(sorted(value))
+ elif keytype == 'boolean':
+ value = '%i' % bool(value)
+ elif keytype != 'plain':
+ raise KeyError('unknown getbundle option type %s'
+ % keytype)
+ opts[key] = value
+ f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
+ if any((cap.startswith('HG2') for cap in bundlecaps)):
+ return bundle2.getunbundler(self.ui, f)
+ else:
+ return changegroupmod.cg1unpacker(f, 'UN')
+
+ def unbundle(self, cg, heads, url):
+ '''Send cg (a readable file-like object representing the
+ changegroup to push, typically a chunkbuffer object) to the
+ remote server as a bundle.
+
+ When pushing a bundle10 stream, return an integer indicating the
+ result of the push (see changegroup.apply()).
+
+ When pushing a bundle20 stream, return a bundle20 stream.
+
+ `url` is the url the client thinks it's pushing to, which is
+ visible to hooks.
+ '''
+
+ if heads != ['force'] and self.capable('unbundlehash'):
+ heads = wireprototypes.encodelist(
+ ['hashed', hashlib.sha1(''.join(sorted(heads))).digest()])
+ else:
+ heads = wireprototypes.encodelist(heads)
+
+ if util.safehasattr(cg, 'deltaheader'):
+ # this a bundle10, do the old style call sequence
+ ret, output = self._callpush("unbundle", cg, heads=heads)
+ if ret == "":
+ raise error.ResponseError(
+ _('push failed:'), output)
+ try:
+ ret = int(ret)
+ except ValueError:
+ raise error.ResponseError(
+ _('push failed (unexpected response):'), ret)
+
+ for l in output.splitlines(True):
+ self.ui.status(_('remote: '), l)
+ else:
+ # bundle2 push. Send a stream, fetch a stream.
+ stream = self._calltwowaystream('unbundle', cg, heads=heads)
+ ret = bundle2.getunbundler(self.ui, stream)
+ return ret
+
+ # End of ipeercommands interface.
+
+ # Begin of ipeerlegacycommands interface.
+
+ def branches(self, nodes):
+ n = wireprototypes.encodelist(nodes)
+ d = self._call("branches", nodes=n)
+ try:
+ br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()]
+ return br
+ except ValueError:
+ self._abort(error.ResponseError(_("unexpected response:"), d))
+
+ def between(self, pairs):
+ batch = 8 # avoid giant requests
+ r = []
+ for i in xrange(0, len(pairs), batch):
+ n = " ".join([wireprototypes.encodelist(p, '-')
+ for p in pairs[i:i + batch]])
+ d = self._call("between", pairs=n)
+ try:
+ r.extend(l and wireprototypes.decodelist(l) or []
+ for l in d.splitlines())
+ except ValueError:
+ self._abort(error.ResponseError(_("unexpected response:"), d))
+ return r
+
+ def changegroup(self, nodes, kind):
+ n = wireprototypes.encodelist(nodes)
+ f = self._callcompressable("changegroup", roots=n)
+ return changegroupmod.cg1unpacker(f, 'UN')
+
+ def changegroupsubset(self, bases, heads, kind):
+ self.requirecap('changegroupsubset', _('look up remote changes'))
+ bases = wireprototypes.encodelist(bases)
+ heads = wireprototypes.encodelist(heads)
+ f = self._callcompressable("changegroupsubset",
+ bases=bases, heads=heads)
+ return changegroupmod.cg1unpacker(f, 'UN')
+
+ # End of ipeerlegacycommands interface.
+
+ def _submitbatch(self, req):
+ """run batch request <req> on the server
+
+ Returns an iterator of the raw responses from the server.
+ """
+ ui = self.ui
+ if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
+ ui.debug('devel-peer-request: batched-content\n')
+ for op, args in req:
+ msg = 'devel-peer-request: - %s (%d arguments)\n'
+ ui.debug(msg % (op, len(args)))
+
+ unescapearg = wireprototypes.unescapebatcharg
+
+ rsp = self._callstream("batch", cmds=encodebatchcmds(req))
+ chunk = rsp.read(1024)
+ work = [chunk]
+ while chunk:
+ while ';' not in chunk and chunk:
+ chunk = rsp.read(1024)
+ work.append(chunk)
+ merged = ''.join(work)
+ while ';' in merged:
+ one, merged = merged.split(';', 1)
+ yield unescapearg(one)
+ chunk = rsp.read(1024)
+ work = [merged, chunk]
+ yield unescapearg(''.join(work))
+
+ def _submitone(self, op, args):
+ return self._call(op, **pycompat.strkwargs(args))
+
+ def debugwireargs(self, one, two, three=None, four=None, five=None):
+ # don't pass optional arguments left at their default value
+ opts = {}
+ if three is not None:
+ opts[r'three'] = three
+ if four is not None:
+ opts[r'four'] = four
+ return self._call('debugwireargs', one=one, two=two, **opts)
+
+ def _call(self, cmd, **args):
+ """execute <cmd> on the server
+
+ The command is expected to return a simple string.
+
+ returns the server reply as a string."""
+ raise NotImplementedError()
+
+ def _callstream(self, cmd, **args):
+ """execute <cmd> on the server
+
+ The command is expected to return a stream. Note that if the
+ command doesn't return a stream, _callstream behaves
+ differently for ssh and http peers.
+
+ returns the server reply as a file like object.
+ """
+ raise NotImplementedError()
+
+ def _callcompressable(self, cmd, **args):
+ """execute <cmd> on the server
+
+ The command is expected to return a stream.
+
+ The stream may have been compressed in some implementations. This
+ function takes care of the decompression. This is the only difference
+ with _callstream.
+
+ returns the server reply as a file like object.
+ """
+ raise NotImplementedError()
+
+ def _callpush(self, cmd, fp, **args):
+ """execute a <cmd> on server
+
+ The command is expected to be related to a push. Push has a special
+ return method.
+
+ returns the server reply as a (ret, output) tuple. ret is either
+ empty (error) or a stringified int.
+ """
+ raise NotImplementedError()
+
+ def _calltwowaystream(self, cmd, fp, **args):
+ """execute <cmd> on server
+
+ The command will send a stream to the server and get a stream in reply.
+ """
+ raise NotImplementedError()
+
+ def _abort(self, exception):
+ """clearly abort the wire protocol connection and raise the exception
+ """
+ raise NotImplementedError()
--- a/tests/test-batching.py Wed Apr 11 10:51:38 2018 -0700
+++ b/tests/test-batching.py Wed Apr 11 12:49:08 2018 -0700
@@ -11,7 +11,7 @@
error,
peer,
util,
- wireproto,
+ wireprotov1peer,
)
# equivalent of repo.repository
@@ -177,7 +177,7 @@
yield r
def batchiter(self):
- return wireproto.remoteiterbatcher(self)
+ return wireprotov1peer.remoteiterbatcher(self)
@peer.batchable
def foo(self, one, two=None):
--- a/tests/test-wireproto.py Wed Apr 11 10:51:38 2018 -0700
+++ b/tests/test-wireproto.py Wed Apr 11 12:49:08 2018 -0700
@@ -7,6 +7,7 @@
util,
wireproto,
wireprototypes,
+ wireprotov1peer,
)
stringio = util.stringio
@@ -29,7 +30,7 @@
'version': 1,
}
-class clientpeer(wireproto.wirepeer):
+class clientpeer(wireprotov1peer.wirepeer):
def __init__(self, serverrepo, ui):
self.serverrepo = serverrepo
self.ui = ui
@@ -65,9 +66,9 @@
def _callstream(self, cmd, **args):
return stringio(self._call(cmd, **args))
- @wireproto.batchable
+ @wireprotov1peer.batchable
def greet(self, name):
- f = wireproto.future()
+ f = wireprotov1peer.future()
yield {b'name': mangle(name)}, f
yield unmangle(f.value)