tests/test-wireproto.py
author Manuel Jacob <me@manueljacob.de>
Wed, 24 Jun 2020 14:44:21 +0200
changeset 44998 f2de8f31cb59
parent 43076 2372284d9457
child 47873 c424ff4807e6
permissions -rw-r--r--
pycompat: use os.fsencode() to re-encode sys.argv Historically, the previous code made sense, as Py_EncodeLocale() and fs.fsencode() could possibly use different encodings. However, this is not the case anymore for Python 3.2, which uses the locale encoding as the filesystem encoding (this is not true for later Python versions, but see below). See https://vstinner.github.io/painful-history-python-filesystem-encoding.html for a source and more background information. Using os.fsencode() is safer, as the documentation for sys.argv says that it can be used to get the original bytes. When doing further changes, the Python developers will take care that this continues to work. One concrete case where os.fsencode() is more correct is when enabling Python's UTF-8 mode. Py_DecodeLocale() will use UTF-8 in this case. Our previous code would have encoded it using the locale encoding (which might be different), whereas os.fsencode() will encode it with UTF-8. Since we don’t claim to support the UTF-8 mode, this is not really a bug and the patch can go to the default branch. It might be a good idea to not commit this to the stable branch, as it could in theory introduce regressions.

from __future__ import absolute_import, print_function

import sys

from mercurial import (
    error,
    pycompat,
    ui as uimod,
    util,
    wireprototypes,
    wireprotov1peer,
    wireprotov1server,
)
from mercurial.utils import stringutil

stringio = util.stringio


class proto(object):
    def __init__(self, args):
        self.args = args
        self.name = 'dummyproto'

    def getargs(self, spec):
        args = self.args
        args.setdefault(b'*', {})
        names = spec.split()
        return [args[n] for n in names]

    def checkperm(self, perm):
        pass


wireprototypes.TRANSPORTS['dummyproto'] = {
    'transport': 'dummy',
    'version': 1,
}


class clientpeer(wireprotov1peer.wirepeer):
    def __init__(self, serverrepo, ui):
        self.serverrepo = serverrepo
        self.ui = ui

    def url(self):
        return b'test'

    def local(self):
        return None

    def peer(self):
        return self

    def canpush(self):
        return True

    def close(self):
        pass

    def capabilities(self):
        return [b'batch']

    def _call(self, cmd, **args):
        args = pycompat.byteskwargs(args)
        res = wireprotov1server.dispatch(self.serverrepo, proto(args), cmd)
        if isinstance(res, wireprototypes.bytesresponse):
            return res.data
        elif isinstance(res, bytes):
            return res
        else:
            raise error.Abort('dummy client does not support response type')

    def _callstream(self, cmd, **args):
        return stringio(self._call(cmd, **args))

    @wireprotov1peer.batchable
    def greet(self, name):
        f = wireprotov1peer.future()
        yield {b'name': mangle(name)}, f
        yield unmangle(f.value)


class serverrepo(object):
    def __init__(self, ui):
        self.ui = ui

    def greet(self, name):
        return b"Hello, " + name

    def filtered(self, name):
        return self


def mangle(s):
    return b''.join(pycompat.bytechr(ord(c) + 1) for c in pycompat.bytestr(s))


def unmangle(s):
    return b''.join(pycompat.bytechr(ord(c) - 1) for c in pycompat.bytestr(s))


def greet(repo, proto, name):
    return mangle(repo.greet(unmangle(name)))


wireprotov1server.commands[b'greet'] = (greet, b'name')

srv = serverrepo(uimod.ui())
clt = clientpeer(srv, uimod.ui())


def printb(data, end=b'\n'):
    out = getattr(sys.stdout, 'buffer', sys.stdout)
    out.write(data + end)
    out.flush()


printb(clt.greet(b"Foobar"))

with clt.commandexecutor() as e:
    fgreet1 = e.callcommand(b'greet', {b'name': b'Fo, =;:<o'})
    fgreet2 = e.callcommand(b'greet', {b'name': b'Bar'})

printb(
    stringutil.pprint([f.result() for f in (fgreet1, fgreet2)], bprefix=True)
)