changeset 40998:042ed354b9eb

commandserver: add IPC channel to teach repository path on command finished The idea is to load recently-used repositories first in the master process, and fork(). The forked worker can reuse a warm repository if it's preloaded. There are a couple of ways of in-memory repository caching. They have pros and cons: a. "preload by master" pros: can use a single cache dict, maximizing cache hit rate cons: need to reload a repo in master process (because worker process dies per command) b. "prefork" pros: can cache a repo without reloading (as worker processes persist) cons: lower cache hit rate since each worker has to maintain its own cache c. "shared memory" (or separate key-value store server) pros: no need to reload a repo in master process, ideally cons: need to serialize objects to sharable form Since my primary goal is to get rid of the cost of loading obsstore without massive rewrites, (c) doesn't work. (b) isn't ideal since it would require much more SDRAMs than (a). So I take (a). The idea credits to Jun Wu.
author Yuya Nishihara <yuya@tcha.org>
date Wed, 31 Oct 2018 22:19:03 +0900
parents 038108a9811c
children dcac24ec935b
files mercurial/commandserver.py tests/test-chg.t
diffstat 2 files changed, 42 insertions(+), 2 deletions(-) [+]
line wrap: on
line diff
--- a/mercurial/commandserver.py	Thu Dec 13 23:20:28 2018 -0800
+++ b/mercurial/commandserver.py	Wed Oct 31 22:19:03 2018 +0900
@@ -506,12 +506,19 @@
             raise error.Abort(_('no socket path specified with --address'))
         self._servicehandler = handler or unixservicehandler(ui)
         self._sock = None
+        self._mainipc = None
+        self._workeripc = None
         self._oldsigchldhandler = None
         self._workerpids = set()  # updated by signal handler; do not iterate
         self._socketunlinked = None
 
     def init(self):
         self._sock = socket.socket(socket.AF_UNIX)
+        # IPC channel from many workers to one main process; this is actually
+        # a uni-directional pipe, but is backed by a DGRAM socket so each
+        # message can be easily separated.
+        o = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
+        self._mainipc, self._workeripc = o
         self._servicehandler.bindsocket(self._sock, self.address)
         if util.safehasattr(procutil, 'unblocksignal'):
             procutil.unblocksignal(signal.SIGCHLD)
@@ -527,6 +534,8 @@
     def _cleanup(self):
         signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
         self._sock.close()
+        self._mainipc.close()
+        self._workeripc.close()
         self._unlinksocket()
         # don't kill child processes as they have active clients, just wait
         self._reapworkers(0)
@@ -543,6 +552,8 @@
         selector = selectors.DefaultSelector()
         selector.register(self._sock, selectors.EVENT_READ,
                           self._acceptnewconnection)
+        selector.register(self._mainipc, selectors.EVENT_READ,
+                          self._handlemainipc)
         while True:
             if not exiting and h.shouldexit():
                 # clients can no longer connect() to the domain socket, so
@@ -592,8 +603,10 @@
             try:
                 selector.close()
                 sock.close()
+                self._mainipc.close()
                 self._runworker(conn)
                 conn.close()
+                self._workeripc.close()
                 os._exit(0)
             except:  # never return, hence no re-raises
                 try:
@@ -601,6 +614,17 @@
                 finally:
                     os._exit(255)
 
+    def _handlemainipc(self, sock, selector):
+        """Process messages sent from a worker"""
+        try:
+            path = sock.recv(32768)  # large enough to receive path
+        except socket.error as inst:
+            if inst.args[0] == errno.EINTR:
+                return
+            raise
+
+        self.ui.log(b'cmdserver', b'repository: %s\n', path)
+
     def _sigchldhandler(self, signal, frame):
         self._reapworkers(os.WNOHANG)
 
@@ -628,6 +652,22 @@
         h = self._servicehandler
         try:
             _serverequest(self.ui, self.repo, conn, h.createcmdserver,
-                          prereposetups=None)  # TODO: pass in hook functions
+                          prereposetups=[self._reposetup])
         finally:
             gc.collect()  # trigger __del__ since worker process uses os._exit
+
+    def _reposetup(self, ui, repo):
+        if not repo.local():
+            return
+
+        class unixcmdserverrepo(repo.__class__):
+            def close(self):
+                super(unixcmdserverrepo, self).close()
+                try:
+                    self._cmdserveripc.send(self.root)
+                except socket.error:
+                    self.ui.log(b'cmdserver',
+                                b'failed to send repo root to master\n')
+
+        repo.__class__ = unixcmdserverrepo
+        repo._cmdserveripc = self._workeripc
--- a/tests/test-chg.t	Thu Dec 13 23:20:28 2018 -0800
+++ b/tests/test-chg.t	Wed Oct 31 22:19:03 2018 +0900
@@ -230,7 +230,6 @@
 preserved:
 
   $ cat log/server.log.1 log/server.log | tail -10 | filterlog
-  YYYY/MM/DD HH:MM:SS (PID)> forked worker process (pid=...)
   YYYY/MM/DD HH:MM:SS (PID)> setprocname: ...
   YYYY/MM/DD HH:MM:SS (PID)> received fds: ...
   YYYY/MM/DD HH:MM:SS (PID)> chdir to '$TESTTMP/extreload'
@@ -238,5 +237,6 @@
   YYYY/MM/DD HH:MM:SS (PID)> setenv: ...
   YYYY/MM/DD HH:MM:SS (PID)> confighash = ... mtimehash = ...
   YYYY/MM/DD HH:MM:SS (PID)> validate: []
+  YYYY/MM/DD HH:MM:SS (PID)> repository: $TESTTMP/extreload
   YYYY/MM/DD HH:MM:SS (PID)> worker process exited (pid=...)
   YYYY/MM/DD HH:MM:SS (PID)> $TESTTMP/extreload/chgsock/server-... is not owned, exiting.