--- a/mercurial/commandserver.py Sun May 22 12:49:22 2016 +0900
+++ b/mercurial/commandserver.py Sun May 22 11:43:18 2016 +0900
@@ -11,6 +11,9 @@
import gc
import os
import random
+import select
+import signal
+import socket
import struct
import sys
import traceback
@@ -385,6 +388,41 @@
# trigger __del__ since ForkingMixIn uses os._exit
gc.collect()
+class unixservicehandler(object):
+ """Set of pluggable operations for unix-mode services
+
+ Almost all methods except for createcmdserver() are called in the main
+ process. You can't pass mutable resource back from createcmdserver().
+ """
+
+ pollinterval = None
+
+ def __init__(self, ui):
+ self.ui = ui
+
+ def bindsocket(self, sock, address):
+ util.bindunixsocket(sock, address)
+
+ def unlinksocket(self, address):
+ os.unlink(address)
+
+ def printbanner(self, address):
+ self.ui.status(_('listening at %s\n') % address)
+ self.ui.flush() # avoid buffering of status message
+
+ def shouldexit(self):
+ """True if server should shut down; checked per pollinterval"""
+ return False
+
+ def newconnection(self):
+ """Called when main process notices new connection"""
+ pass
+
+ def createcmdserver(self, repo, conn, fin, fout):
+ """Create new command server instance; called in the process that
+ serves for the current connection"""
+ return server(self.ui, repo, fin, fout)
+
class _requesthandler(socketserver.BaseRequestHandler):
def handle(self):
_serverequest(self.server.ui, self.server.repo, self.request,
@@ -424,9 +462,96 @@
finally:
self._cleanup()
+class unixforkingservice(unixservice):
+ def __init__(self, ui, repo, opts, handler=None):
+ super(unixforkingservice, self).__init__(ui, repo, opts)
+ self._servicehandler = handler or unixservicehandler(ui)
+ self._sock = None
+ self._oldsigchldhandler = None
+ self._workerpids = set() # updated by signal handler; do not iterate
+
+ def init(self):
+ self._sock = socket.socket(socket.AF_UNIX)
+ self._servicehandler.bindsocket(self._sock, self.address)
+ self._sock.listen(5)
+ o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
+ self._oldsigchldhandler = o
+ self._servicehandler.printbanner(self.address)
+
+ def _cleanup(self):
+ signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
+ self._sock.close()
+ self._servicehandler.unlinksocket(self.address)
+ # don't kill child processes as they have active clients, just wait
+ self._reapworkers(0)
+
+ def run(self):
+ try:
+ self._mainloop()
+ finally:
+ self._cleanup()
+
+ def _mainloop(self):
+ h = self._servicehandler
+ while not h.shouldexit():
+ try:
+ ready = select.select([self._sock], [], [], h.pollinterval)[0]
+ if not ready:
+ continue
+ conn, _addr = self._sock.accept()
+ except (select.error, socket.error) as inst:
+ if inst.args[0] == errno.EINTR:
+ continue
+ raise
+
+ pid = os.fork()
+ if pid:
+ try:
+ self.ui.debug('forked worker process (pid=%d)\n' % pid)
+ self._workerpids.add(pid)
+ h.newconnection()
+ finally:
+ conn.close() # release handle in parent process
+ else:
+ try:
+ self._serveworker(conn)
+ conn.close()
+ os._exit(0)
+ except: # never return, hence no re-raises
+ try:
+ self.ui.traceback(force=True)
+ finally:
+ os._exit(255)
+
+ def _sigchldhandler(self, signal, frame):
+ self._reapworkers(os.WNOHANG)
+
+ def _reapworkers(self, options):
+ while self._workerpids:
+ try:
+ pid, _status = os.waitpid(-1, options)
+ except OSError as inst:
+ if inst.errno == errno.EINTR:
+ continue
+ if inst.errno != errno.ECHILD:
+ raise
+ # no child processes at all (reaped by other waitpid()?)
+ self._workerpids.clear()
+ return
+ if pid == 0:
+ # no waitable child processes
+ return
+ self.ui.debug('worker process exited (pid=%d)\n' % pid)
+ self._workerpids.discard(pid)
+
+ def _serveworker(self, conn):
+ signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
+ h = self._servicehandler
+ _serverequest(self.ui, self.repo, conn, h.createcmdserver)
+
_servicemap = {
'pipe': pipeservice,
- 'unix': unixservice,
+ 'unix': unixforkingservice,
}
def createservice(ui, repo, opts):