--- a/mercurial/wireprotov1peer.py Sat Oct 05 10:29:34 2019 -0400
+++ b/mercurial/wireprotov1peer.py Sun Oct 06 09:45:02 2019 -0400
@@ -12,9 +12,7 @@
import weakref
from .i18n import _
-from .node import (
- bin,
-)
+from .node import bin
from . import (
bundle2,
changegroup as changegroupmod,
@@ -32,6 +30,7 @@
urlreq = util.urlreq
+
def batchable(f):
'''annotation for batchable methods
@@ -54,26 +53,31 @@
which is used by remotebatch to split the call into separate encoding and
decoding phases.
'''
+
def plain(*args, **opts):
batchable = f(*args, **opts)
encargsorres, encresref = next(batchable)
if not encresref:
- return encargsorres # a local result in this case
+ return encargsorres # a local result in this case
self = args[0]
cmd = pycompat.bytesurl(f.__name__) # ensure cmd is ascii bytestr
encresref.set(self._submitone(cmd, encargsorres))
return next(batchable)
+
setattr(plain, 'batchable', f)
setattr(plain, '__name__', f.__name__)
return plain
+
class future(object):
'''placeholder for a value to be set later'''
+
def set(self, value):
if util.safehasattr(self, 'value'):
raise error.RepoError("future is already set")
self.value = value
+
def encodebatchcmds(req):
"""Return a ``cmds`` argument value for the ``batch`` command."""
escapearg = wireprototypes.escapebatcharg
@@ -85,12 +89,15 @@
# servers.
assert all(escapearg(k) == k for k in argsdict)
- args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
- for k, v in argsdict.iteritems())
+ args = ','.join(
+ '%s=%s' % (escapearg(k), escapearg(v))
+ for k, v in argsdict.iteritems()
+ )
cmds.append('%s %s' % (op, args))
return ';'.join(cmds)
+
class unsentfuture(pycompat.futures.Future):
"""A Future variation to represent an unsent command.
@@ -111,6 +118,7 @@
# on that.
return self.result(timeout)
+
@interfaceutil.implementer(repository.ipeercommandexecutor)
class peerexecutor(object):
def __init__(self, peer):
@@ -130,12 +138,14 @@
def callcommand(self, command, args):
if self._sent:
- raise error.ProgrammingError('callcommand() cannot be used '
- 'after commands are sent')
+ raise error.ProgrammingError(
+ 'callcommand() cannot be used ' 'after commands are sent'
+ )
if self._closed:
- raise error.ProgrammingError('callcommand() cannot be used '
- 'after close()')
+ raise error.ProgrammingError(
+ 'callcommand() cannot be used ' 'after close()'
+ )
# Commands are dispatched through methods on the peer.
fn = getattr(self._peer, pycompat.sysstr(command), None)
@@ -143,7 +153,8 @@
if not fn:
raise error.ProgrammingError(
'cannot call command %s: method of same name not available '
- 'on peer' % command)
+ 'on peer' % command
+ )
# Commands are either batchable or they aren't. If a command
# isn't batchable, we send it immediately because the executor
@@ -169,7 +180,8 @@
if self._calls:
raise error.ProgrammingError(
'%s is not batchable and cannot be called on a command '
- 'executor along with other commands' % command)
+ 'executor along with other commands' % command
+ )
f = addcall()
@@ -232,8 +244,9 @@
continue
try:
- batchable = fn.batchable(fn.__self__,
- **pycompat.strkwargs(args))
+ batchable = fn.batchable(
+ fn.__self__, **pycompat.strkwargs(args)
+ )
except Exception:
pycompat.future_set_exception_info(f, sys.exc_info()[1:])
return
@@ -263,8 +276,9 @@
# concurrent.futures already solves these problems and its thread pool
# executor has minimal overhead. So we use it.
self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
- self._responsef = self._responseexecutor.submit(self._readbatchresponse,
- states, wireresults)
+ self._responsef = self._responseexecutor.submit(
+ self._readbatchresponse, states, wireresults
+ )
def close(self):
self.sendcommands()
@@ -290,8 +304,11 @@
# errored. Otherwise a result() could wait indefinitely.
for f in self._futures:
if not f.done():
- f.set_exception(error.ResponseError(
- _('unfulfilled batch command response')))
+ f.set_exception(
+ error.ResponseError(
+ _('unfulfilled batch command response')
+ )
+ )
self._futures = None
@@ -312,8 +329,10 @@
else:
f.set_result(result)
-@interfaceutil.implementer(repository.ipeercommands,
- repository.ipeerlegacycommands)
+
+@interfaceutil.implementer(
+ repository.ipeercommands, repository.ipeerlegacycommands
+)
class wirepeer(repository.peer):
"""Client-side interface for communicating with a peer repository.
@@ -322,6 +341,7 @@
See also httppeer.py and sshpeer.py for protocol-specific
implementations of this interface.
"""
+
def commandexecutor(self):
return peerexecutor(self)
@@ -387,8 +407,9 @@
self.ui.debug('preparing listkeys for "%s"\n' % namespace)
yield {'namespace': encoding.fromlocal(namespace)}, f
d = f.value
- self.ui.debug('received listkey for "%s": %i bytes\n'
- % (namespace, len(d)))
+ self.ui.debug(
+ 'received listkey for "%s": %i bytes\n' % (namespace, len(d))
+ )
yield pushkeymod.decodekeys(d)
@batchable
@@ -397,17 +418,20 @@
yield False, None
f = future()
self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
- yield {'namespace': encoding.fromlocal(namespace),
- 'key': encoding.fromlocal(key),
- 'old': encoding.fromlocal(old),
- 'new': encoding.fromlocal(new)}, f
+ yield {
+ 'namespace': encoding.fromlocal(namespace),
+ 'key': encoding.fromlocal(key),
+ 'old': encoding.fromlocal(old),
+ 'new': encoding.fromlocal(new),
+ }, f
d = f.value
d, output = d.split('\n', 1)
try:
d = bool(int(d))
except ValueError:
raise error.ResponseError(
- _('push failed (unexpected response):'), d)
+ _('push failed (unexpected response):'), d
+ )
for l in output.splitlines(True):
self.ui.status(_('remote: '), l)
yield d
@@ -426,7 +450,8 @@
keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key)
if keytype is None:
raise error.ProgrammingError(
- 'Unexpectedly None keytype for key %s' % key)
+ 'Unexpectedly None keytype for key %s' % key
+ )
elif keytype == 'nodes':
value = wireprototypes.encodelist(value)
elif keytype == 'csv':
@@ -436,8 +461,7 @@
elif keytype == 'boolean':
value = '%i' % bool(value)
elif keytype != 'plain':
- raise KeyError('unknown getbundle option type %s'
- % keytype)
+ raise KeyError('unknown getbundle option type %s' % keytype)
opts[key] = value
f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
if any((cap.startswith('HG2') for cap in bundlecaps)):
@@ -461,7 +485,8 @@
if heads != ['force'] and self.capable('unbundlehash'):
heads = wireprototypes.encodelist(
- ['hashed', hashlib.sha1(''.join(sorted(heads))).digest()])
+ ['hashed', hashlib.sha1(''.join(sorted(heads))).digest()]
+ )
else:
heads = wireprototypes.encodelist(heads)
@@ -469,13 +494,13 @@
# this a bundle10, do the old style call sequence
ret, output = self._callpush("unbundle", bundle, heads=heads)
if ret == "":
- raise error.ResponseError(
- _('push failed:'), output)
+ raise error.ResponseError(_('push failed:'), output)
try:
ret = int(ret)
except ValueError:
raise error.ResponseError(
- _('push failed (unexpected response):'), ret)
+ _('push failed (unexpected response):'), ret
+ )
for l in output.splitlines(True):
self.ui.status(_('remote: '), l)
@@ -499,15 +524,21 @@
self._abort(error.ResponseError(_("unexpected response:"), d))
def between(self, pairs):
- batch = 8 # avoid giant requests
+ batch = 8 # avoid giant requests
r = []
for i in pycompat.xrange(0, len(pairs), batch):
- n = " ".join([wireprototypes.encodelist(p, '-')
- for p in pairs[i:i + batch]])
+ n = " ".join(
+ [
+ wireprototypes.encodelist(p, '-')
+ for p in pairs[i : i + batch]
+ ]
+ )
d = self._call("between", pairs=n)
try:
- r.extend(l and wireprototypes.decodelist(l) or []
- for l in d.splitlines())
+ r.extend(
+ l and wireprototypes.decodelist(l) or []
+ for l in d.splitlines()
+ )
except ValueError:
self._abort(error.ResponseError(_("unexpected response:"), d))
return r
@@ -521,8 +552,9 @@
self.requirecap('changegroupsubset', _('look up remote changes'))
bases = wireprototypes.encodelist(bases)
heads = wireprototypes.encodelist(heads)
- f = self._callcompressable("changegroupsubset",
- bases=bases, heads=heads)
+ f = self._callcompressable(
+ "changegroupsubset", bases=bases, heads=heads
+ )
return changegroupmod.cg1unpacker(f, 'UN')
# End of ipeerlegacycommands interface.