Mercurial > hg-stable
changeset 14621:84094c0d2724
wireproto: add basic command batching infrastructure
Note that localbatch will not be used until we actually have a localpeer to
use it with.
author | Peter Arrenbrecht <peter.arrenbrecht@gmail.com> |
---|---|
date | Tue, 14 Jun 2011 22:51:26 +0200 |
parents | 2b9c32929e62 |
children | bd88561afb4b |
files | mercurial/wireproto.py tests/test-batching.py tests/test-batching.py.out |
diffstat | 3 files changed, 311 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- a/mercurial/wireproto.py Wed Jun 15 01:50:49 2011 +0900 +++ b/mercurial/wireproto.py Tue Jun 14 22:51:26 2011 +0200 @@ -12,6 +12,110 @@ import repo, error, encoding, util, store import pushkey as pushkeymod +# abstract batching support + +class future(object): + '''placeholder for a value to be set later''' + def set(self, value): + if hasattr(self, 'value'): + raise error.RepoError("future is already set") + self.value = value + +class batcher(object): + '''base class for batches of commands submittable in a single request + + All methods invoked on instances of this class are simply queued and return a + a future for the result. Once you call submit(), all the queued calls are + performed and the results set in their respective futures. + ''' + def __init__(self): + self.calls = [] + def __getattr__(self, name): + def call(*args, **opts): + resref = future() + self.calls.append((name, args, opts, resref,)) + return resref + return call + def submit(self): + pass + +class localbatch(batcher): + '''performs the queued calls directly''' + def __init__(self, local): + batcher.__init__(self) + self.local = local + def submit(self): + for name, args, opts, resref in self.calls: + resref.set(getattr(self.local, name)(*args, **opts)) + +class remotebatch(batcher): + '''batches the queued calls; uses as few roundtrips as possible''' + def __init__(self, remote): + '''remote must support _submitbatch(encbatch) and _submitone(op, encargs)''' + batcher.__init__(self) + self.remote = remote + def submit(self): + req, rsp = [], [] + for name, args, opts, resref in self.calls: + mtd = getattr(self.remote, name) + if hasattr(mtd, 'batchable'): + batchable = getattr(mtd, 'batchable')(mtd.im_self, *args, **opts) + encargsorres, encresref = batchable.next() + if encresref: + req.append((name, encargsorres,)) + rsp.append((batchable, encresref, resref,)) + else: + resref.set(encargsorres) + else: + if req: + self._submitreq(req, rsp) + req, rsp = [], [] + resref.set(mtd(*args, **opts)) + if req: + self._submitreq(req, rsp) + def _submitreq(self, req, rsp): + encresults = self.remote._submitbatch(req) + for encres, r in zip(encresults, rsp): + batchable, encresref, resref = r + encresref.set(encres) + resref.set(batchable.next()) + +def batchable(f): + '''annotation for batchable methods + + Such methods must implement a coroutine as follows: + + @batchable + def sample(self, one, two=None): + # Handle locally computable results first: + if not one: + yield "a local result", None + # Build list of encoded arguments suitable for your wire protocol: + encargs = [('one', encode(one),), ('two', encode(two),)] + # Create future for injection of encoded result: + encresref = future() + # Return encoded arguments and future: + yield encargs, encresref + # Assuming the future to be filled with the result from the batched request + # now. Decode it: + yield decode(encresref.value) + + The decorator returns a function which wraps this coroutine as a plain method, + but adds the original method as an attribute called "batchable", which is + used by remotebatch to split the call into separate encoding and decoding + phases. + ''' + def plain(*args, **opts): + batchable = f(*args, **opts) + encargsorres, encresref = batchable.next() + if not encresref: + return encargsorres # a local result in this case + self = args[0] + encresref.set(self._submitone(f.func_name, encargsorres)) + return batchable.next() + setattr(plain, 'batchable', f) + return plain + # list of nodes encoding / decoding def decodelist(l, sep=' '):
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/test-batching.py Tue Jun 14 22:51:26 2011 +0200 @@ -0,0 +1,175 @@ +# test-batching.py - tests for transparent command batching +# +# Copyright 2011 Peter Arrenbrecht <peter@arrenbrecht.ch> +# +# This software may be used and distributed according to the terms of the +# GNU General Public License version 2 or any later version. + +from mercurial.wireproto import localbatch, remotebatch, batchable, future + +# equivalent of repo.repository +class thing(object): + def hello(self): + return "Ready." + +# equivalent of localrepo.localrepository +class localthing(thing): + def foo(self, one, two=None): + if one: + return "%s and %s" % (one, two,) + return "Nope" + def bar(self, b, a): + return "%s und %s" % (b, a,) + def greet(self, name=None): + return "Hello, %s" % name + def batch(self): + '''Support for local batching.''' + return localbatch(self) + +# usage of "thing" interface +def use(it): + + # Direct call to base method shared between client and server. + print it.hello() + + # Direct calls to proxied methods. They cause individual roundtrips. + print it.foo("Un", two="Deux") + print it.bar("Eins", "Zwei") + + # Batched call to a couple of (possibly proxied) methods. + batch = it.batch() + # The calls return futures to eventually hold results. + foo = batch.foo(one="One", two="Two") + foo2 = batch.foo(None) + bar = batch.bar("Eins", "Zwei") + # We can call non-batchable proxy methods, but the break the current batch + # request and cause additional roundtrips. + greet = batch.greet(name="John Smith") + # We can also add local methods into the mix, but they break the batch too. + hello = batch.hello() + bar2 = batch.bar(b="Uno", a="Due") + # Only now are all the calls executed in sequence, with as few roundtrips + # as possible. + batch.submit() + # After the call to submit, the futures actually contain values. + print foo.value + print foo2.value + print bar.value + print greet.value + print hello.value + print bar2.value + +# local usage +mylocal = localthing() +print +print "== Local" +use(mylocal) + +# demo remoting; mimicks what wireproto and HTTP/SSH do + +# shared + +def escapearg(plain): + return (plain + .replace(':', '::') + .replace(',', ':,') + .replace(';', ':;') + .replace('=', ':=')) +def unescapearg(escaped): + return (escaped + .replace(':=', '=') + .replace(':;', ';') + .replace(':,', ',') + .replace('::', ':')) + +# server side + +# equivalent of wireproto's global functions +class server: + def __init__(self, local): + self.local = local + def _call(self, name, args): + args = dict(arg.split('=', 1) for arg in args) + return getattr(self, name)(**args) + def perform(self, req): + print "REQ:", req + name, args = req.split('?', 1) + args = args.split('&') + vals = dict(arg.split('=', 1) for arg in args) + res = getattr(self, name)(**vals) + print " ->", res + return res + def batch(self, cmds): + res = [] + for pair in cmds.split(';'): + name, args = pair.split(':', 1) + vals = {} + for a in args.split(','): + if a: + n, v = a.split('=') + vals[n] = unescapearg(v) + res.append(escapearg(getattr(self, name)(**vals))) + return ';'.join(res) + def foo(self, one, two): + return mangle(self.local.foo(unmangle(one), unmangle(two))) + def bar(self, b, a): + return mangle(self.local.bar(unmangle(b), unmangle(a))) + def greet(self, name): + return mangle(self.local.greet(unmangle(name))) +myserver = server(mylocal) + +# local side + +# equivalent of wireproto.encode/decodelist, that is, type-specific marshalling +# here we just transform the strings a bit to check we're properly en-/decoding +def mangle(s): + return ''.join(chr(ord(c) + 1) for c in s) +def unmangle(s): + return ''.join(chr(ord(c) - 1) for c in s) + +# equivalent of wireproto.wirerepository and something like http's wire format +class remotething(thing): + def __init__(self, server): + self.server = server + def _submitone(self, name, args): + req = name + '?' + '&'.join(['%s=%s' % (n, v) for n, v in args]) + return self.server.perform(req) + def _submitbatch(self, cmds): + req = [] + for name, args in cmds: + args = ','.join(n + '=' + escapearg(v) for n, v in args) + req.append(name + ':' + args) + req = ';'.join(req) + res = self._submitone('batch', [('cmds', req,)]) + return res.split(';') + + def batch(self): + return remotebatch(self) + + @batchable + def foo(self, one, two=None): + if not one: + yield "Nope", None + encargs = [('one', mangle(one),), ('two', mangle(two),)] + encresref = future() + yield encargs, encresref + yield unmangle(encresref.value) + + @batchable + def bar(self, b, a): + encresref = future() + yield [('b', mangle(b),), ('a', mangle(a),)], encresref + yield unmangle(encresref.value) + + # greet is coded directly. It therefore does not support batching. If it + # does appear in a batch, the batch is split around greet, and the call to + # greet is done in its own roundtrip. + def greet(self, name=None): + return unmangle(self._submitone('greet', [('name', mangle(name),)])) + +# demo remote usage + +myproxy = remotething(myserver) +print +print "== Remote" +use(myproxy)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/test-batching.py.out Tue Jun 14 22:51:26 2011 +0200 @@ -0,0 +1,32 @@ + +== Local +Ready. +Un and Deux +Eins und Zwei +One and Two +Nope +Eins und Zwei +Hello, John Smith +Ready. +Uno und Due + +== Remote +Ready. +REQ: foo?one=Vo&two=Efvy + -> Vo!boe!Efvy +Un and Deux +REQ: bar?b=Fjot&a=[xfj + -> Fjot!voe![xfj +Eins und Zwei +REQ: batch?cmds=foo:one=Pof,two=Uxp;bar:b=Fjot,a=[xfj + -> Pof!boe!Uxp;Fjot!voe![xfj +REQ: greet?name=Kpio!Tnjui + -> Ifmmp-!Kpio!Tnjui +REQ: batch?cmds=bar:b=Vop,a=Evf + -> Vop!voe!Evf +One and Two +Nope +Eins und Zwei +Hello, John Smith +Ready. +Uno und Due