Mercurial > hg
changeset 25912:cbbdd085c991
batching: migrate basic noop batching into peer.peer
"Real" batching only makes sense for wirepeers, but it greatly
simplifies the clients of peer instances if they can be ignorant to
actual batching capabilities of that peer. By moving the
not-really-batched batching code into peer.peer, all peer instances
now work with the batching API, thus simplifying users.
This leaves a couple of name forwards in wirepeer.py. Originally I had
planned to clean those up, but it kind of unclarifies other bits of
code that want to use batching, so I think it makes sense for the
names to stay exposed by wireproto. Specifically, almost nothing is
currently aware of peer (see largefiles.proto for an example), so
making them be aware of the peer module *and* the wireproto module
seems like some abstraction leakage. I *think* the right long-term fix
would actually be to make wireproto an implementation detail that
clients wouldn't need to know about, but I don't really know what that
would entail at the moment.
As far as I'm aware, no clients of batching in third-party extensions
will need updating, which is nice icing.
author | Augie Fackler <augie@google.com> |
---|---|
date | Wed, 05 Aug 2015 14:51:34 -0400 |
parents | b4a85ddadcb9 |
children | fa14ba7b9667 |
files | mercurial/peer.py mercurial/wireproto.py tests/test-batching.py tests/test-wireproto.py |
diffstat | 4 files changed, 88 insertions(+), 74 deletions(-) [+] |
line wrap: on
line diff
--- a/mercurial/peer.py Mon Aug 03 06:13:05 2015 -0700 +++ b/mercurial/peer.py Wed Aug 05 14:51:34 2015 -0400 @@ -8,9 +8,85 @@ from i18n import _ import error +import util + +# abstract batching support + +class future(object): + '''placeholder for a value to be set later''' + def set(self, value): + if util.safehasattr(self, 'value'): + raise error.RepoError("future is already set") + self.value = value + +class batcher(object): + '''base class for batches of commands submittable in a single request + + All methods invoked on instances of this class are simply queued and + return a a future for the result. Once you call submit(), all the queued + calls are performed and the results set in their respective futures. + ''' + def __init__(self): + self.calls = [] + def __getattr__(self, name): + def call(*args, **opts): + resref = future() + self.calls.append((name, args, opts, resref,)) + return resref + return call + def submit(self): + pass + +class localbatch(batcher): + '''performs the queued calls directly''' + def __init__(self, local): + batcher.__init__(self) + self.local = local + def submit(self): + for name, args, opts, resref in self.calls: + resref.set(getattr(self.local, name)(*args, **opts)) + +def batchable(f): + '''annotation for batchable methods + + Such methods must implement a coroutine as follows: + + @batchable + def sample(self, one, two=None): + # Handle locally computable results first: + if not one: + yield "a local result", None + # Build list of encoded arguments suitable for your wire protocol: + encargs = [('one', encode(one),), ('two', encode(two),)] + # Create future for injection of encoded result: + encresref = future() + # Return encoded arguments and future: + yield encargs, encresref + # Assuming the future to be filled with the result from the batched + # request now. Decode it: + yield decode(encresref.value) + + The decorator returns a function which wraps this coroutine as a plain + method, but adds the original method as an attribute called "batchable", + which is used by remotebatch to split the call into separate encoding and + decoding phases. + ''' + def plain(*args, **opts): + batchable = f(*args, **opts) + encargsorres, encresref = batchable.next() + if not encresref: + return encargsorres # a local result in this case + self = args[0] + encresref.set(self._submitone(f.func_name, encargsorres)) + return batchable.next() + setattr(plain, 'batchable', f) + return plain class peerrepository(object): + def batch(self): + return localbatch(self) + def capable(self, name): '''tell whether repo supports named capability. return False if not supported.
--- a/mercurial/wireproto.py Mon Aug 03 06:13:05 2015 -0700 +++ b/mercurial/wireproto.py Wed Aug 05 14:51:34 2015 -0400 @@ -58,48 +58,12 @@ Some protocols may have compressed the contents.""" raise NotImplementedError() -# abstract batching support - -class future(object): - '''placeholder for a value to be set later''' - def set(self, value): - if util.safehasattr(self, 'value'): - raise error.RepoError("future is already set") - self.value = value - -class batcher(object): - '''base class for batches of commands submittable in a single request - - All methods invoked on instances of this class are simply queued and - return a a future for the result. Once you call submit(), all the queued - calls are performed and the results set in their respective futures. - ''' - def __init__(self): - self.calls = [] - def __getattr__(self, name): - def call(*args, **opts): - resref = future() - self.calls.append((name, args, opts, resref,)) - return resref - return call - def submit(self): - pass - -class localbatch(batcher): - '''performs the queued calls directly''' - def __init__(self, local): - batcher.__init__(self) - self.local = local - def submit(self): - for name, args, opts, resref in self.calls: - resref.set(getattr(self.local, name)(*args, **opts)) - -class remotebatch(batcher): +class remotebatch(peer.batcher): '''batches the queued calls; uses as few roundtrips as possible''' def __init__(self, remote): '''remote must support _submitbatch(encbatch) and _submitone(op, encargs)''' - batcher.__init__(self) + peer.batcher.__init__(self) self.remote = remote def submit(self): req, rsp = [], [] @@ -128,41 +92,10 @@ encresref.set(encres) resref.set(batchable.next()) -def batchable(f): - '''annotation for batchable methods - - Such methods must implement a coroutine as follows: - - @batchable - def sample(self, one, two=None): - # Handle locally computable results first: - if not one: - yield "a local result", None - # Build list of encoded arguments suitable for your wire protocol: - encargs = [('one', encode(one),), ('two', encode(two),)] - # Create future for injection of encoded result: - encresref = future() - # Return encoded arguments and future: - yield encargs, encresref - # Assuming the future to be filled with the result from the batched - # request now. Decode it: - yield decode(encresref.value) - - The decorator returns a function which wraps this coroutine as a plain - method, but adds the original method as an attribute called "batchable", - which is used by remotebatch to split the call into separate encoding and - decoding phases. - ''' - def plain(*args, **opts): - batchable = f(*args, **opts) - encargsorres, encresref = batchable.next() - if not encresref: - return encargsorres # a local result in this case - self = args[0] - encresref.set(self._submitone(f.func_name, encargsorres)) - return batchable.next() - setattr(plain, 'batchable', f) - return plain +# Forward a couple of names from peer to make wireproto interactions +# slightly more sensible. +batchable = peer.batchable +future = peer.future # list of nodes encoding / decoding
--- a/tests/test-batching.py Mon Aug 03 06:13:05 2015 -0700 +++ b/tests/test-batching.py Wed Aug 05 14:51:34 2015 -0400 @@ -5,7 +5,8 @@ # This software may be used and distributed according to the terms of the # GNU General Public License version 2 or any later version. -from mercurial.wireproto import localbatch, remotebatch, batchable, future +from mercurial.peer import localbatch, batchable, future +from mercurial.wireproto import remotebatch # equivalent of repo.repository class thing(object):
--- a/tests/test-wireproto.py Mon Aug 03 06:13:05 2015 -0700 +++ b/tests/test-wireproto.py Wed Aug 05 14:51:34 2015 -0400 @@ -12,6 +12,10 @@ class clientpeer(wireproto.wirepeer): def __init__(self, serverrepo): self.serverrepo = serverrepo + + def _capabilities(self): + return ['batch'] + def _call(self, cmd, **args): return wireproto.dispatch(self.serverrepo, proto(args), cmd)