Mercurial > hg
view contrib/hgclient.py @ 50342:c54e9bb5737e stable
sslutil: set context security level for legacy tls testing (issue6760)
Current versions of OpenSSL do not allow the use of TLS <1.2 when the
library's security level is >=1 (1 being the default on most distributions).
Setting the security level in addition to the minimum protocol is therefore
necessary for the legacy protocol tests.
This is done here ONLY when testing, when:
- explicitly setting the cipher string, or
- using the "--insecure" flag, or
- using the "devel.serverexactprotocol" testing option.
See: https://github.com/openssl/openssl/blob/master/NEWS.md#major-changes-between-openssl-30-and-openssl-310-14-mar-2023
author | pacien <pacien.trangirard@pacien.net> |
---|---|
date | Wed, 12 Apr 2023 17:28:39 +0200 |
parents | 642e31cb55f0 |
children | 493034cc3265 |
line wrap: on
line source
# A minimal client for Mercurial's command server
"""Helpers for driving ``hg serve --cmdserver`` from test scripts.

The command server speaks a framed protocol: every message sent by the
server starts with a 5-byte header (a one-byte channel identifier and a
big-endian unsigned 32-bit length).  Data sent by the client is framed as a
big-endian unsigned 32-bit length followed by the payload.  Everything in
this module works on bytes.
"""

import io
import os
import re
import signal
import socket
import struct
import subprocess
import sys
import time

if sys.version_info[0] >= 3:
    stdout = sys.stdout.buffer
    stderr = sys.stderr.buffer
    stringio = io.BytesIO

    def bprint(*args):
        """Write the bytes arguments to stdout, space separated, with a
        trailing newline."""
        # remove b'' as well for ease of test migration
        pargs = [re.sub(br'''\bb(['"])''', br'\1', b'%s' % a) for a in args]
        stdout.write(b' '.join(pargs) + b'\n')


else:
    import cStringIO

    stdout = sys.stdout
    stderr = sys.stderr
    stringio = cStringIO.StringIO
    bprint = print


def connectpipe(path=None, extraargs=()):
    """Spawn ``hg serve --cmdserver pipe`` and return the Popen object.

    ``path``, when given, is passed as the repository (``-R``).
    ``extraargs`` is an iterable of additional command line arguments.
    The returned process has ``stdin`` and ``stdout`` connected to pipes.
    """
    cmdline = [b'hg', b'serve', b'--cmdserver', b'pipe']
    if path:
        cmdline += [b'-R', path]
    cmdline.extend(extraargs)

    def tonative(cmdline):
        # On Windows ("nt") the arguments are decoded to str before being
        # handed to Popen; elsewhere the bytes are passed through untouched.
        if os.name != 'nt':
            return cmdline
        return [arg.decode("utf-8") for arg in cmdline]

    server = subprocess.Popen(
        tonative(cmdline), stdin=subprocess.PIPE, stdout=subprocess.PIPE
    )

    return server


class unixconnection:
    """A connection to a command server listening on a unix socket.

    Exposes ``stdin``/``stdout`` file objects and a ``wait()`` method so it
    can be used wherever the Popen object from connectpipe() is expected.
    """

    def __init__(self, sockpath):
        self.sock = sock = socket.socket(socket.AF_UNIX)
        sock.connect(sockpath)
        self.stdin = sock.makefile('wb')
        self.stdout = sock.makefile('rb')

    def wait(self):
        """Close both file objects and the underlying socket."""
        self.stdin.close()
        self.stdout.close()
        self.sock.close()


