wireproto: overhaul iterating batcher code (API)
The remote batching code is difficult to read. Let's improve it.
As part of the refactor, the future returned by method calls on
batchiter() instances is now populated. However, you still need to
consume the results() generator for the future to be set. But at
least now we can stuff the future somewhere and not have to worry
about aligning method call order with result order since you can
use a future to hold the result.
Also as part of the change, we now verify that @batchable generators
yield exactly 2 values. In other words, we enforce their API.
The non-iter batcher has been unused since
b6e71f8af5b8. And to my
surprise we had no explicit unit test coverage of it! test-batching.py
has been overhauled to use the iterating batcher.
Since the iterating batcher doesn't allow non-batchable method
calls nor local calls, tests have been updated to reflect reality.
The iterating batcher has been used for multiple releases apparently
without major issue. So this shouldn't cause alarm.
.. api::
@peer.batchable functions must now yield exactly 2 values
Differential Revision: https://phab.mercurial-scm.org/D319
--- a/mercurial/peer.py Wed Aug 09 22:52:05 2017 -0700
+++ b/mercurial/peer.py Wed Aug 09 23:29:30 2017 -0700
@@ -69,7 +69,8 @@
def results(self):
for name, args, opts, resref in self.calls:
- yield getattr(self.local, name)(*args, **opts)
+ resref.set(getattr(self.local, name)(*args, **opts))
+ yield resref.value
def batchable(f):
'''annotation for batchable methods
--- a/mercurial/wireproto.py Wed Aug 09 22:52:05 2017 -0700
+++ b/mercurial/wireproto.py Wed Aug 09 23:29:30 2017 -0700
@@ -133,23 +133,47 @@
This is mostly valuable over http where request sizes can be
limited, but can be used in other places as well.
"""
- req, rsp = [], []
- for name, args, opts, resref in self.calls:
- mtd = getattr(self._remote, name)
+ # 2-tuple of (command, arguments) that represents what will be
+ # sent over the wire.
+ requests = []
+
+ # 4-tuple of (command, final future, @batchable generator, remote
+ # future).
+ results = []
+
+ for command, args, opts, finalfuture in self.calls:
+ mtd = getattr(self._remote, command)
batchable = mtd.batchable(mtd.im_self, *args, **opts)
- encargsorres, encresref = next(batchable)
- assert encresref
- req.append((name, encargsorres))
- rsp.append((batchable, encresref))
- if req:
- self._resultiter = self._remote._submitbatch(req)
- self._rsp = rsp
+
+ commandargs, fremote = next(batchable)
+ assert fremote
+ requests.append((command, commandargs))
+ results.append((command, finalfuture, batchable, fremote))
+
+ if requests:
+ self._resultiter = self._remote._submitbatch(requests)
+
+ self._results = results
def results(self):
- for (batchable, encresref), encres in itertools.izip(
- self._rsp, self._resultiter):
- encresref.set(encres)
- yield next(batchable)
+ for command, finalfuture, batchable, remotefuture in self._results:
+ # Get the raw result, set it in the remote future, feed it
+ # back into the @batchable generator so it can be decoded, and
+ # set the result on the final future to this value.
+ remoteresult = next(self._resultiter)
+ remotefuture.set(remoteresult)
+ finalfuture.set(next(batchable))
+
+ # Verify our @batchable generators only emit 2 values.
+ try:
+ next(batchable)
+ except StopIteration:
+ pass
+ else:
+ raise error.ProgrammingError('%s @batchable generator emitted '
+ 'unexpected value count' % command)
+
+ yield finalfuture.value
# Forward a couple of names from peer to make wireproto interactions
# slightly more sensible.
--- a/tests/test-batching.py Wed Aug 09 22:52:05 2017 -0700
+++ b/tests/test-batching.py Wed Aug 09 23:29:30 2017 -0700
@@ -8,7 +8,9 @@
from __future__ import absolute_import, print_function
from mercurial import (
+ error,
peer,
+ util,
wireproto,
)
@@ -27,9 +29,9 @@
return "%s und %s" % (b, a,)
def greet(self, name=None):
return "Hello, %s" % name
- def batch(self):
+ def batchiter(self):
'''Support for local batching.'''
- return peer.localbatch(self)
+ return peer.localiterbatcher(self)
# usage of "thing" interface
def use(it):
@@ -41,27 +43,54 @@
print(it.foo("Un", two="Deux"))
print(it.bar("Eins", "Zwei"))
- # Batched call to a couple of (possibly proxied) methods.
- batch = it.batch()
+ # Batched call to a couple of proxied methods.
+ batch = it.batchiter()
# The calls return futures to eventually hold results.
foo = batch.foo(one="One", two="Two")
bar = batch.bar("Eins", "Zwei")
- # We can call non-batchable proxy methods, but the break the current batch
- # request and cause additional roundtrips.
- greet = batch.greet(name="John Smith")
- # We can also add local methods into the mix, but they break the batch too.
- hello = batch.hello()
bar2 = batch.bar(b="Uno", a="Due")
- # Only now are all the calls executed in sequence, with as few roundtrips
- # as possible.
+
+ # Future shouldn't be set until we submit().
+ assert isinstance(foo, peer.future)
+ assert not util.safehasattr(foo, 'value')
+ assert not util.safehasattr(bar, 'value')
batch.submit()
- # After the call to submit, the futures actually contain values.
+ # Call results() to obtain results as a generator.
+ results = batch.results()
+
+ # Future results shouldn't be set until we consume a value.
+ assert not util.safehasattr(foo, 'value')
+ foovalue = next(results)
+ assert util.safehasattr(foo, 'value')
+ assert foovalue == foo.value
print(foo.value)
+ next(results)
print(bar.value)
- print(greet.value)
- print(hello.value)
+ next(results)
print(bar2.value)
+ # We should be at the end of the results generator.
+ try:
+ next(results)
+ except StopIteration:
+ print('proper end of results generator')
+ else:
+ print('extra emitted element!')
+
+ # Attempting to call a non-batchable method inside a batch fails.
+ batch = it.batchiter()
+ try:
+ batch.greet(name='John Smith')
+ except error.ProgrammingError as e:
+ print(e)
+
+ # Attempting to call a local method inside a batch fails.
+ batch = it.batchiter()
+ try:
+ batch.hello()
+ except error.ProgrammingError as e:
+ print(e)
+
# local usage
mylocal = localthing()
print()
@@ -144,10 +173,11 @@
req.append(name + ':' + args)
req = ';'.join(req)
res = self._submitone('batch', [('cmds', req,)])
- return res.split(';')
+ for r in res.split(';'):
+ yield r
- def batch(self):
- return wireproto.remotebatch(self)
+ def batchiter(self):
+ return wireproto.remoteiterbatcher(self)
@peer.batchable
def foo(self, one, two=None):
--- a/tests/test-batching.py.out Wed Aug 09 22:52:05 2017 -0700
+++ b/tests/test-batching.py.out Wed Aug 09 23:29:30 2017 -0700
@@ -5,9 +5,8 @@
Eins und Zwei
One and Two
Eins und Zwei
-Hello, John Smith
-Ready.
Uno und Due
+proper end of results generator
== Remote
Ready.
@@ -17,14 +16,11 @@
REQ: bar?b=Fjot&a=[xfj
-> Fjot!voe![xfj
Eins und Zwei
-REQ: batch?cmds=foo:one=Pof,two=Uxp;bar:b=Fjot,a=[xfj
- -> Pof!boe!Uxp;Fjot!voe![xfj
-REQ: greet?name=Kpio!Tnjui
- -> Ifmmp-!Kpio!Tnjui
-REQ: batch?cmds=bar:b=Vop,a=Evf
- -> Vop!voe!Evf
+REQ: batch?cmds=foo:one=Pof,two=Uxp;bar:b=Fjot,a=[xfj;bar:b=Vop,a=Evf
+ -> Pof!boe!Uxp;Fjot!voe![xfj;Vop!voe!Evf
One and Two
Eins und Zwei
-Hello, John Smith
-Ready.
Uno und Due
+proper end of results generator
+Attempted to batch a non-batchable call to 'greet'
+Attempted to batch a non-batchable call to 'hello'