view tests/test-wireproto.py @ 50195:11e6eee4b063

transaction: use the standard transaction mechanism to backup branch Branch is a bit special : - It currently does not collaborate with the transaction (or any scoping) for writing (this is bad) - It can change without the lock being taken (it is protected by `wlock`) So we rely on the same mechanism as for the backup of the other dirstate file: - we only do a backup if we hold the wlock - we force a backup though the transaction Since "branch" write does not collaborate with the transaction, we cannot back it up "at the last minute" as we do for the dirstate. We have to back it up "upfront". Since we have a backup, the transaction is no longer doing its "quick_abort" and get noisy. Which is quite annoying. To work around this, and to avoid jumping in yet-another-rabbit-hole of "getting branch written properly", I am doing horrible things to the transaction in the meantime. We should be able to get this code go away during the next cycle. In the meantime, I prefer to take this small stop so that we stop abusing the "journal" and "undo" mechanism instead of the proper backup mechanism of the transaction. Also note that this change regress the warning message for the legacy fallback introduced in 2008 when issue902 got fixed in dd5a501cb97f (Mercurial 1.0). I feel like this is fine as issue 902 remains fixed, and this would only affect people deploying a mix of 15 year old Mercurial and modern mercurial, and using branch and rollback extensively.
author Pierre-Yves David <pierre-yves.david@octobus.net>
date Thu, 23 Feb 2023 15:37:46 +0100
parents 642e31cb55f0
children
line wrap: on
line source

import sys

from mercurial import (
    error,
    pycompat,
    ui as uimod,
    util,
    wireprototypes,
    wireprotov1peer,
    wireprotov1server,
)
from mercurial.utils import stringutil

stringio = util.stringio


class proto:
    def __init__(self, args):
        self.args = args
        self.name = 'dummyproto'

    def getargs(self, spec):
        args = self.args
        args.setdefault(b'*', {})
        names = spec.split()
        return [args[n] for n in names]

    def checkperm(self, perm):
        pass


wireprototypes.TRANSPORTS['dummyproto'] = {
    'transport': 'dummy',
    'version': 1,
}


class clientpeer(wireprotov1peer.wirepeer):
    def __init__(self, serverrepo, ui):
        self.serverrepo = serverrepo
        self.ui = ui

    def url(self):
        return b'test'

    def local(self):
        return None

    def peer(self):
        return self

    def canpush(self):
        return True

    def close(self):
        pass

    def capabilities(self):
        return [b'batch']

    def _call(self, cmd, **args):
        args = pycompat.byteskwargs(args)
        res = wireprotov1server.dispatch(self.serverrepo, proto(args), cmd)
        if isinstance(res, wireprototypes.bytesresponse):
            return res.data
        elif isinstance(res, bytes):
            return res
        else:
            raise error.Abort('dummy client does not support response type')

    def _callstream(self, cmd, **args):
        return stringio(self._call(cmd, **args))

    @wireprotov1peer.batchable
    def greet(self, name):
        return {b'name': mangle(name)}, unmangle


class serverrepo:
    def __init__(self, ui):
        self.ui = ui

    def greet(self, name):
        return b"Hello, " + name

    def filtered(self, name):
        return self


def mangle(s):
    return b''.join(pycompat.bytechr(ord(c) + 1) for c in pycompat.bytestr(s))


def unmangle(s):
    return b''.join(pycompat.bytechr(ord(c) - 1) for c in pycompat.bytestr(s))


def greet(repo, proto, name):
    return mangle(repo.greet(unmangle(name)))


wireprotov1server.commands[b'greet'] = (greet, b'name')

srv = serverrepo(uimod.ui())
clt = clientpeer(srv, uimod.ui())


def printb(data, end=b'\n'):
    out = getattr(sys.stdout, 'buffer', sys.stdout)
    out.write(data + end)
    out.flush()


printb(clt.greet(b"Foobar"))

with clt.commandexecutor() as e:
    fgreet1 = e.callcommand(b'greet', {b'name': b'Fo, =;:<o'})
    fgreet2 = e.callcommand(b'greet', {b'name': b'Bar'})

printb(
    stringutil.pprint([f.result() for f in (fgreet1, fgreet2)], bprefix=True)
)