tests/test-wireproto.py
author Pulkit Goyal <7895pulkit@gmail.com>
Sat, 23 Dec 2017 15:13:37 +0530
changeset 36063 5a53af7d09aa
parent 33806 dedab036215d
child 36074 2f7290555c96
permissions -rw-r--r--
remotenames: introduce new template keywords for remotenames This patch introduces three new template keywords 'remotenames', 'remotebookmarks', 'remotebranches' to show remotenames, remotebookmarks and remotebranches associated to a changeset. This is a part of moving hgremotenames extension to core. The remotenames template keyword was present in the extension and the rest of the two are not present in the hgremotenames extension and are introduced in this patch. hgremotenames: https://bitbucket.org/seanfarley/hgremotenames Differential Revision: https://phab.mercurial-scm.org/D1759

from __future__ import absolute_import, print_function

from mercurial import (
    util,
    wireproto,
)
stringio = util.stringio

class proto(object):
    def __init__(self, args):
        self.args = args
    def getargs(self, spec):
        args = self.args
        args.setdefault('*', {})
        names = spec.split()
        return [args[n] for n in names]

class clientpeer(wireproto.wirepeer):
    def __init__(self, serverrepo):
        self.serverrepo = serverrepo

    @property
    def ui(self):
        return self.serverrepo.ui

    def url(self):
        return 'test'

    def local(self):
        return None

    def peer(self):
        return self

    def canpush(self):
        return True

    def close(self):
        pass

    def capabilities(self):
        return ['batch']

    def _call(self, cmd, **args):
        return wireproto.dispatch(self.serverrepo, proto(args), cmd)

    def _callstream(self, cmd, **args):
        return stringio(self._call(cmd, **args))

    @wireproto.batchable
    def greet(self, name):
        f = wireproto.future()
        yield {'name': mangle(name)}, f
        yield unmangle(f.value)

class serverrepo(object):
    def greet(self, name):
        return "Hello, " + name

    def filtered(self, name):
        return self

def mangle(s):
    return ''.join(chr(ord(c) + 1) for c in s)
def unmangle(s):
    return ''.join(chr(ord(c) - 1) for c in s)

def greet(repo, proto, name):
    return mangle(repo.greet(unmangle(name)))

wireproto.commands['greet'] = (greet, 'name',)

srv = serverrepo()
clt = clientpeer(srv)

print(clt.greet("Foobar"))
b = clt.iterbatch()
map(b.greet, ('Fo, =;:<o', 'Bar'))
b.submit()
print([r for r in b.results()])