mercurial/wireprotov1peer.py
changeset 37630 e1b32dc4646c
parent 37615 f3dc8239e3a9
child 37631 2f626233859b
--- a/mercurial/wireprotov1peer.py	Fri Apr 13 10:23:05 2018 -0700
+++ b/mercurial/wireprotov1peer.py	Fri Apr 13 10:51:23 2018 -0700
@@ -8,12 +8,15 @@
 from __future__ import absolute_import
 
 import hashlib
+import sys
 
 from .i18n import _
 from .node import (
     bin,
 )
-
+from .thirdparty.zope import (
+    interface as zi,
+)
 from . import (
     bundle2,
     changegroup as changegroupmod,
@@ -177,6 +180,93 @@
 
     return ';'.join(cmds)
 
+@zi.implementer(repository.ipeercommandexecutor)
+class peerexecutor(object):
+    def __init__(self, peer):
+        self._peer = peer
+        self._sent = False
+        self._closed = False
+        self._calls = []
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exctype, excvalee, exctb):
+        self.close()
+
+    def callcommand(self, command, args):
+        if self._sent:
+            raise error.ProgrammingError('callcommand() cannot be used '
+                                         'after commands are sent')
+
+        if self._closed:
+            raise error.ProgrammingError('callcommand() cannot be used '
+                                         'after close()')
+
+        # Commands are dispatched through methods on the peer.
+        fn = getattr(self._peer, pycompat.sysstr(command), None)
+
+        if not fn:
+            raise error.ProgrammingError(
+                'cannot call command %s: method of same name not available '
+                'on peer' % command)
+
+        # Commands are either batchable or they aren't. If a command
+        # isn't batchable, we send it immediately because the executor
+        # can no longer accept new commands after a non-batchable command.
+        # If a command is batchable, we queue it for later.
+
+        if getattr(fn, 'batchable', False):
+            pass
+        else:
+            if self._calls:
+                raise error.ProgrammingError(
+                    '%s is not batchable and cannot be called on a command '
+                    'executor along with other commands' % command)
+
+        # We don't support batching yet. So resolve it immediately.
+        f = pycompat.futures.Future()
+        self._calls.append((command, args, fn, f))
+        self.sendcommands()
+        return f
+
+    def sendcommands(self):
+        if self._sent:
+            return
+
+        if not self._calls:
+            return
+
+        self._sent = True
+
+        calls = self._calls
+        # Mainly to destroy references to futures.
+        self._calls = None
+
+        if len(calls) == 1:
+            command, args, fn, f = calls[0]
+
+            # Future was cancelled. Ignore it.
+            if not f.set_running_or_notify_cancel():
+                return
+
+            try:
+                result = fn(**pycompat.strkwargs(args))
+            except Exception:
+                f.set_exception_info(*sys.exc_info()[1:])
+            else:
+                f.set_result(result)
+
+            return
+
+        raise error.ProgrammingError('support for multiple commands not '
+                                     'yet implemented')
+
+    def close(self):
+        self.sendcommands()
+
+        self._closed = True
+
 class wirepeer(repository.legacypeer):
     """Client-side interface for communicating with a peer repository.
 
@@ -185,6 +275,9 @@
     See also httppeer.py and sshpeer.py for protocol-specific
     implementations of this interface.
     """
+    def commandexecutor(self):
+        return peerexecutor(self)
+
     # Begin of ipeercommands interface.
 
     def iterbatch(self):