Mercurial > hg
view tests/dummysmtpd.py @ 33377:5d63e5f40bea
revset: define successors revset
This revset returns all successors, including transit nodes and the source
nodes (to be consistent with existing revsets like "ancestors").
To filter out transit nodes, use `successors(X)-obsolete()`.
To filter out divergent case, use `successors(X)-divergent()-obsolete()`.
The revset could be useful to define rebase destination, like:
`max(successors(BASE)-divergent()-obsolete())`. The `max` is to deal with
splits.
There are other implementations where `successors` returns just one level of
successors, and `allsuccessors` returns everything. I think `successors`
returning all successors by default is more user friendly. We have seen
cases in production where people use 1-level `successors` while they really
want `allsuccessors`. So it seems better to just have one single revset
returning all successors by default to avoid user errors.
In the future we might want to add `depth` keyword argument to it and for
other revsets like `ancestors` etc. Or even build some flexible indexing
syntax [1] to satisfy people having the depth limit requirement.
[1]: https://www.mercurial-scm.org/pipermail/mercurial-devel/2017-July/101140.html
author | Jun Wu <quark@fb.com> |
---|---|
date | Mon, 10 Jul 2017 10:56:40 -0700 |
parents | d83ca854fa21 |
children | 75bae69747f0 |
line wrap: on
line source
#!/usr/bin/env python """dummy SMTP server for use in tests""" from __future__ import absolute_import import asyncore import optparse import smtpd import ssl import sys from mercurial import ( server, sslutil, ui as uimod, ) def log(msg): sys.stdout.write(msg) sys.stdout.flush() class dummysmtpserver(smtpd.SMTPServer): def __init__(self, localaddr): smtpd.SMTPServer.__init__(self, localaddr, remoteaddr=None) def process_message(self, peer, mailfrom, rcpttos, data): log('%s from=%s to=%s\n' % (peer[0], mailfrom, ', '.join(rcpttos))) class dummysmtpsecureserver(dummysmtpserver): def __init__(self, localaddr, certfile): dummysmtpserver.__init__(self, localaddr) self._certfile = certfile def handle_accept(self): pair = self.accept() if not pair: return conn, addr = pair ui = uimod.ui.load() try: # wrap_socket() would block, but we don't care conn = sslutil.wrapserversocket(conn, ui, certfile=self._certfile) except ssl.SSLError: log('%s ssl error\n' % addr[0]) conn.close() return smtpd.SMTPChannel(self, conn, addr) def run(): try: asyncore.loop() except KeyboardInterrupt: pass def main(): op = optparse.OptionParser() op.add_option('-d', '--daemon', action='store_true') op.add_option('--daemon-postexec', action='append') op.add_option('-p', '--port', type=int, default=8025) op.add_option('-a', '--address', default='localhost') op.add_option('--pid-file', metavar='FILE') op.add_option('--tls', choices=['none', 'smtps'], default='none') op.add_option('--certificate', metavar='FILE') opts, args = op.parse_args() if opts.tls == 'smtps' and not opts.certificate: op.error('--certificate must be specified') addr = (opts.address, opts.port) def init(): if opts.tls == 'none': dummysmtpserver(addr) else: dummysmtpsecureserver(addr, opts.certificate) log('listening at %s:%d\n' % addr) server.runservice(vars(opts), initfn=init, runfn=run, runargs=[sys.executable, __file__] + sys.argv[1:]) if __name__ == '__main__': main()