commandserver: add new forking server implemented without using SocketServer
author Yuya Nishihara <yuya@tcha.org>
Sun, 22 May 2016 11:43:18 +0900
changeset 29544 024e8f82f3de
parent 29543 d74b8a4fde3b
child 29545 28aca3fafc2a
commandserver: add new forking server implemented without using SocketServer

SocketServer.ForkingMixIn of Python 2.x has a couple of issues, such as:

- race condition that leads to 100% CPU usage (Python 2.6) https://bugs.python.org/issue21491
- can't wait for children belonging to different process groups (Python 2.6)
- leaves at least one zombie process (Python 2.6, 2.7) https://bugs.python.org/issue11109

The first two are critical because we do setpgid(0, 0) in the child process to isolate terminal signals. The last one isn't, but ForkingMixIn seems to be doing something silly.

So there are two choices:

a) backport and maintain SocketServer until we can drop support for Python 2.x
b) replace SocketServer with a simpler one and eliminate glue code

I chose (b) because it's a great time to get rid of the utterly complicated SocketServer stuff and to prepare for a future move towards a prefork service.

The new unixforkingservice is implemented loosely based on chg 531f8ef64be6. It is monolithic but much simpler than SocketServer. unixservicehandler provides customization points for chg, and it will be shared with the future prefork service.

The old unixservice class is still used by chgserver. It will be removed later.

Thanks to Jun Wu for investigating these issues.
mercurial/commandserver.py
--- a/mercurial/commandserver.py	Sun May 22 12:49:22 2016 +0900
+++ b/mercurial/commandserver.py	Sun May 22 11:43:18 2016 +0900
@@ -11,6 +11,9 @@
 import gc
 import os
 import random
+import select
+import signal
+import socket
 import struct
 import sys
 import traceback
@@ -385,6 +388,41 @@
             # trigger __del__ since ForkingMixIn uses os._exit
             gc.collect()
 
+class unixservicehandler(object):
+    """Set of pluggable operations for unix-mode services
+
+    Almost all methods except for createcmdserver() are called in the main
+    process. You can't pass a mutable resource back from createcmdserver().
+    """
+
+    pollinterval = None  # timeout handed to select() by the service loop
+
+    def __init__(self, ui):
+        self.ui = ui
+
+    def bindsocket(self, sock, address):
+        util.bindunixsocket(sock, address)
+
+    def unlinksocket(self, address):
+        os.unlink(address)
+
+    def printbanner(self, address):
+        self.ui.status(_('listening at %s\n') % address)
+        self.ui.flush()  # avoid buffering of status message
+
+    def shouldexit(self):
+        """True if server should shut down; checked per pollinterval"""
+        return False
+
+    def newconnection(self):
+        """Called when main process notices new connection"""
+        pass
+
+    def createcmdserver(self, repo, conn, fin, fout):
+        """Create new command server instance; called in the process that
+        serves the current connection (conn is unused by this default)"""
+        return server(self.ui, repo, fin, fout)
+
 class _requesthandler(socketserver.BaseRequestHandler):
     def handle(self):
         _serverequest(self.server.ui, self.server.repo, self.request,
@@ -424,9 +462,96 @@
         finally:
             self._cleanup()
 
+class unixforkingservice(unixservice):  # forks one worker per connection
+    def __init__(self, ui, repo, opts, handler=None):
+        super(unixforkingservice, self).__init__(ui, repo, opts)
+        self._servicehandler = handler or unixservicehandler(ui)
+        self._sock = None  # listening socket; created by init()
+        self._oldsigchldhandler = None  # SIGCHLD handler saved by init()
+        self._workerpids = set()  # updated by signal handler; do not iterate
+
+    def init(self):
+        self._sock = socket.socket(socket.AF_UNIX)
+        self._servicehandler.bindsocket(self._sock, self.address)
+        self._sock.listen(5)
+        o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
+        self._oldsigchldhandler = o  # restored by _cleanup() and in workers
+        self._servicehandler.printbanner(self.address)
+
+    def _cleanup(self):
+        signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
+        self._sock.close()
+        self._servicehandler.unlinksocket(self.address)
+        # don't kill child processes as they have active clients, just wait
+        self._reapworkers(0)  # options=0 makes waitpid() block
+
+    def run(self):
+        try:
+            self._mainloop()
+        finally:
+            self._cleanup()
+
+    def _mainloop(self):
+        h = self._servicehandler
+        while not h.shouldexit():
+            try:
+                ready = select.select([self._sock], [], [], h.pollinterval)[0]
+                if not ready:  # select() timed out; recheck shouldexit()
+                    continue
+                conn, _addr = self._sock.accept()
+            except (select.error, socket.error) as inst:
+                if inst.args[0] == errno.EINTR:  # interrupted by a signal
+                    continue
+                raise
+
+            pid = os.fork()
+            if pid:  # parent: record the worker, drop our copy of conn
+                try:
+                    self.ui.debug('forked worker process (pid=%d)\n' % pid)
+                    self._workerpids.add(pid)
+                    h.newconnection()
+                finally:
+                    conn.close()  # release handle in parent process
+            else:  # child: serve the connection, then exit
+                try:
+                    self._serveworker(conn)
+                    conn.close()
+                    os._exit(0)
+                except:  # never return, hence no re-raises
+                    try:
+                        self.ui.traceback(force=True)
+                    finally:
+                        os._exit(255)
+
+    def _sigchldhandler(self, signal, frame):  # arg shadows 'signal' module
+        self._reapworkers(os.WNOHANG)  # never block inside the handler
+
+    def _reapworkers(self, options):
+        while self._workerpids:
+            try:
+                pid, _status = os.waitpid(-1, options)  # -1: any child
+            except OSError as inst:
+                if inst.errno == errno.EINTR:
+                    continue
+                if inst.errno != errno.ECHILD:
+                    raise
+                # no child processes at all (reaped by other waitpid()?)
+                self._workerpids.clear()
+                return
+            if pid == 0:
+                # no waitable child processes
+                return
+            self.ui.debug('worker process exited (pid=%d)\n' % pid)
+            self._workerpids.discard(pid)
+
+    def _serveworker(self, conn):
+        signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
+        h = self._servicehandler
+        _serverequest(self.ui, self.repo, conn, h.createcmdserver)
+
 _servicemap = {
     'pipe': pipeservice,
-    'unix': unixservice,
+    'unix': unixforkingservice,
     }
 
 def createservice(ui, repo, opts):