class unixserver:
    """Run ``hg serve --cmdserver unix`` listening on ``sockpath``.

    ``logpath``, when given, receives the server's stdout and stderr
    (opened in append mode).  ``repopath``, when given, is passed as the
    repository (``-R``).
    """

    def __init__(self, sockpath, logpath=None, repopath=None):
        self.sockpath = sockpath
        cmdline = [b'hg', b'serve', b'--cmdserver', b'unix', b'-a', sockpath]
        if repopath:
            cmdline += [b'-R', repopath]
        if logpath:
            # The child gets its own copy of the descriptor, so our handle
            # can (and should) be closed as soon as the process is spawned
            # instead of being leaked for the lifetime of this object.
            with open(logpath, 'a') as logfile:
                self.server = subprocess.Popen(
                    cmdline, stdout=logfile, stderr=subprocess.STDOUT
                )
        else:
            self.server = subprocess.Popen(cmdline, stdout=None, stderr=None)
        # wait for listen()
        while self.server.poll() is None:
            if os.path.exists(sockpath):
                break
            time.sleep(0.1)

    def connect(self):
        """Return a new unixconnection to the server's socket."""
        return unixconnection(self.sockpath)

    def shutdown(self):
        """Send SIGTERM to the server and wait for it to exit."""
        os.kill(self.server.pid, signal.SIGTERM)
        self.server.wait()


def writeblock(server, data):
    """Send ``data`` to the server, prefixed by its length as a big-endian
    unsigned 32-bit integer, then flush."""
    server.stdin.write(struct.pack(b'>I', len(data)))
    server.stdin.write(data)
    server.stdin.flush()


def readchannel(server):
    """Read one message from the server and return ``(channel, payload)``.

    For the input channels ``I`` and ``L`` the payload is the integer length
    announced in the header (no data follows); for every other channel it is
    the bytes read after the header.

    Raises EOFError if the stream ends before a complete 5-byte header could
    be read.
    """
    header = server.stdout.read(5)
    # A stream that ends in the middle of a header is just as much an EOF as
    # one that ends cleanly; report both the same way rather than letting
    # struct.unpack() fail on a short buffer.
    if not header or len(header) < 5:
        raise EOFError
    channel, length = struct.unpack('>cI', header)
    if channel in b'IL':
        return channel, length
    return channel, server.stdout.read(length)


def sep(text):
    """Return ``text`` with every backslash replaced by a forward slash."""
    return text.replace(b'\\', b'/')


def runcommand(
    server, args, output=stdout, error=stderr, input=None, outfilter=lambda x: x
):
    """Run one command on the server and dispatch its channel traffic.

    ``args`` is a list of bytes arguments.  Data on the ``o`` channel is
    passed through ``outfilter`` and written to ``output``; data on the
    ``e`` channel is written to ``error``.  Input requests (``I``/``L``) are
    answered from ``input`` (an empty stream when not given).

    Returns the command's exit code once the ``r`` channel is received.  If
    an unknown upper-case channel shows up, processing stops and None is
    returned; unknown channels that are not upper-case are reported and
    skipped.
    """
    bprint(b'*** runcommand', b' '.join(args))
    stdout.flush()
    server.stdin.write(b'runcommand\n')
    writeblock(server, b'\0'.join(args))

    if not input:
        input = stringio()

    while True:
        ch, data = readchannel(server)
        if ch == b'o':
            output.write(outfilter(data))
            output.flush()
        elif ch == b'e':
            error.write(data)
            error.flush()
        elif ch == b'I':
            # for input channels, data is the number of bytes requested
            writeblock(server, input.read(data))
        elif ch == b'L':
            writeblock(server, input.readline(data))
        elif ch == b'm':
            bprint(b"message: %r" % data)
        elif ch == b'r':
            (ret,) = struct.unpack('>i', data)
            if ret != 0:
                bprint(b' [%d]' % ret)
            return ret
        else:
            bprint(b"unexpected channel %c: %r" % (ch, data))
            if ch.isupper():
                return


def check(func, connect=connectpipe):
    """Call ``func(server)`` with a freshly connected server.

    The server's stdin is closed and the server is waited for afterwards,
    whether or not ``func`` raised.  Returns whatever ``func`` returned.
    """
    stdout.flush()
    server = connect()
    try:
        return func(server)
    finally:
        server.stdin.close()
        server.wait()


def checkwith(connect=connectpipe, **kwargs):
    """Return a decorator that immediately runs the decorated function via
    check(), connecting with ``connect(**kwargs)``."""

    def wrap(func):
        return check(func, lambda: connect(**kwargs))

    return wrap