# HG changeset patch # User Yuya Nishihara # Date 1463884998 -32400 # Node ID 024e8f82f3de876496997138299b078211c1c54d # Parent d74b8a4fde3b6980b8476217940f0c2709026e18 commandserver: add new forking server implemented without using SocketServer SocketServer.ForkingMixIn of Python 2.x has a couple of issues, such as: - race condition that leads to 100% CPU usage (Python 2.6) https://bugs.python.org/issue21491 - can't wait for children belonging to different process groups (Python 2.6) - leaves at least one zombie process (Python 2.6, 2.7) https://bugs.python.org/issue11109 The first two are critical because we do setpgid(0, 0) in child process to isolate terminal signals. The last one isn't, but ForkingMixIn seems to be doing silly. So there are two choices: a) backport and maintain SocketServer until we can drop support for Python 2.x b) replace SocketServer by simpler one and eliminate glue codes I chose (b) because it's great time for getting rid of utterly complicated SocketServer stuff, and preparing for future move towards prefork service. New unixforkingservice is implemented loosely based on chg 531f8ef64be6. It is monolithic but much simpler than SocketServer. unixservicehandler provides customizing points for chg, and it will be shared with future prefork service. Old unixservice class is still used by chgserver. It will be removed later. Thanks to Jun Wu for investigating these issues. diff -r d74b8a4fde3b -r 024e8f82f3de mercurial/commandserver.py --- a/mercurial/commandserver.py Sun May 22 12:49:22 2016 +0900 +++ b/mercurial/commandserver.py Sun May 22 11:43:18 2016 +0900 @@ -11,6 +11,9 @@ import gc import os import random +import select +import signal +import socket import struct import sys import traceback @@ -385,6 +388,41 @@ # trigger __del__ since ForkingMixIn uses os._exit gc.collect() +class unixservicehandler(object): + """Set of pluggable operations for unix-mode services + + Almost all methods except for createcmdserver() are called in the main + process. You can't pass mutable resource back from createcmdserver(). + """ + + pollinterval = None + + def __init__(self, ui): + self.ui = ui + + def bindsocket(self, sock, address): + util.bindunixsocket(sock, address) + + def unlinksocket(self, address): + os.unlink(address) + + def printbanner(self, address): + self.ui.status(_('listening at %s\n') % address) + self.ui.flush() # avoid buffering of status message + + def shouldexit(self): + """True if server should shut down; checked per pollinterval""" + return False + + def newconnection(self): + """Called when main process notices new connection""" + pass + + def createcmdserver(self, repo, conn, fin, fout): + """Create new command server instance; called in the process that + serves for the current connection""" + return server(self.ui, repo, fin, fout) + class _requesthandler(socketserver.BaseRequestHandler): def handle(self): _serverequest(self.server.ui, self.server.repo, self.request, @@ -424,9 +462,96 @@ finally: self._cleanup() +class unixforkingservice(unixservice): + def __init__(self, ui, repo, opts, handler=None): + super(unixforkingservice, self).__init__(ui, repo, opts) + self._servicehandler = handler or unixservicehandler(ui) + self._sock = None + self._oldsigchldhandler = None + self._workerpids = set() # updated by signal handler; do not iterate + + def init(self): + self._sock = socket.socket(socket.AF_UNIX) + self._servicehandler.bindsocket(self._sock, self.address) + self._sock.listen(5) + o = signal.signal(signal.SIGCHLD, self._sigchldhandler) + self._oldsigchldhandler = o + self._servicehandler.printbanner(self.address) + + def _cleanup(self): + signal.signal(signal.SIGCHLD, self._oldsigchldhandler) + self._sock.close() + self._servicehandler.unlinksocket(self.address) + # don't kill child processes as they have active clients, just wait + self._reapworkers(0) + + def run(self): + try: + self._mainloop() + finally: + self._cleanup() + + def _mainloop(self): + h = self._servicehandler + while not h.shouldexit(): + try: + ready = select.select([self._sock], [], [], h.pollinterval)[0] + if not ready: + continue + conn, _addr = self._sock.accept() + except (select.error, socket.error) as inst: + if inst.args[0] == errno.EINTR: + continue + raise + + pid = os.fork() + if pid: + try: + self.ui.debug('forked worker process (pid=%d)\n' % pid) + self._workerpids.add(pid) + h.newconnection() + finally: + conn.close() # release handle in parent process + else: + try: + self._serveworker(conn) + conn.close() + os._exit(0) + except: # never return, hence no re-raises + try: + self.ui.traceback(force=True) + finally: + os._exit(255) + + def _sigchldhandler(self, signal, frame): + self._reapworkers(os.WNOHANG) + + def _reapworkers(self, options): + while self._workerpids: + try: + pid, _status = os.waitpid(-1, options) + except OSError as inst: + if inst.errno == errno.EINTR: + continue + if inst.errno != errno.ECHILD: + raise + # no child processes at all (reaped by other waitpid()?) + self._workerpids.clear() + return + if pid == 0: + # no waitable child processes + return + self.ui.debug('worker process exited (pid=%d)\n' % pid) + self._workerpids.discard(pid) + + def _serveworker(self, conn): + signal.signal(signal.SIGCHLD, self._oldsigchldhandler) + h = self._servicehandler + _serverequest(self.ui, self.repo, conn, h.createcmdserver) + _servicemap = { 'pipe': pipeservice, - 'unix': unixservice, + 'unix': unixforkingservice, } def createservice(ui, repo, opts):