view tests/sshprotoext.py @ 40093:726cfc47f17a

contrib: add a utility module to parse test scripts This patch centralizes the logic to pick up code fragments embedded in *.t scripts, in order to: - apply checking with patterns in check-code.py on such embedded code fragments Currently, check-code.py completely ignores embedded code fragments. I'll post another patch series to check them. - replace similar code path in contrib/import-checker.py Current import-checker.py has the problems below. Fixing each of them is a little difficult, because parsing logic and pattern strings are tightly coupled. - it overlooks (or mis-detects) the end of inline script in doctest style 8a8dd6e4a97a fixed a part of this issue, but not enough. - it overlooks inline script in doctest style at the end of file (and ignores invalid un-closed heredoc at the end of file, too) - it overlooks code fragments in the styles below - "python <<EOF" (heredoc should be "cat > file <<EOF" style) - "cat > foobar.py << ANYLIMIT" (limit mark should be "EOF") - "cat << EOF > foobar.py" (filename should be placed before limit mark) - "cat >> foobar.py << EOF" (appending is ignored) - it is not extensible for other than python code fragments (e.g. shell script, hgrc file, and so on) This new module can detect python code fragments in the styles below: - inline script in doctest style (starting with a " >>> " line) - python invocation with heredoc script ("python <<EOF") - python script in heredoc style (redirected into ".py" file) As an example of extensibility of the new module, this patch also contains an implementation to pick up the code fragments below. This will be useful to add additional restrictions for them, for example. - shell script in heredoc style (redirected into ".sh" file) - hgrc configuration in heredoc style (redirected into hgrc or $HGRCPATH)
author FUJIWARA Katsunori <foozy@lares.dti.ne.jp>
date Thu, 23 Aug 2018 12:25:54 +0900
parents b4d85bc122bd
children 2372284d9457
line wrap: on
line source

# sshprotoext.py - Extension to test behavior of SSH protocol
#
# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

# This extension replaces the SSH server started via `hg serve --stdio`.
# The server behaves differently depending on environment variables.

from __future__ import absolute_import

from mercurial import (
    error,
    extensions,
    registrar,
    sshpeer,
    wireprotoserver,
    wireprotov1server,
)

# Table of config options declared by this extension. ``configitem`` is
# the declaration helper obtained from ``registrar.configitem`` for that
# table.
configtable = {}
configitem = registrar.configitem(configtable)

# Both options default to None (i.e. unset). ``sshpeer.mode`` is read by
# ``extsetup()`` and ``sshpeer.handshake-mode`` by ``performhandshake()``.
configitem(b'sshpeer', b'mode', default=None)
configitem(b'sshpeer', b'handshake-mode', default=None)

class bannerserver(wireprotoserver.sshserver):
    """SSH server variant that emits a banner before serving.

    Ten lines of the form ``banner: line N`` (N running from 0 to 9) are
    written to the server's output stream, then the parent class'
    ``serve_forever()`` takes over.
    """
    def serve_forever(self):
        emit = self._fout.write
        for lineno in range(10):
            emit(b'banner: line %d\n' % (lineno,))

        # Banner done; continue with the regular server loop.
        super(bannerserver, self).serve_forever()

class prehelloserver(wireprotoserver.sshserver):
    """Mock of a server older than 0.9.1, which predates ``hello``.

    Mercurial 0.9.1 introduced the ``hello`` wire protocol command, and
    modern clients send it when connecting to an SSH server. This server
    lets tests exercise the handshake against a peer that does not
    support ``hello``.
    """
    def serve_forever(self):
        fin, fout = self._fin, self._fout

        # The first line must be ``hello``. It is answered like any
        # unknown command: with an empty reply.
        command = fin.readline()
        assert command == b'hello\n'
        wireprotoserver._sshv1respondbytes(fout, b'')

        # The second line must be ``between``, which is dispatched for
        # real and its response data sent back.
        command = fin.readline()
        assert command == b'between\n'
        handler = wireprotoserver.sshv1protocolhandler(self._ui, fin, fout)
        response = wireprotov1server.dispatch(self._repo, handler, b'between')
        wireprotoserver._sshv1respondbytes(fout, response.data)

        # Handshake handled by hand; continue with the regular server loop.
        super(prehelloserver, self).serve_forever()

def performhandshake(orig, ui, stdin, stdout, stderr):
    """Wrapped version of sshpeer._performhandshake to send extra commands.

    The ``sshpeer.handshake-mode`` config option selects which commands
    are written to ``stdin`` before the original handshake runs:

    ``pre-no-args``
        a single ``no-args`` command
    ``pre-multiple-no-args``
        ``unknown1``, ``unknown2`` and ``unknown3``, in that order

    Every command is announced through ``ui.debug()`` right before it is
    written, and ``stdin`` is flushed once after the last command.

    Returns whatever ``orig(ui, stdin, stdout, stderr)`` returns.

    Raises ``error.ProgrammingError`` for any other mode, including an
    unset (``None``) one.
    """
    mode = ui.config(b'sshpeer', b'handshake-mode')
    if mode == b'pre-no-args':
        commands = (b'no-args',)
    elif mode == b'pre-multiple-no-args':
        commands = (b'unknown1', b'unknown2', b'unknown3')
    else:
        # ``mode`` is None when the option is unset. Bytes %-formatting
        # rejects None on Python 3 (TypeError), which would mask the
        # intended error, so render it explicitly the way Python 2 did.
        raise error.ProgrammingError(b'unknown HANDSHAKECOMMANDMODE: %s' %
                                     (b'None' if mode is None else mode))

    for command in commands:
        ui.debug(b'sending %s command\n' % command)
        stdin.write(b'%s\n' % command)
    stdin.flush()
    return orig(ui, stdin, stdout, stderr)

def extsetup(ui):
    """Install the server and/or peer overrides requested for this run.

    The server flavour comes from the ``SSHSERVERMODE`` environment
    variable (``banner`` or ``no-hello``) and replaces
    ``wireprotoserver.sshserver``. The peer flavour comes from the
    ``sshpeer.mode`` config option; ``extra-handshake-commands`` wraps
    ``sshpeer._performhandshake`` with ``performhandshake``.

    Raises ``error.ProgrammingError`` for any other non-empty value of
    either setting. Empty or unset values leave things untouched.
    """
    # The server side is driven by the environment rather than by config:
    # `hg serve --stdio` must be invoked in a fixed form for security
    # reasons, so `dummyssh` cannot simply append `--config` flags to the
    # command line.
    servermode = ui.environ.get(b'SSHSERVERMODE')
    if servermode:
        if servermode == b'banner':
            replacement = bannerserver
        elif servermode == b'no-hello':
            replacement = prehelloserver
        else:
            raise error.ProgrammingError(b'unknown server mode: %s'
                                         % servermode)
        wireprotoserver.sshserver = replacement

    peermode = ui.config(b'sshpeer', b'mode')
    if not peermode:
        return
    if peermode != b'extra-handshake-commands':
        raise error.ProgrammingError(b'unknown peer mode: %s' % peermode)
    extensions.wrapfunction(sshpeer, '_performhandshake', performhandshake)