Mercurial > hg
changeset 29544:024e8f82f3de
commandserver: add new forking server implemented without using SocketServer
SocketServer.ForkingMixIn of Python 2.x has a couple of issues, such as:
- race condition that leads to 100% CPU usage (Python 2.6)
https://bugs.python.org/issue21491
- can't wait for children belonging to different process groups (Python 2.6)
- leaves at least one zombie process (Python 2.6, 2.7)
https://bugs.python.org/issue11109
The first two are critical because we do setpgid(0, 0) in child process to
isolate terminal signals. The last one isn't, but ForkingMixIn seems to be
doing silly. So there are two choices:
a) backport and maintain SocketServer until we can drop support for Python 2.x
b) replace SocketServer by simpler one and eliminate glue codes
I chose (b) because it's great time for getting rid of utterly complicated
SocketServer stuff, and preparing for future move towards prefork service.
New unixforkingservice is implemented loosely based on chg 531f8ef64be6. It
is monolithic but much simpler than SocketServer. unixservicehandler provides
customizing points for chg, and it will be shared with future prefork service.
Old unixservice class is still used by chgserver. It will be removed later.
Thanks to Jun Wu for investigating these issues.
author | Yuya Nishihara <yuya@tcha.org> |
---|---|
date | Sun, 22 May 2016 11:43:18 +0900 |
parents | d74b8a4fde3b |
children | 28aca3fafc2a |
files | mercurial/commandserver.py |
diffstat | 1 files changed, 126 insertions(+), 1 deletions(-) [+] |
line wrap: on
line diff
--- a/mercurial/commandserver.py Sun May 22 12:49:22 2016 +0900 +++ b/mercurial/commandserver.py Sun May 22 11:43:18 2016 +0900 @@ -11,6 +11,9 @@ import gc import os import random +import select +import signal +import socket import struct import sys import traceback @@ -385,6 +388,41 @@ # trigger __del__ since ForkingMixIn uses os._exit gc.collect() +class unixservicehandler(object): + """Set of pluggable operations for unix-mode services + + Almost all methods except for createcmdserver() are called in the main + process. You can't pass mutable resource back from createcmdserver(). + """ + + pollinterval = None + + def __init__(self, ui): + self.ui = ui + + def bindsocket(self, sock, address): + util.bindunixsocket(sock, address) + + def unlinksocket(self, address): + os.unlink(address) + + def printbanner(self, address): + self.ui.status(_('listening at %s\n') % address) + self.ui.flush() # avoid buffering of status message + + def shouldexit(self): + """True if server should shut down; checked per pollinterval""" + return False + + def newconnection(self): + """Called when main process notices new connection""" + pass + + def createcmdserver(self, repo, conn, fin, fout): + """Create new command server instance; called in the process that + serves for the current connection""" + return server(self.ui, repo, fin, fout) + class _requesthandler(socketserver.BaseRequestHandler): def handle(self): _serverequest(self.server.ui, self.server.repo, self.request, @@ -424,9 +462,96 @@ finally: self._cleanup() +class unixforkingservice(unixservice): + def __init__(self, ui, repo, opts, handler=None): + super(unixforkingservice, self).__init__(ui, repo, opts) + self._servicehandler = handler or unixservicehandler(ui) + self._sock = None + self._oldsigchldhandler = None + self._workerpids = set() # updated by signal handler; do not iterate + + def init(self): + self._sock = socket.socket(socket.AF_UNIX) + self._servicehandler.bindsocket(self._sock, self.address) + self._sock.listen(5) + o = signal.signal(signal.SIGCHLD, self._sigchldhandler) + self._oldsigchldhandler = o + self._servicehandler.printbanner(self.address) + + def _cleanup(self): + signal.signal(signal.SIGCHLD, self._oldsigchldhandler) + self._sock.close() + self._servicehandler.unlinksocket(self.address) + # don't kill child processes as they have active clients, just wait + self._reapworkers(0) + + def run(self): + try: + self._mainloop() + finally: + self._cleanup() + + def _mainloop(self): + h = self._servicehandler + while not h.shouldexit(): + try: + ready = select.select([self._sock], [], [], h.pollinterval)[0] + if not ready: + continue + conn, _addr = self._sock.accept() + except (select.error, socket.error) as inst: + if inst.args[0] == errno.EINTR: + continue + raise + + pid = os.fork() + if pid: + try: + self.ui.debug('forked worker process (pid=%d)\n' % pid) + self._workerpids.add(pid) + h.newconnection() + finally: + conn.close() # release handle in parent process + else: + try: + self._serveworker(conn) + conn.close() + os._exit(0) + except: # never return, hence no re-raises + try: + self.ui.traceback(force=True) + finally: + os._exit(255) + + def _sigchldhandler(self, signal, frame): + self._reapworkers(os.WNOHANG) + + def _reapworkers(self, options): + while self._workerpids: + try: + pid, _status = os.waitpid(-1, options) + except OSError as inst: + if inst.errno == errno.EINTR: + continue + if inst.errno != errno.ECHILD: + raise + # no child processes at all (reaped by other waitpid()?) + self._workerpids.clear() + return + if pid == 0: + # no waitable child processes + return + self.ui.debug('worker process exited (pid=%d)\n' % pid) + self._workerpids.discard(pid) + + def _serveworker(self, conn): + signal.signal(signal.SIGCHLD, self._oldsigchldhandler) + h = self._servicehandler + _serverequest(self.ui, self.repo, conn, h.createcmdserver) + _servicemap = { 'pipe': pipeservice, - 'unix': unixservice, + 'unix': unixforkingservice, } def createservice(ui, repo, opts):