mercurial/commandserver.py
changeset 29544 024e8f82f3de
parent 29543 d74b8a4fde3b
child 29548 9da1adc18639
--- a/mercurial/commandserver.py	Sun May 22 12:49:22 2016 +0900
+++ b/mercurial/commandserver.py	Sun May 22 11:43:18 2016 +0900
@@ -11,6 +11,9 @@
 import gc
 import os
 import random
+import select
+import signal
+import socket
 import struct
 import sys
 import traceback
@@ -385,6 +388,41 @@
             # trigger __del__ since ForkingMixIn uses os._exit
             gc.collect()
 
+class unixservicehandler(object):
+    """Set of pluggable operations for unix-mode services
+
+    Almost all methods except for createcmdserver() are called in the main
+    process. You can't pass mutable resource back from createcmdserver().
+    """
+
+    pollinterval = None
+
+    def __init__(self, ui):
+        self.ui = ui
+
+    def bindsocket(self, sock, address):
+        util.bindunixsocket(sock, address)
+
+    def unlinksocket(self, address):
+        os.unlink(address)
+
+    def printbanner(self, address):
+        self.ui.status(_('listening at %s\n') % address)
+        self.ui.flush()  # avoid buffering of status message
+
+    def shouldexit(self):
+        """True if server should shut down; checked per pollinterval"""
+        return False
+
+    def newconnection(self):
+        """Called when main process notices new connection"""
+        pass
+
+    def createcmdserver(self, repo, conn, fin, fout):
+        """Create new command server instance; called in the process that
+        serves for the current connection"""
+        return server(self.ui, repo, fin, fout)
+
 class _requesthandler(socketserver.BaseRequestHandler):
     def handle(self):
         _serverequest(self.server.ui, self.server.repo, self.request,
@@ -424,9 +462,96 @@
         finally:
             self._cleanup()
 
+class unixforkingservice(unixservice):
+    def __init__(self, ui, repo, opts, handler=None):
+        super(unixforkingservice, self).__init__(ui, repo, opts)
+        self._servicehandler = handler or unixservicehandler(ui)
+        self._sock = None
+        self._oldsigchldhandler = None
+        self._workerpids = set()  # updated by signal handler; do not iterate
+
+    def init(self):
+        self._sock = socket.socket(socket.AF_UNIX)
+        self._servicehandler.bindsocket(self._sock, self.address)
+        self._sock.listen(5)
+        o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
+        self._oldsigchldhandler = o
+        self._servicehandler.printbanner(self.address)
+
+    def _cleanup(self):
+        signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
+        self._sock.close()
+        self._servicehandler.unlinksocket(self.address)
+        # don't kill child processes as they have active clients, just wait
+        self._reapworkers(0)
+
+    def run(self):
+        try:
+            self._mainloop()
+        finally:
+            self._cleanup()
+
+    def _mainloop(self):
+        h = self._servicehandler
+        while not h.shouldexit():
+            try:
+                ready = select.select([self._sock], [], [], h.pollinterval)[0]
+                if not ready:
+                    continue
+                conn, _addr = self._sock.accept()
+            except (select.error, socket.error) as inst:
+                if inst.args[0] == errno.EINTR:
+                    continue
+                raise
+
+            pid = os.fork()
+            if pid:
+                try:
+                    self.ui.debug('forked worker process (pid=%d)\n' % pid)
+                    self._workerpids.add(pid)
+                    h.newconnection()
+                finally:
+                    conn.close()  # release handle in parent process
+            else:
+                try:
+                    self._serveworker(conn)
+                    conn.close()
+                    os._exit(0)
+                except:  # never return, hence no re-raises
+                    try:
+                        self.ui.traceback(force=True)
+                    finally:
+                        os._exit(255)
+
+    def _sigchldhandler(self, signal, frame):
+        self._reapworkers(os.WNOHANG)
+
+    def _reapworkers(self, options):
+        while self._workerpids:
+            try:
+                pid, _status = os.waitpid(-1, options)
+            except OSError as inst:
+                if inst.errno == errno.EINTR:
+                    continue
+                if inst.errno != errno.ECHILD:
+                    raise
+                # no child processes at all (reaped by other waitpid()?)
+                self._workerpids.clear()
+                return
+            if pid == 0:
+                # no waitable child processes
+                return
+            self.ui.debug('worker process exited (pid=%d)\n' % pid)
+            self._workerpids.discard(pid)
+
+    def _serveworker(self, conn):
+        signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
+        h = self._servicehandler
+        _serverequest(self.ui, self.repo, conn, h.createcmdserver)
+
 _servicemap = {
     'pipe': pipeservice,
-    'unix': unixservice,
+    'unix': unixforkingservice,
     }
 
 def createservice(ui, repo, opts):