--- a/mercurial/wireproto.py Wed Jun 15 01:50:49 2011 +0900
+++ b/mercurial/wireproto.py Tue Jun 14 22:51:26 2011 +0200
@@ -12,6 +12,110 @@
import repo, error, encoding, util, store
import pushkey as pushkeymod
+# abstract batching support
+
+class future(object):
+ '''placeholder for a value to be set later'''
+ def set(self, value):
+ if hasattr(self, 'value'):
+ raise error.RepoError("future is already set")
+ self.value = value
+
+class batcher(object):
+ '''base class for batches of commands submittable in a single request
+
+ All methods invoked on instances of this class are simply queued and return a
+ a future for the result. Once you call submit(), all the queued calls are
+ performed and the results set in their respective futures.
+ '''
+ def __init__(self):
+ self.calls = []
+ def __getattr__(self, name):
+ def call(*args, **opts):
+ resref = future()
+ self.calls.append((name, args, opts, resref,))
+ return resref
+ return call
+ def submit(self):
+ pass
+
+class localbatch(batcher):
+ '''performs the queued calls directly'''
+ def __init__(self, local):
+ batcher.__init__(self)
+ self.local = local
+ def submit(self):
+ for name, args, opts, resref in self.calls:
+ resref.set(getattr(self.local, name)(*args, **opts))
+
+class remotebatch(batcher):
+ '''batches the queued calls; uses as few roundtrips as possible'''
+ def __init__(self, remote):
+ '''remote must support _submitbatch(encbatch) and _submitone(op, encargs)'''
+ batcher.__init__(self)
+ self.remote = remote
+ def submit(self):
+ req, rsp = [], []
+ for name, args, opts, resref in self.calls:
+ mtd = getattr(self.remote, name)
+ if hasattr(mtd, 'batchable'):
+ batchable = getattr(mtd, 'batchable')(mtd.im_self, *args, **opts)
+ encargsorres, encresref = batchable.next()
+ if encresref:
+ req.append((name, encargsorres,))
+ rsp.append((batchable, encresref, resref,))
+ else:
+ resref.set(encargsorres)
+ else:
+ if req:
+ self._submitreq(req, rsp)
+ req, rsp = [], []
+ resref.set(mtd(*args, **opts))
+ if req:
+ self._submitreq(req, rsp)
+ def _submitreq(self, req, rsp):
+ encresults = self.remote._submitbatch(req)
+ for encres, r in zip(encresults, rsp):
+ batchable, encresref, resref = r
+ encresref.set(encres)
+ resref.set(batchable.next())
+
+def batchable(f):
+ '''annotation for batchable methods
+
+ Such methods must implement a coroutine as follows:
+
+ @batchable
+ def sample(self, one, two=None):
+ # Handle locally computable results first:
+ if not one:
+ yield "a local result", None
+ # Build list of encoded arguments suitable for your wire protocol:
+ encargs = [('one', encode(one),), ('two', encode(two),)]
+ # Create future for injection of encoded result:
+ encresref = future()
+ # Return encoded arguments and future:
+ yield encargs, encresref
+ # Assuming the future to be filled with the result from the batched request
+ # now. Decode it:
+ yield decode(encresref.value)
+
+ The decorator returns a function which wraps this coroutine as a plain method,
+ but adds the original method as an attribute called "batchable", which is
+ used by remotebatch to split the call into separate encoding and decoding
+ phases.
+ '''
+ def plain(*args, **opts):
+ batchable = f(*args, **opts)
+ encargsorres, encresref = batchable.next()
+ if not encresref:
+ return encargsorres # a local result in this case
+ self = args[0]
+ encresref.set(self._submitone(f.func_name, encargsorres))
+ return batchable.next()
+ setattr(plain, 'batchable', f)
+ return plain
+
# list of nodes encoding / decoding
def decodelist(l, sep=' '):