Mercurial > hg
view tests/test-wireproto.py @ 45239:13814622b3b1
commitctx: extract all the file preparation logic in a new function
Before we actually start to create a new commit we have a large block of logic
that do the necessary file and manifest commit and that determine which files
are been affected by the commit (and how).
This is a complex process on its own. It return a "simple" output that can be
fed to the next step. The output itself is not that simple as we return a lot of
individual items (files, added, removed, ...). My next step (and actual goal for
this cleanup) will be to simplify the return by returning a richer object that
will be more suited for the variation of data we want to store.
After this changeset the `commitctx` is a collection of smaller function with
limited scope. The largest one is still `_filecommit` without about 100 lines of
code.
author | Pierre-Yves David <pierre-yves.david@octobus.net> |
---|---|
date | Thu, 23 Jul 2020 23:52:31 +0200 |
parents | 2372284d9457 |
children | c424ff4807e6 |
line wrap: on
line source
from __future__ import absolute_import, print_function import sys from mercurial import ( error, pycompat, ui as uimod, util, wireprototypes, wireprotov1peer, wireprotov1server, ) from mercurial.utils import stringutil stringio = util.stringio class proto(object): def __init__(self, args): self.args = args self.name = 'dummyproto' def getargs(self, spec): args = self.args args.setdefault(b'*', {}) names = spec.split() return [args[n] for n in names] def checkperm(self, perm): pass wireprototypes.TRANSPORTS['dummyproto'] = { 'transport': 'dummy', 'version': 1, } class clientpeer(wireprotov1peer.wirepeer): def __init__(self, serverrepo, ui): self.serverrepo = serverrepo self.ui = ui def url(self): return b'test' def local(self): return None def peer(self): return self def canpush(self): return True def close(self): pass def capabilities(self): return [b'batch'] def _call(self, cmd, **args): args = pycompat.byteskwargs(args) res = wireprotov1server.dispatch(self.serverrepo, proto(args), cmd) if isinstance(res, wireprototypes.bytesresponse): return res.data elif isinstance(res, bytes): return res else: raise error.Abort('dummy client does not support response type') def _callstream(self, cmd, **args): return stringio(self._call(cmd, **args)) @wireprotov1peer.batchable def greet(self, name): f = wireprotov1peer.future() yield {b'name': mangle(name)}, f yield unmangle(f.value) class serverrepo(object): def __init__(self, ui): self.ui = ui def greet(self, name): return b"Hello, " + name def filtered(self, name): return self def mangle(s): return b''.join(pycompat.bytechr(ord(c) + 1) for c in pycompat.bytestr(s)) def unmangle(s): return b''.join(pycompat.bytechr(ord(c) - 1) for c in pycompat.bytestr(s)) def greet(repo, proto, name): return mangle(repo.greet(unmangle(name))) wireprotov1server.commands[b'greet'] = (greet, b'name') srv = serverrepo(uimod.ui()) clt = clientpeer(srv, uimod.ui()) def printb(data, end=b'\n'): out = getattr(sys.stdout, 'buffer', sys.stdout) out.write(data + end) out.flush() printb(clt.greet(b"Foobar")) with clt.commandexecutor() as e: fgreet1 = e.callcommand(b'greet', {b'name': b'Fo, =;:<o'}) fgreet2 = e.callcommand(b'greet', {b'name': b'Bar'}) printb( stringutil.pprint([f.result() for f in (fgreet1, fgreet2)], bprefix=True) )