view tests/test-sshserver.py @ 49788:31b4675ca998 stable

emitrevision: if we need to compute a delta on the fly, try p1 or p2 first. Falling back to `prev` does not yield any real value on modern storage and results in pathological changes being created on the other side. Doing a delta against a parent will likely be smaller (helping the network) and will be safer to apply on the client (helping future pulls by triggering intermediate snapshots where they will be needed by later deltas).
author Pierre-Yves David <pierre-yves.david@octobus.net>
date Thu, 24 Nov 2022 04:04:19 +0100
parents 642e31cb55f0
children 13c004b54cbe
line wrap: on
line source

import io
import unittest

import silenttestrunner

from mercurial import (
    wireprotoserver,
    wireprotov1server,
)

from mercurial.utils import procutil


class SSHServerGetArgsTests(unittest.TestCase):
    """Exercise argument parsing of the SSH version 1 protocol handler."""

    def testparseknown(self):
        """Feed ``known`` requests to the handler and check the parsed args.

        Two payloads are covered: one whose ``nodes`` value is declared
        with length 0 and one whose ``nodes`` value is declared with
        length 40.
        """
        cases = (
            # ``nodes`` declared with a zero-length value.
            (b'* 0\nnodes 0\n', [b'', {}]),
            # ``nodes`` declared with a 40-byte value.
            (
                b'* 0\nnodes 40\n1111111111111111111111111111111111111111\n',
                [b'1111111111111111111111111111111111111111', {}],
            ),
        )
        for payload, wanted in cases:
            self.assertparse(b'known', payload, wanted)

    def assertparse(self, cmd, input, expected):
        """Assert that the arguments parsed for *cmd* equal *expected*.

        *input* is the raw byte stream handed to the mock server; the
        protocol handler reads the command arguments from it.
        """
        srv = mockserver(input)
        handler = wireprotoserver.sshv1protocolhandler(
            srv._ui, srv._fin, srv._fout
        )
        # The registry entry unpacks into two items; only the second one
        # (the argument spec given to getargs()) is needed here.
        _unused, argspec = wireprotov1server.commands[cmd]
        parsed = handler.getargs(argspec)
        self.assertEqual(parsed, expected)


def mockserver(inbytes):
    """Return a ``wireprotoserver.sshserver`` wired to mock objects.

    The server is created from a ``mockui`` whose input stream holds
    *inbytes* and a ``mockrepo`` wrapping that same ui.
    """
    fakeui = mockui(inbytes)
    return wireprotoserver.sshserver(fakeui, mockrepo(fakeui))


class mockrepo:
    """Minimal repository stand-in that only carries a ``ui`` attribute."""

    def __init__(self, ui):
        # Keep a reference to the ui object so it is reachable as ``repo.ui``.
        self.ui = ui


class mockui:
    """Minimal ui stand-in backed by in-memory byte streams.

    ``fin`` is pre-loaded with the bytes given to the constructor, while
    ``fout`` and ``ferr`` start out empty.
    """

    def __init__(self, inbytes):
        self.fin = io.BytesIO(inbytes)
        # Output and error get their own, separate, initially empty buffers.
        self.fout, self.ferr = io.BytesIO(), io.BytesIO()

    def protectfinout(self):
        """Return the ``(fin, fout)`` pair of this object unchanged."""
        streams = (self.fin, self.fout)
        return streams

    def restorefinout(self, fin, fout):
        """Accept a stream pair and do nothing with it."""


if __name__ == '__main__':

    def _setbinarystub(fp):
        """Replacement for ``procutil.setbinary`` that leaves *fp* alone."""
        return True

    # Don't call into msvcrt to set BytesIO to binary mode
    procutil.setbinary = _setbinarystub
    silenttestrunner.main(__name__)