Mercurial > hg
view tests/test-wireproto.py @ 28701:3bce3d2fd727
run-tests: make _processoutput picky about optional globs
1ad0ddf8cccc enabled lines that were not matched to be found later in cases of jitter.
Unfortunately, in this model an optional line would always jitter to the end when
it is not present. That is not ideal.
It would be possible to do better, by queuing all writes until the end in case
an optional line jitters, but for now, it is simpler to assume optional lines
have a fixed place in the stream.
author | timeless <timeless@mozdev.org> |
---|---|
date | Wed, 30 Mar 2016 09:13:47 +0000 |
parents | fcafd84bc9c5 |
children | 50d11dd8ac02 |
line wrap: on
line source
from __future__ import absolute_import, print_function import StringIO from mercurial import wireproto class proto(object): def __init__(self, args): self.args = args def getargs(self, spec): args = self.args args.setdefault('*', {}) names = spec.split() return [args[n] for n in names] class clientpeer(wireproto.wirepeer): def __init__(self, serverrepo): self.serverrepo = serverrepo def _capabilities(self): return ['batch'] def _call(self, cmd, **args): return wireproto.dispatch(self.serverrepo, proto(args), cmd) def _callstream(self, cmd, **args): return StringIO.StringIO(self._call(cmd, **args)) @wireproto.batchable def greet(self, name): f = wireproto.future() yield {'name': mangle(name)}, f yield unmangle(f.value) class serverrepo(object): def greet(self, name): return "Hello, " + name def filtered(self, name): return self def mangle(s): return ''.join(chr(ord(c) + 1) for c in s) def unmangle(s): return ''.join(chr(ord(c) - 1) for c in s) def greet(repo, proto, name): return mangle(repo.greet(unmangle(name))) wireproto.commands['greet'] = (greet, 'name',) srv = serverrepo() clt = clientpeer(srv) print(clt.greet("Foobar")) b = clt.batch() fs = [b.greet(s) for s in ["Fo, =;:<o", "Bar"]] b.submit() print([f.value for f in fs])