view tests/dummysmtpd.py @ 34682:7e3001b74ab3

tersestatus: re-implement the functionality to terse the status The previous terse status implementation was hacking around os.listdir() and was flaky. There have been a lot of instances of mercurial buildbots failing and google's internal builds failing because of the hacky implementation of terse status. Even though I wrote the last implementation but it was hard for me to find the reason for the flake. The new implementation can be slower than the old one but is clean and easy to understand. In this we create a node object for each directory and create a tree like structure starting from the root of the working copy. While building the tree like structure we store some information on the nodes which will be helpful for deciding later whether we can terse the dir or not. Once the whole tree is build we traverse and built the list of files for each status with required tersing. There is no behaviour change as the old test, test-status-terse.t passes with the new implementation. Differential Revision: https://phab.mercurial-scm.org/D985
author Pulkit Goyal <7895pulkit@gmail.com>
date Fri, 06 Oct 2017 20:54:23 +0530
parents d83ca854fa21
children 75bae69747f0
line wrap: on
line source

#!/usr/bin/env python

"""dummy SMTP server for use in tests"""

from __future__ import absolute_import

import asyncore
import optparse
import smtpd
import ssl
import sys

from mercurial import (
    server,
    sslutil,
    ui as uimod,
)

def log(msg):
    sys.stdout.write(msg)
    sys.stdout.flush()

class dummysmtpserver(smtpd.SMTPServer):
    def __init__(self, localaddr):
        smtpd.SMTPServer.__init__(self, localaddr, remoteaddr=None)

    def process_message(self, peer, mailfrom, rcpttos, data):
        log('%s from=%s to=%s\n' % (peer[0], mailfrom, ', '.join(rcpttos)))

class dummysmtpsecureserver(dummysmtpserver):
    def __init__(self, localaddr, certfile):
        dummysmtpserver.__init__(self, localaddr)
        self._certfile = certfile

    def handle_accept(self):
        pair = self.accept()
        if not pair:
            return
        conn, addr = pair
        ui = uimod.ui.load()
        try:
            # wrap_socket() would block, but we don't care
            conn = sslutil.wrapserversocket(conn, ui, certfile=self._certfile)
        except ssl.SSLError:
            log('%s ssl error\n' % addr[0])
            conn.close()
            return
        smtpd.SMTPChannel(self, conn, addr)

def run():
    try:
        asyncore.loop()
    except KeyboardInterrupt:
        pass

def main():
    op = optparse.OptionParser()
    op.add_option('-d', '--daemon', action='store_true')
    op.add_option('--daemon-postexec', action='append')
    op.add_option('-p', '--port', type=int, default=8025)
    op.add_option('-a', '--address', default='localhost')
    op.add_option('--pid-file', metavar='FILE')
    op.add_option('--tls', choices=['none', 'smtps'], default='none')
    op.add_option('--certificate', metavar='FILE')

    opts, args = op.parse_args()
    if opts.tls == 'smtps' and not opts.certificate:
        op.error('--certificate must be specified')

    addr = (opts.address, opts.port)
    def init():
        if opts.tls == 'none':
            dummysmtpserver(addr)
        else:
            dummysmtpsecureserver(addr, opts.certificate)
        log('listening at %s:%d\n' % addr)

    server.runservice(vars(opts), initfn=init, runfn=run,
                      runargs=[sys.executable, __file__] + sys.argv[1:])

if __name__ == '__main__':
    main()