mercurial/wireproto.py
changeset 14621 84094c0d2724
parent 14436 5adb52524779
child 14622 bd88561afb4b
--- a/mercurial/wireproto.py	Wed Jun 15 01:50:49 2011 +0900
+++ b/mercurial/wireproto.py	Tue Jun 14 22:51:26 2011 +0200
@@ -12,6 +12,110 @@
 import repo, error, encoding, util, store
 import pushkey as pushkeymod
 
+# abstract batching support
+
+class future(object):
+    '''placeholder for a value to be set later'''
+    def set(self, value):
+        if hasattr(self, 'value'):
+            raise error.RepoError("future is already set")
+        self.value = value
+
+class batcher(object):
+    '''base class for batches of commands submittable in a single request
+
+    All methods invoked on instances of this class are simply queued and return a
+    a future for the result. Once you call submit(), all the queued calls are
+    performed and the results set in their respective futures.
+    '''
+    def __init__(self):
+        self.calls = []
+    def __getattr__(self, name):
+        def call(*args, **opts):
+            resref = future()
+            self.calls.append((name, args, opts, resref,))
+            return resref
+        return call
+    def submit(self):
+        pass
+
+class localbatch(batcher):
+    '''performs the queued calls directly'''
+    def __init__(self, local):
+        batcher.__init__(self)
+        self.local = local
+    def submit(self):
+        for name, args, opts, resref in self.calls:
+            resref.set(getattr(self.local, name)(*args, **opts))
+
+class remotebatch(batcher):
+    '''batches the queued calls; uses as few roundtrips as possible'''
+    def __init__(self, remote):
+        '''remote must support _submitbatch(encbatch) and _submitone(op, encargs)'''
+        batcher.__init__(self)
+        self.remote = remote
+    def submit(self):
+        req, rsp = [], []
+        for name, args, opts, resref in self.calls:
+            mtd = getattr(self.remote, name)
+            if hasattr(mtd, 'batchable'):
+                batchable = getattr(mtd, 'batchable')(mtd.im_self, *args, **opts)
+                encargsorres, encresref = batchable.next()
+                if encresref:
+                    req.append((name, encargsorres,))
+                    rsp.append((batchable, encresref, resref,))
+                else:
+                    resref.set(encargsorres)
+            else:
+                if req:
+                    self._submitreq(req, rsp)
+                    req, rsp = [], []
+                resref.set(mtd(*args, **opts))
+        if req:
+            self._submitreq(req, rsp)
+    def _submitreq(self, req, rsp):
+        encresults = self.remote._submitbatch(req)
+        for encres, r in zip(encresults, rsp):
+            batchable, encresref, resref = r
+            encresref.set(encres)
+            resref.set(batchable.next())
+
+def batchable(f):
+    '''annotation for batchable methods
+
+    Such methods must implement a coroutine as follows:
+
+    @batchable
+    def sample(self, one, two=None):
+        # Handle locally computable results first:
+        if not one:
+            yield "a local result", None
+        # Build list of encoded arguments suitable for your wire protocol:
+        encargs = [('one', encode(one),), ('two', encode(two),)]
+        # Create future for injection of encoded result:
+        encresref = future()
+        # Return encoded arguments and future:
+        yield encargs, encresref
+        # Assuming the future to be filled with the result from the batched request
+        # now. Decode it:
+        yield decode(encresref.value)
+
+    The decorator returns a function which wraps this coroutine as a plain method,
+    but adds the original method as an attribute called "batchable", which is
+    used by remotebatch to split the call into separate encoding and decoding
+    phases.
+    '''
+    def plain(*args, **opts):
+        batchable = f(*args, **opts)
+        encargsorres, encresref = batchable.next()
+        if not encresref:
+            return encargsorres # a local result in this case
+        self = args[0]
+        encresref.set(self._submitone(f.func_name, encargsorres))
+        return batchable.next()
+    setattr(plain, 'batchable', f)
+    return plain
+
 # list of nodes encoding / decoding
 
 def decodelist(l, sep=' '):