Mercurial > hg
changeset 40998:042ed354b9eb
commandserver: add IPC channel to teach repository path on command finished
The idea is to load recently-used repositories first in the master process,
and fork(). The forked worker can reuse a warm repository if it's preloaded.
There are a couple of ways of in-memory repository caching. They have pros
and cons:
a. "preload by master"
pros: can use a single cache dict, maximizing cache hit rate
cons: need to reload a repo in master process (because worker process
dies per command)
b. "prefork"
pros: can cache a repo without reloading (as worker processes persist)
cons: lower cache hit rate since each worker has to maintain its own cache
c. "shared memory" (or separate key-value store server)
pros: no need to reload a repo in master process, ideally
cons: need to serialize objects to sharable form
Since my primary goal is to get rid of the cost of loading obsstore without
massive rewrites, (c) doesn't work. (b) isn't ideal since it would require
much more SDRAMs than (a). So I take (a).
The idea credits to Jun Wu.
author | Yuya Nishihara <yuya@tcha.org> |
---|---|
date | Wed, 31 Oct 2018 22:19:03 +0900 |
parents | 038108a9811c |
children | dcac24ec935b |
files | mercurial/commandserver.py tests/test-chg.t |
diffstat | 2 files changed, 42 insertions(+), 2 deletions(-) [+] |
line wrap: on
line diff
--- a/mercurial/commandserver.py Thu Dec 13 23:20:28 2018 -0800 +++ b/mercurial/commandserver.py Wed Oct 31 22:19:03 2018 +0900 @@ -506,12 +506,19 @@ raise error.Abort(_('no socket path specified with --address')) self._servicehandler = handler or unixservicehandler(ui) self._sock = None + self._mainipc = None + self._workeripc = None self._oldsigchldhandler = None self._workerpids = set() # updated by signal handler; do not iterate self._socketunlinked = None def init(self): self._sock = socket.socket(socket.AF_UNIX) + # IPC channel from many workers to one main process; this is actually + # a uni-directional pipe, but is backed by a DGRAM socket so each + # message can be easily separated. + o = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM) + self._mainipc, self._workeripc = o self._servicehandler.bindsocket(self._sock, self.address) if util.safehasattr(procutil, 'unblocksignal'): procutil.unblocksignal(signal.SIGCHLD) @@ -527,6 +534,8 @@ def _cleanup(self): signal.signal(signal.SIGCHLD, self._oldsigchldhandler) self._sock.close() + self._mainipc.close() + self._workeripc.close() self._unlinksocket() # don't kill child processes as they have active clients, just wait self._reapworkers(0) @@ -543,6 +552,8 @@ selector = selectors.DefaultSelector() selector.register(self._sock, selectors.EVENT_READ, self._acceptnewconnection) + selector.register(self._mainipc, selectors.EVENT_READ, + self._handlemainipc) while True: if not exiting and h.shouldexit(): # clients can no longer connect() to the domain socket, so @@ -592,8 +603,10 @@ try: selector.close() sock.close() + self._mainipc.close() self._runworker(conn) conn.close() + self._workeripc.close() os._exit(0) except: # never return, hence no re-raises try: @@ -601,6 +614,17 @@ finally: os._exit(255) + def _handlemainipc(self, sock, selector): + """Process messages sent from a worker""" + try: + path = sock.recv(32768) # large enough to receive path + except socket.error as inst: + if inst.args[0] == errno.EINTR: + return + raise + + self.ui.log(b'cmdserver', b'repository: %s\n', path) + def _sigchldhandler(self, signal, frame): self._reapworkers(os.WNOHANG) @@ -628,6 +652,22 @@ h = self._servicehandler try: _serverequest(self.ui, self.repo, conn, h.createcmdserver, - prereposetups=None) # TODO: pass in hook functions + prereposetups=[self._reposetup]) finally: gc.collect() # trigger __del__ since worker process uses os._exit + + def _reposetup(self, ui, repo): + if not repo.local(): + return + + class unixcmdserverrepo(repo.__class__): + def close(self): + super(unixcmdserverrepo, self).close() + try: + self._cmdserveripc.send(self.root) + except socket.error: + self.ui.log(b'cmdserver', + b'failed to send repo root to master\n') + + repo.__class__ = unixcmdserverrepo + repo._cmdserveripc = self._workeripc
--- a/tests/test-chg.t Thu Dec 13 23:20:28 2018 -0800 +++ b/tests/test-chg.t Wed Oct 31 22:19:03 2018 +0900 @@ -230,7 +230,6 @@ preserved: $ cat log/server.log.1 log/server.log | tail -10 | filterlog - YYYY/MM/DD HH:MM:SS (PID)> forked worker process (pid=...) YYYY/MM/DD HH:MM:SS (PID)> setprocname: ... YYYY/MM/DD HH:MM:SS (PID)> received fds: ... YYYY/MM/DD HH:MM:SS (PID)> chdir to '$TESTTMP/extreload' @@ -238,5 +237,6 @@ YYYY/MM/DD HH:MM:SS (PID)> setenv: ... YYYY/MM/DD HH:MM:SS (PID)> confighash = ... mtimehash = ... YYYY/MM/DD HH:MM:SS (PID)> validate: [] + YYYY/MM/DD HH:MM:SS (PID)> repository: $TESTTMP/extreload YYYY/MM/DD HH:MM:SS (PID)> worker process exited (pid=...) YYYY/MM/DD HH:MM:SS (PID)> $TESTTMP/extreload/chgsock/server-... is not owned, exiting.