view tests/test-sshserver.py @ 47682:78f7f0d490ee

dirstate-v2: Move fixed-size tree metadata into the docket file Before this changeset, the dirstate-v2 data file contained not only nodes and paths that may be reused when appending to an existing file, but also some fixed-size metadata that applies to the entire tree and was added at the end of the data file for every append. This moves that metadata into the docket file, so that repeated "append" operations without meaningful changes don’t actually need to grow any file. Differential Revision: https://phab.mercurial-scm.org/D11098
author Simon Sapin <simon.sapin@octobus.net>
date Thu, 15 Jul 2021 23:02:17 +0200
parents 2372284d9457
children 6000f5b25c9b
line wrap: on
line source

from __future__ import absolute_import, print_function

import io
import unittest

import silenttestrunner

from mercurial import (
    wireprotoserver,
    wireprotov1server,
)

from mercurial.utils import procutil


class SSHServerGetArgsTests(unittest.TestCase):
    def testparseknown(self):
        tests = [
            (b'* 0\nnodes 0\n', [b'', {}]),
            (
                b'* 0\nnodes 40\n1111111111111111111111111111111111111111\n',
                [b'1111111111111111111111111111111111111111', {}],
            ),
        ]
        for input, expected in tests:
            self.assertparse(b'known', input, expected)

    def assertparse(self, cmd, input, expected):
        server = mockserver(input)
        proto = wireprotoserver.sshv1protocolhandler(
            server._ui, server._fin, server._fout
        )
        _func, spec = wireprotov1server.commands[cmd]
        self.assertEqual(proto.getargs(spec), expected)


def mockserver(inbytes):
    ui = mockui(inbytes)
    repo = mockrepo(ui)
    return wireprotoserver.sshserver(ui, repo)


class mockrepo(object):
    def __init__(self, ui):
        self.ui = ui


class mockui(object):
    def __init__(self, inbytes):
        self.fin = io.BytesIO(inbytes)
        self.fout = io.BytesIO()
        self.ferr = io.BytesIO()

    def protectfinout(self):
        return self.fin, self.fout

    def restorefinout(self, fin, fout):
        pass


if __name__ == '__main__':
    # Don't call into msvcrt to set BytesIO to binary mode
    procutil.setbinary = lambda fp: True
    silenttestrunner.main(__name__)