wireproto: add basic command batching infrastructure
Note that localbatch will not be used until we actually have a localpeer to
use it with.
--- a/mercurial/wireproto.py Wed Jun 15 01:50:49 2011 +0900
+++ b/mercurial/wireproto.py Tue Jun 14 22:51:26 2011 +0200
@@ -12,6 +12,110 @@
import repo, error, encoding, util, store
import pushkey as pushkeymod
+# abstract batching support
+
+class future(object):
+ '''placeholder for a value to be set later'''
+ def set(self, value):
+ if hasattr(self, 'value'):
+ raise error.RepoError("future is already set")
+ self.value = value
+
+class batcher(object):
+ '''base class for batches of commands submittable in a single request
+
+ All methods invoked on instances of this class are simply queued and return a
+ a future for the result. Once you call submit(), all the queued calls are
+ performed and the results set in their respective futures.
+ '''
+ def __init__(self):
+ self.calls = []
+ def __getattr__(self, name):
+ def call(*args, **opts):
+ resref = future()
+ self.calls.append((name, args, opts, resref,))
+ return resref
+ return call
+ def submit(self):
+ pass
+
+class localbatch(batcher):
+ '''performs the queued calls directly'''
+ def __init__(self, local):
+ batcher.__init__(self)
+ self.local = local
+ def submit(self):
+ for name, args, opts, resref in self.calls:
+ resref.set(getattr(self.local, name)(*args, **opts))
+
+class remotebatch(batcher):
+ '''batches the queued calls; uses as few roundtrips as possible'''
+ def __init__(self, remote):
+ '''remote must support _submitbatch(encbatch) and _submitone(op, encargs)'''
+ batcher.__init__(self)
+ self.remote = remote
+ def submit(self):
+ req, rsp = [], []
+ for name, args, opts, resref in self.calls:
+ mtd = getattr(self.remote, name)
+ if hasattr(mtd, 'batchable'):
+ batchable = getattr(mtd, 'batchable')(mtd.im_self, *args, **opts)
+ encargsorres, encresref = batchable.next()
+ if encresref:
+ req.append((name, encargsorres,))
+ rsp.append((batchable, encresref, resref,))
+ else:
+ resref.set(encargsorres)
+ else:
+ if req:
+ self._submitreq(req, rsp)
+ req, rsp = [], []
+ resref.set(mtd(*args, **opts))
+ if req:
+ self._submitreq(req, rsp)
+ def _submitreq(self, req, rsp):
+ encresults = self.remote._submitbatch(req)
+ for encres, r in zip(encresults, rsp):
+ batchable, encresref, resref = r
+ encresref.set(encres)
+ resref.set(batchable.next())
+
+def batchable(f):
+ '''annotation for batchable methods
+
+ Such methods must implement a coroutine as follows:
+
+ @batchable
+ def sample(self, one, two=None):
+ # Handle locally computable results first:
+ if not one:
+ yield "a local result", None
+ # Build list of encoded arguments suitable for your wire protocol:
+ encargs = [('one', encode(one),), ('two', encode(two),)]
+ # Create future for injection of encoded result:
+ encresref = future()
+ # Return encoded arguments and future:
+ yield encargs, encresref
+ # Assuming the future to be filled with the result from the batched request
+ # now. Decode it:
+ yield decode(encresref.value)
+
+ The decorator returns a function which wraps this coroutine as a plain method,
+ but adds the original method as an attribute called "batchable", which is
+ used by remotebatch to split the call into separate encoding and decoding
+ phases.
+ '''
+ def plain(*args, **opts):
+ batchable = f(*args, **opts)
+ encargsorres, encresref = batchable.next()
+ if not encresref:
+ return encargsorres # a local result in this case
+ self = args[0]
+ encresref.set(self._submitone(f.func_name, encargsorres))
+ return batchable.next()
+ setattr(plain, 'batchable', f)
+ return plain
+
# list of nodes encoding / decoding
def decodelist(l, sep=' '):
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/test-batching.py Tue Jun 14 22:51:26 2011 +0200
@@ -0,0 +1,175 @@
+# test-batching.py - tests for transparent command batching
+#
+# Copyright 2011 Peter Arrenbrecht <peter@arrenbrecht.ch>
+#
+# This software may be used and distributed according to the terms of the
+# GNU General Public License version 2 or any later version.
+
+from mercurial.wireproto import localbatch, remotebatch, batchable, future
+
+# equivalent of repo.repository
+class thing(object):
+ def hello(self):
+ return "Ready."
+
+# equivalent of localrepo.localrepository
+class localthing(thing):
+ def foo(self, one, two=None):
+ if one:
+ return "%s and %s" % (one, two,)
+ return "Nope"
+ def bar(self, b, a):
+ return "%s und %s" % (b, a,)
+ def greet(self, name=None):
+ return "Hello, %s" % name
+ def batch(self):
+ '''Support for local batching.'''
+ return localbatch(self)
+
+# usage of "thing" interface
+def use(it):
+
+ # Direct call to base method shared between client and server.
+ print it.hello()
+
+ # Direct calls to proxied methods. They cause individual roundtrips.
+ print it.foo("Un", two="Deux")
+ print it.bar("Eins", "Zwei")
+
+ # Batched call to a couple of (possibly proxied) methods.
+ batch = it.batch()
+ # The calls return futures to eventually hold results.
+ foo = batch.foo(one="One", two="Two")
+ foo2 = batch.foo(None)
+ bar = batch.bar("Eins", "Zwei")
+ # We can call non-batchable proxy methods, but the break the current batch
+ # request and cause additional roundtrips.
+ greet = batch.greet(name="John Smith")
+ # We can also add local methods into the mix, but they break the batch too.
+ hello = batch.hello()
+ bar2 = batch.bar(b="Uno", a="Due")
+ # Only now are all the calls executed in sequence, with as few roundtrips
+ # as possible.
+ batch.submit()
+ # After the call to submit, the futures actually contain values.
+ print foo.value
+ print foo2.value
+ print bar.value
+ print greet.value
+ print hello.value
+ print bar2.value
+
+# local usage
+mylocal = localthing()
+print
+print "== Local"
+use(mylocal)
+
+# demo remoting; mimicks what wireproto and HTTP/SSH do
+
+# shared
+
+def escapearg(plain):
+ return (plain
+ .replace(':', '::')
+ .replace(',', ':,')
+ .replace(';', ':;')
+ .replace('=', ':='))
+def unescapearg(escaped):
+ return (escaped
+ .replace(':=', '=')
+ .replace(':;', ';')
+ .replace(':,', ',')
+ .replace('::', ':'))
+
+# server side
+
+# equivalent of wireproto's global functions
+class server:
+ def __init__(self, local):
+ self.local = local
+ def _call(self, name, args):
+ args = dict(arg.split('=', 1) for arg in args)
+ return getattr(self, name)(**args)
+ def perform(self, req):
+ print "REQ:", req
+ name, args = req.split('?', 1)
+ args = args.split('&')
+ vals = dict(arg.split('=', 1) for arg in args)
+ res = getattr(self, name)(**vals)
+ print " ->", res
+ return res
+ def batch(self, cmds):
+ res = []
+ for pair in cmds.split(';'):
+ name, args = pair.split(':', 1)
+ vals = {}
+ for a in args.split(','):
+ if a:
+ n, v = a.split('=')
+ vals[n] = unescapearg(v)
+ res.append(escapearg(getattr(self, name)(**vals)))
+ return ';'.join(res)
+ def foo(self, one, two):
+ return mangle(self.local.foo(unmangle(one), unmangle(two)))
+ def bar(self, b, a):
+ return mangle(self.local.bar(unmangle(b), unmangle(a)))
+ def greet(self, name):
+ return mangle(self.local.greet(unmangle(name)))
+myserver = server(mylocal)
+
+# local side
+
+# equivalent of wireproto.encode/decodelist, that is, type-specific marshalling
+# here we just transform the strings a bit to check we're properly en-/decoding
+def mangle(s):
+ return ''.join(chr(ord(c) + 1) for c in s)
+def unmangle(s):
+ return ''.join(chr(ord(c) - 1) for c in s)
+
+# equivalent of wireproto.wirerepository and something like http's wire format
+class remotething(thing):
+ def __init__(self, server):
+ self.server = server
+ def _submitone(self, name, args):
+ req = name + '?' + '&'.join(['%s=%s' % (n, v) for n, v in args])
+ return self.server.perform(req)
+ def _submitbatch(self, cmds):
+ req = []
+ for name, args in cmds:
+ args = ','.join(n + '=' + escapearg(v) for n, v in args)
+ req.append(name + ':' + args)
+ req = ';'.join(req)
+ res = self._submitone('batch', [('cmds', req,)])
+ return res.split(';')
+
+ def batch(self):
+ return remotebatch(self)
+
+ @batchable
+ def foo(self, one, two=None):
+ if not one:
+ yield "Nope", None
+ encargs = [('one', mangle(one),), ('two', mangle(two),)]
+ encresref = future()
+ yield encargs, encresref
+ yield unmangle(encresref.value)
+
+ @batchable
+ def bar(self, b, a):
+ encresref = future()
+ yield [('b', mangle(b),), ('a', mangle(a),)], encresref
+ yield unmangle(encresref.value)
+
+ # greet is coded directly. It therefore does not support batching. If it
+ # does appear in a batch, the batch is split around greet, and the call to
+ # greet is done in its own roundtrip.
+ def greet(self, name=None):
+ return unmangle(self._submitone('greet', [('name', mangle(name),)]))
+
+# demo remote usage
+
+myproxy = remotething(myserver)
+print
+print "== Remote"
+use(myproxy)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/test-batching.py.out Tue Jun 14 22:51:26 2011 +0200
@@ -0,0 +1,32 @@
+
+== Local
+Ready.
+Un and Deux
+Eins und Zwei
+One and Two
+Nope
+Eins und Zwei
+Hello, John Smith
+Ready.
+Uno und Due
+
+== Remote
+Ready.
+REQ: foo?one=Vo&two=Efvy
+ -> Vo!boe!Efvy
+Un and Deux
+REQ: bar?b=Fjot&a=[xfj
+ -> Fjot!voe![xfj
+Eins und Zwei
+REQ: batch?cmds=foo:one=Pof,two=Uxp;bar:b=Fjot,a=[xfj
+ -> Pof!boe!Uxp;Fjot!voe![xfj
+REQ: greet?name=Kpio!Tnjui
+ -> Ifmmp-!Kpio!Tnjui
+REQ: batch?cmds=bar:b=Vop,a=Evf
+ -> Vop!voe!Evf
+One and Two
+Nope
+Eins und Zwei
+Hello, John Smith
+Ready.
+Uno und Due