Mercurial > hg
view tests/sshprotoext.py @ 36008:006ff7268c5c
diff: remove fp.write() wrapper which drops label argument
It's no longer needed since we've split labeled write() from file-like
write().
author | Yuya Nishihara <yuya@tcha.org> |
---|---|
date | Wed, 07 Feb 2018 23:22:53 +0900 |
parents | 48a3a9283f09 |
children | 5767664d39a5 |
line wrap: on
line source
# sshprotoext.py - Extension to test behavior of SSH protocol # # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com> # # This software may be used and distributed according to the terms of the # GNU General Public License version 2 or any later version. # This extension replaces the SSH server started via `hg serve --stdio`. # The server behaves differently depending on environment variables. from __future__ import absolute_import from mercurial import ( error, extensions, registrar, sshpeer, wireproto, wireprotoserver, ) configtable = {} configitem = registrar.configitem(configtable) configitem('sshpeer', 'mode', default=None) configitem('sshpeer', 'handshake-mode', default=None) class bannerserver(wireprotoserver.sshserver): """Server that sends a banner to stdout.""" def serve_forever(self): for i in range(10): self._fout.write(b'banner: line %d\n' % i) super(bannerserver, self).serve_forever() class prehelloserver(wireprotoserver.sshserver): """Tests behavior when connecting to <0.9.1 servers. The ``hello`` wire protocol command was introduced in Mercurial 0.9.1. Modern clients send the ``hello`` command when connecting to SSH servers. This mock server tests behavior of the handshake when ``hello`` is not supported. """ def serve_forever(self): l = self._fin.readline() assert l == b'hello\n' # Respond to unknown commands with an empty reply. self._sendresponse(b'') l = self._fin.readline() assert l == b'between\n' rsp = wireproto.dispatch(self._repo, self, b'between') self._handlers[rsp.__class__](self, rsp) super(prehelloserver, self).serve_forever() class upgradev2server(wireprotoserver.sshserver): """Tests behavior for clients that issue upgrade to version 2.""" def serve_forever(self): name = wireprotoserver.SSHV2 l = self._fin.readline() assert l.startswith(b'upgrade ') token, caps = l[:-1].split(b' ')[1:] assert caps == b'proto=%s' % name # Filter hello and between requests. l = self._fin.readline() assert l == b'hello\n' l = self._fin.readline() assert l == b'between\n' l = self._fin.readline() assert l == 'pairs 81\n' self._fin.read(81) # Send the upgrade response. self._fout.write(b'upgraded %s %s\n' % (token, name)) servercaps = wireproto.capabilities(self._repo, self) rsp = b'capabilities: %s' % servercaps self._fout.write(b'%d\n' % len(rsp)) self._fout.write(rsp) self._fout.write(b'\n') self._fout.flush() super(upgradev2server, self).serve_forever() def performhandshake(orig, ui, stdin, stdout, stderr): """Wrapped version of sshpeer._performhandshake to send extra commands.""" mode = ui.config(b'sshpeer', b'handshake-mode') if mode == b'pre-no-args': ui.debug(b'sending no-args command\n') stdin.write(b'no-args\n') stdin.flush() return orig(ui, stdin, stdout, stderr) elif mode == b'pre-multiple-no-args': ui.debug(b'sending unknown1 command\n') stdin.write(b'unknown1\n') ui.debug(b'sending unknown2 command\n') stdin.write(b'unknown2\n') ui.debug(b'sending unknown3 command\n') stdin.write(b'unknown3\n') stdin.flush() return orig(ui, stdin, stdout, stderr) else: raise error.ProgrammingError(b'unknown HANDSHAKECOMMANDMODE: %s' % mode) def extsetup(ui): # It's easier for tests to define the server behavior via environment # variables than config options. This is because `hg serve --stdio` # has to be invoked with a certain form for security reasons and # `dummyssh` can't just add `--config` flags to the command line. servermode = ui.environ.get(b'SSHSERVERMODE') if servermode == b'banner': wireprotoserver.sshserver = bannerserver elif servermode == b'no-hello': wireprotoserver.sshserver = prehelloserver elif servermode == b'upgradev2': wireprotoserver.sshserver = upgradev2server elif servermode: raise error.ProgrammingError(b'unknown server mode: %s' % servermode) peermode = ui.config(b'sshpeer', b'mode') if peermode == b'extra-handshake-commands': extensions.wrapfunction(sshpeer, '_performhandshake', performhandshake) elif peermode: raise error.ProgrammingError(b'unknown peer mode: %s' % peermode)