Mercurial > hg
diff tests/dummysmtpd.py @ 50751:0a55206c5a1e
branching: merge stable into default
author | Raphaël Gomès <rgomes@octobus.net> |
---|---|
date | Thu, 06 Jul 2023 16:07:34 +0200 |
parents | b3a5af04da35 |
children | 8f0b0df79039 |
line wrap: on
line diff
--- a/tests/dummysmtpd.py Thu Jun 22 11:28:17 2023 +0200 +++ b/tests/dummysmtpd.py Thu Jul 06 16:07:34 2023 +0200 @@ -3,12 +3,11 @@ """dummy SMTP server for use in tests""" -import asyncore import optparse -import smtpd +import os +import socket import ssl import sys -import traceback from mercurial import ( pycompat, @@ -18,54 +17,97 @@ ) +if os.environ.get('HGIPV6', '0') == '1': + family = socket.AF_INET6 +else: + family = socket.AF_INET + + def log(msg): sys.stdout.write(msg) sys.stdout.flush() -class dummysmtpserver(smtpd.SMTPServer): - def __init__(self, localaddr): - smtpd.SMTPServer.__init__(self, localaddr, remoteaddr=None) +def mocksmtpserversession(conn, addr): + conn.send(b'220 smtp.example.com ESMTP\r\n') + + line = conn.recv(1024) + if not line.lower().startswith(b'ehlo '): + log('no hello: %s\n' % line) + return + + conn.send(b'250 Hello\r\n') - def process_message(self, peer, mailfrom, rcpttos, data, **kwargs): - log('%s from=%s to=%s\n' % (peer[0], mailfrom, ', '.join(rcpttos))) + line = conn.recv(1024) + if not line.lower().startswith(b'mail from:'): + log('no mail from: %s\n' % line) + return + mailfrom = line[10:].decode().rstrip() + if mailfrom.startswith('<') and mailfrom.endswith('>'): + mailfrom = mailfrom[1:-1] + + conn.send(b'250 Ok\r\n') - def handle_error(self): - # On Windows, a bad SSL connection sometimes generates a WSAECONNRESET. - # The default handler will shutdown this server, and then both the - # current connection and subsequent ones fail on the client side with - # "No connection could be made because the target machine actively - # refused it". If we eat the error, then the client properly aborts in - # the expected way, and the server is available for subsequent requests. - traceback.print_exc() + rcpttos = [] + while True: + line = conn.recv(1024) + if not line.lower().startswith(b'rcpt to:'): + break + rcptto = line[8:].decode().rstrip() + if rcptto.startswith('<') and rcptto.endswith('>'): + rcptto = rcptto[1:-1] + rcpttos.append(rcptto) + + conn.send(b'250 Ok\r\n') + + if not line.lower().strip() == b'data': + log('no rcpt to or data: %s' % line) + + conn.send(b'354 Go ahead\r\n') + + data = b'' + while True: + line = conn.recv(1024) + if not line: + log('connection closed before end of data') + break + data += line + if data.endswith(b'\r\n.\r\n'): + data = data[:-5] + break + + conn.send(b'250 Ok\r\n') + + log( + '%s from=%s to=%s\n%s\n' + % (addr[0], mailfrom, ', '.join(rcpttos), data.decode()) + ) -class dummysmtpsecureserver(dummysmtpserver): - def __init__(self, localaddr, certfile): - dummysmtpserver.__init__(self, localaddr) - self._certfile = certfile - - def handle_accept(self): - pair = self.accept() - if not pair: - return - conn, addr = pair - ui = uimod.ui.load() +def run(host, port, certificate): + ui = uimod.ui.load() + with socket.socket(family, socket.SOCK_STREAM) as s: + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + s.bind((host, port)) + # log('listening at %s:%d\n' % (host, port)) + s.listen(1) try: - # wrap_socket() would block, but we don't care - conn = sslutil.wrapserversocket(conn, ui, certfile=self._certfile) - except ssl.SSLError: - log('%s ssl error\n' % addr[0]) - conn.close() - return - smtpd.SMTPChannel(self, conn, addr) - - -def run(): - try: - asyncore.loop() - except KeyboardInterrupt: - pass + while True: + conn, addr = s.accept() + if certificate: + try: + conn = sslutil.wrapserversocket( + conn, ui, certfile=certificate + ) + except ssl.SSLError as e: + log('%s ssl error: %s\n' % (addr[0], e)) + conn.close() + continue + log("connection from %s:%s\n" % addr) + mocksmtpserversession(conn, addr) + conn.close() + except KeyboardInterrupt: + pass def _encodestrsonly(v): @@ -93,26 +135,18 @@ op.add_option('--pid-file', metavar='FILE') op.add_option('--tls', choices=['none', 'smtps'], default='none') op.add_option('--certificate', metavar='FILE') + op.add_option('--logfile', metavar='FILE') opts, args = op.parse_args() - if opts.tls == 'smtps' and not opts.certificate: - op.error('--certificate must be specified') - - addr = (opts.address, opts.port) - - def init(): - if opts.tls == 'none': - dummysmtpserver(addr) - else: - dummysmtpsecureserver(addr, opts.certificate) - log('listening at %s:%d\n' % addr) + if (opts.tls == 'smtps') != bool(opts.certificate): + op.error('--certificate must be specified with --tls=smtps') server.runservice( bytesvars(opts), - initfn=init, - runfn=run, + runfn=lambda: run(opts.address, opts.port, opts.certificate), runargs=[pycompat.sysexecutable, pycompat.fsencode(__file__)] + pycompat.sysargv[1:], + logfile=opts.logfile, )