--- a/mercurial/wireprotov1peer.py Fri Apr 13 10:23:05 2018 -0700
+++ b/mercurial/wireprotov1peer.py Fri Apr 13 10:51:23 2018 -0700
@@ -8,12 +8,15 @@
from __future__ import absolute_import
import hashlib
+import sys
from .i18n import _
from .node import (
bin,
)
-
+from .thirdparty.zope import (
+ interface as zi,
+)
from . import (
bundle2,
changegroup as changegroupmod,
@@ -177,6 +180,93 @@
return ';'.join(cmds)
+@zi.implementer(repository.ipeercommandexecutor)
+class peerexecutor(object):
+ def __init__(self, peer):
+ self._peer = peer
+ self._sent = False
+ self._closed = False
+ self._calls = []
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exctype, excvalee, exctb):
+ self.close()
+
+ def callcommand(self, command, args):
+ if self._sent:
+ raise error.ProgrammingError('callcommand() cannot be used '
+ 'after commands are sent')
+
+ if self._closed:
+ raise error.ProgrammingError('callcommand() cannot be used '
+ 'after close()')
+
+ # Commands are dispatched through methods on the peer.
+ fn = getattr(self._peer, pycompat.sysstr(command), None)
+
+ if not fn:
+ raise error.ProgrammingError(
+ 'cannot call command %s: method of same name not available '
+ 'on peer' % command)
+
+ # Commands are either batchable or they aren't. If a command
+ # isn't batchable, we send it immediately because the executor
+ # can no longer accept new commands after a non-batchable command.
+ # If a command is batchable, we queue it for later.
+
+ if getattr(fn, 'batchable', False):
+ pass
+ else:
+ if self._calls:
+ raise error.ProgrammingError(
+ '%s is not batchable and cannot be called on a command '
+ 'executor along with other commands' % command)
+
+ # We don't support batching yet. So resolve it immediately.
+ f = pycompat.futures.Future()
+ self._calls.append((command, args, fn, f))
+ self.sendcommands()
+ return f
+
+ def sendcommands(self):
+ if self._sent:
+ return
+
+ if not self._calls:
+ return
+
+ self._sent = True
+
+ calls = self._calls
+ # Mainly to destroy references to futures.
+ self._calls = None
+
+ if len(calls) == 1:
+ command, args, fn, f = calls[0]
+
+ # Future was cancelled. Ignore it.
+ if not f.set_running_or_notify_cancel():
+ return
+
+ try:
+ result = fn(**pycompat.strkwargs(args))
+ except Exception:
+ f.set_exception_info(*sys.exc_info()[1:])
+ else:
+ f.set_result(result)
+
+ return
+
+ raise error.ProgrammingError('support for multiple commands not '
+ 'yet implemented')
+
+ def close(self):
+ self.sendcommands()
+
+ self._closed = True
+
class wirepeer(repository.legacypeer):
"""Client-side interface for communicating with a peer repository.
@@ -185,6 +275,9 @@
See also httppeer.py and sshpeer.py for protocol-specific
implementations of this interface.
"""
+ def commandexecutor(self):
+ return peerexecutor(self)
+
# Begin of ipeercommands interface.
def iterbatch(self):