view tests/test-wireproto.py @ 33796:4f8c241b2bfa

test-pushvars: invoke shell script hook via `sh` for Windows Invoking *.sh on Windows leads to the "what program should open this?" prompt, which stalls the test and led to the recent series of exceptions on the Windows test machine as the runner times out.
author Matt Harbison <matt_harbison@yahoo.com>
date Mon, 14 Aug 2017 22:26:48 -0400
parents b47fe9733d76
children dedab036215d
line wrap: on
line source

from __future__ import absolute_import, print_function

from mercurial import (
    util,
    wireproto,
)
stringio = util.stringio

class proto(object):
    def __init__(self, args):
        self.args = args
    def getargs(self, spec):
        args = self.args
        args.setdefault('*', {})
        names = spec.split()
        return [args[n] for n in names]

class clientpeer(wireproto.wirepeer):
    def __init__(self, serverrepo):
        self.serverrepo = serverrepo

    def _capabilities(self):
        return ['batch']

    def _call(self, cmd, **args):
        return wireproto.dispatch(self.serverrepo, proto(args), cmd)

    def _callstream(self, cmd, **args):
        return stringio(self._call(cmd, **args))

    @wireproto.batchable
    def greet(self, name):
        f = wireproto.future()
        yield {'name': mangle(name)}, f
        yield unmangle(f.value)

class serverrepo(object):
    def greet(self, name):
        return "Hello, " + name

    def filtered(self, name):
        return self

def mangle(s):
    return ''.join(chr(ord(c) + 1) for c in s)
def unmangle(s):
    return ''.join(chr(ord(c) - 1) for c in s)

def greet(repo, proto, name):
    return mangle(repo.greet(unmangle(name)))

wireproto.commands['greet'] = (greet, 'name',)

srv = serverrepo()
clt = clientpeer(srv)

print(clt.greet("Foobar"))
b = clt.iterbatch()
map(b.greet, ('Fo, =;:<o', 'Bar'))
b.submit()
print([r for r in b.results()])