changeset 41009:dcac24ec935b

commandserver: preload repository in master server and reuse its file cache This greatly speeds up repository operation with lots of obsolete markers: $ ls -lh .hg/store/obsstore -rw-r--r-- 1 yuya yuya 21M Dec 2 17:55 .hg/store/obsstore $ time hg log -G -l10 --pager no (hg) 1.79s user 0.13s system 99% cpu 1.919 total (chg uncached) 0.00s user 0.01s system 0% cpu 1.328 total (chg cached) 0.00s user 0.00s system 3% cpu 0.180 total As you can see, the implementation of the preloader function is highly experimental. It works, but I'm yet to be sure how things can be organized. So I don't want to formalize the API at this point.
author Yuya Nishihara <yuya@tcha.org>
date Wed, 31 Oct 2018 22:43:08 +0900
parents 042ed354b9eb
children b6c610bf567e
files mercurial/commandserver.py mercurial/configitems.py mercurial/repocache.py tests/test-chg.t
diffstat 4 files changed, 244 insertions(+), 4 deletions(-) [+]
line wrap: on
line diff
--- a/mercurial/commandserver.py	Wed Oct 31 22:19:03 2018 +0900
+++ b/mercurial/commandserver.py	Wed Oct 31 22:43:08 2018 +0900
@@ -28,6 +28,7 @@
     error,
     loggingutil,
     pycompat,
+    repocache,
     util,
     vfs as vfsmod,
 )
@@ -511,6 +512,11 @@
         self._oldsigchldhandler = None
         self._workerpids = set()  # updated by signal handler; do not iterate
         self._socketunlinked = None
+        # experimental config: cmdserver.max-repo-cache
+        maxlen = ui.configint(b'cmdserver', b'max-repo-cache')
+        if maxlen < 0:
+            raise error.Abort(_('negative max-repo-cache size not allowed'))
+        self._repoloader = repocache.repoloader(ui, maxlen)
 
     def init(self):
         self._sock = socket.socket(socket.AF_UNIX)
@@ -525,6 +531,7 @@
         o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
         self._oldsigchldhandler = o
         self._socketunlinked = False
+        self._repoloader.start()
 
     def _unlinksocket(self):
         if not self._socketunlinked:
@@ -537,6 +544,7 @@
         self._mainipc.close()
         self._workeripc.close()
         self._unlinksocket()
+        self._repoloader.stop()
         # don't kill child processes as they have active clients, just wait
         self._reapworkers(0)
 
@@ -590,6 +598,10 @@
                 return
             raise
 
+        # Future improvement: On Python 3.7, maybe gc.freeze() can be used
+        # to prevent COW memory from being touched by GC.
+        # https://instagram-engineering.com/
+        #   copy-on-write-friendly-python-garbage-collection-ad6ed5233ddf
         pid = os.fork()
         if pid:
             try:
@@ -622,8 +634,7 @@
             if inst.args[0] == errno.EINTR:
                 return
             raise
-
-        self.ui.log(b'cmdserver', b'repository: %s\n', path)
+        self._repoloader.load(path)
 
     def _sigchldhandler(self, signal, frame):
         self._reapworkers(os.WNOHANG)
@@ -671,3 +682,9 @@
 
         repo.__class__ = unixcmdserverrepo
         repo._cmdserveripc = self._workeripc
+
+        cachedrepo = self._repoloader.get(repo.root)
+        if cachedrepo is None:
+            return
+        repo.ui.log(b'repocache', b'repo from cache: %s\n', repo.root)
+        repocache.copycache(cachedrepo, repo)
--- a/mercurial/configitems.py	Wed Oct 31 22:19:03 2018 +0900
+++ b/mercurial/configitems.py	Wed Oct 31 22:43:08 2018 +0900
@@ -179,11 +179,14 @@
 coreconfigitem('cmdserver', 'max-log-size',
     default='1 MB',
 )
+coreconfigitem('cmdserver', 'max-repo-cache',
+    default=0,
+)
 coreconfigitem('cmdserver', 'message-encodings',
     default=list,
 )
 coreconfigitem('cmdserver', 'track-log',
-    default=lambda: ['chgserver', 'cmdserver'],
+    default=lambda: ['chgserver', 'cmdserver', 'repocache'],
 )
 coreconfigitem('color', '.*',
     default=None,
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/mercurial/repocache.py	Wed Oct 31 22:43:08 2018 +0900
@@ -0,0 +1,131 @@
+# repocache.py - in-memory repository cache for long-running services
+#
+# Copyright 2018 Yuya Nishihara <yuya@tcha.org>
+#
+# This software may be used and distributed according to the terms of the
+# GNU General Public License version 2 or any later version.
+
+from __future__ import absolute_import
+
+import collections
+import gc
+import threading
+
+from . import (
+    error,
+    hg,
+    obsolete,
+    scmutil,
+    util,
+)
+
+class repoloader(object):
+    """Load repositories in background thread
+
+    This is designed for a forking server. A cached repo cannot be obtained
+    until the server fork()s a worker and the loader thread stops.
+    """
+
+    def __init__(self, ui, maxlen):
+        self._ui = ui.copy()
+        self._cache = util.lrucachedict(max=maxlen)
+        # use deque and Event instead of Queue since deque can discard
+        # old items to keep at most maxlen items.
+        self._inqueue = collections.deque(maxlen=maxlen)
+        self._accepting = False
+        self._newentry = threading.Event()
+        self._thread = None
+
+    def start(self):
+        assert not self._thread
+        if self._inqueue.maxlen == 0:
+            # no need to spawn loader thread as the cache is disabled
+            return
+        self._accepting = True
+        self._thread = threading.Thread(target=self._mainloop)
+        self._thread.start()
+
+    def stop(self):
+        if not self._thread:
+            return
+        self._accepting = False
+        self._newentry.set()
+        self._thread.join()
+        self._thread = None
+        self._cache.clear()
+        self._inqueue.clear()
+
+    def load(self, path):
+        """Request to load the specified repository in background"""
+        self._inqueue.append(path)
+        self._newentry.set()
+
+    def get(self, path):
+        """Return a cached repo if available
+
+        This function must be called after fork(), where the loader thread
+        is stopped. Otherwise, the returned repo might be updated by the
+        loader thread.
+        """
+        if self._thread and self._thread.is_alive():
+            raise error.ProgrammingError(b'cannot obtain cached repo while '
+                                         b'loader is active')
+        return self._cache.peek(path, None)
+
+    def _mainloop(self):
+        while self._accepting:
+            # Avoid heavy GC after fork(), which would cancel the benefit of
+            # COW. We assume that GIL is acquired while GC is underway in the
+            # loader thread. If that isn't true, we might have to move
+            # gc.collect() to the main thread so that fork() would never stop
+            # the thread where GC is in progress.
+            gc.collect()
+
+            self._newentry.wait()
+            while self._accepting:
+                self._newentry.clear()
+                try:
+                    path = self._inqueue.popleft()
+                except IndexError:
+                    break
+                scmutil.callcatch(self._ui, lambda: self._load(path))
+
+    def _load(self, path):
+        start = util.timer()
+        # TODO: repo should be recreated if storage configuration changed
+        try:
+            # pop before loading so inconsistent state wouldn't be exposed
+            repo = self._cache.pop(path)
+        except KeyError:
+            repo = hg.repository(self._ui, path).unfiltered()
+        _warmupcache(repo)
+        repo.ui.log(b'repocache', b'loaded repo into cache: %s (in %.3fs)\n',
+                    path, util.timer() - start)
+        self._cache.insert(path, repo)
+
+# TODO: think about proper API of preloading cache
+def _warmupcache(repo):
+    repo.invalidateall()
+    repo.changelog
+    repo.obsstore._all
+    repo.obsstore.successors
+    repo.obsstore.predecessors
+    repo.obsstore.children
+    for name in obsolete.cachefuncs:
+        obsolete.getrevs(repo, name)
+    repo._phasecache.loadphaserevs(repo)
+
+# TODO: think about proper API of attaching preloaded attributes
+def copycache(srcrepo, destrepo):
+    """Copy cached attributes from srcrepo to destrepo"""
+    destfilecache = destrepo._filecache
+    srcfilecache = srcrepo._filecache
+    if 'changelog' in srcfilecache:
+        destfilecache['changelog'] = ce = srcfilecache['changelog']
+        ce.obj.opener = ce.obj._realopener = destrepo.svfs
+    if 'obsstore' in srcfilecache:
+        destfilecache['obsstore'] = ce = srcfilecache['obsstore']
+        ce.obj.svfs = destrepo.svfs
+    if '_phasecache' in srcfilecache:
+        destfilecache['_phasecache'] = ce = srcfilecache['_phasecache']
+        ce.obj.opener = destrepo.svfs
--- a/tests/test-chg.t	Wed Oct 31 22:19:03 2018 +0900
+++ b/tests/test-chg.t	Wed Oct 31 22:43:08 2018 +0900
@@ -1,6 +1,7 @@
 #require chg
 
   $ mkdir log
+  $ cp $HGRCPATH $HGRCPATH.unconfigured
   $ cat <<'EOF' >> $HGRCPATH
   > [cmdserver]
   > log = $TESTTMP/log/server.log
@@ -13,6 +14,7 @@
   >   sed -e 's!^[0-9/]* [0-9:]* ([0-9]*)>!YYYY/MM/DD HH:MM:SS (PID)>!' \
   >       -e 's!\(setprocname\|received fds\|setenv\): .*!\1: ...!' \
   >       -e 's!\(confighash\|mtimehash\) = [0-9a-f]*!\1 = ...!g' \
+  >       -e 's!\(in \)[0-9.]*s\b!\1 ...s!g' \
   >       -e 's!\(pid\)=[0-9]*!\1=...!g' \
   >       -e 's!\(/server-\)[0-9a-f]*!\1...!g'
   > }
@@ -230,6 +232,7 @@
 preserved:
 
   $ cat log/server.log.1 log/server.log | tail -10 | filterlog
+  YYYY/MM/DD HH:MM:SS (PID)> forked worker process (pid=...)
   YYYY/MM/DD HH:MM:SS (PID)> setprocname: ...
   YYYY/MM/DD HH:MM:SS (PID)> received fds: ...
   YYYY/MM/DD HH:MM:SS (PID)> chdir to '$TESTTMP/extreload'
@@ -237,6 +240,92 @@
   YYYY/MM/DD HH:MM:SS (PID)> setenv: ...
   YYYY/MM/DD HH:MM:SS (PID)> confighash = ... mtimehash = ...
   YYYY/MM/DD HH:MM:SS (PID)> validate: []
-  YYYY/MM/DD HH:MM:SS (PID)> repository: $TESTTMP/extreload
   YYYY/MM/DD HH:MM:SS (PID)> worker process exited (pid=...)
   YYYY/MM/DD HH:MM:SS (PID)> $TESTTMP/extreload/chgsock/server-... is not owned, exiting.
+
+repository cache
+----------------
+
+  $ rm log/server.log*
+  $ cp $HGRCPATH.unconfigured $HGRCPATH
+  $ cat <<'EOF' >> $HGRCPATH
+  > [cmdserver]
+  > log = $TESTTMP/log/server.log
+  > max-repo-cache = 1
+  > track-log = command, repocache
+  > EOF
+
+isolate socket directory for stable result:
+
+  $ OLDCHGSOCKNAME=$CHGSOCKNAME
+  $ mkdir chgsock
+  $ CHGSOCKNAME=`pwd`/chgsock/server
+
+create empty repo and cache it:
+
+  $ hg init cached
+  $ hg id -R cached
+  000000000000 tip
+  $ sleep 1
+
+modify repo (and cache will be invalidated):
+
+  $ touch cached/a
+  $ hg ci -R cached -Am 'add a'
+  adding a
+  $ sleep 1
+
+read cached repo:
+
+  $ hg log -R cached
+  changeset:   0:ac82d8b1f7c4
+  tag:         tip
+  user:        test
+  date:        Thu Jan 01 00:00:00 1970 +0000
+  summary:     add a
+  
+  $ sleep 1
+
+discard cached from LRU cache:
+
+  $ hg clone cached cached2
+  updating to branch default
+  1 files updated, 0 files merged, 0 files removed, 0 files unresolved
+  $ hg id -R cached2
+  ac82d8b1f7c4 tip
+  $ sleep 1
+
+read uncached repo:
+
+  $ hg log -R cached
+  changeset:   0:ac82d8b1f7c4
+  tag:         tip
+  user:        test
+  date:        Thu Jan 01 00:00:00 1970 +0000
+  summary:     add a
+  
+  $ sleep 1
+
+shut down servers and restore environment:
+
+  $ rm -R chgsock
+  $ sleep 2
+  $ CHGSOCKNAME=$OLDCHGSOCKNAME
+
+check server log:
+
+  $ cat log/server.log | filterlog
+  YYYY/MM/DD HH:MM:SS (PID)> init cached
+  YYYY/MM/DD HH:MM:SS (PID)> id -R cached
+  YYYY/MM/DD HH:MM:SS (PID)> loaded repo into cache: $TESTTMP/cached (in  ...s)
+  YYYY/MM/DD HH:MM:SS (PID)> repo from cache: $TESTTMP/cached
+  YYYY/MM/DD HH:MM:SS (PID)> ci -R cached -Am 'add a'
+  YYYY/MM/DD HH:MM:SS (PID)> loaded repo into cache: $TESTTMP/cached (in  ...s)
+  YYYY/MM/DD HH:MM:SS (PID)> repo from cache: $TESTTMP/cached
+  YYYY/MM/DD HH:MM:SS (PID)> log -R cached
+  YYYY/MM/DD HH:MM:SS (PID)> loaded repo into cache: $TESTTMP/cached (in  ...s)
+  YYYY/MM/DD HH:MM:SS (PID)> clone cached cached2
+  YYYY/MM/DD HH:MM:SS (PID)> id -R cached2
+  YYYY/MM/DD HH:MM:SS (PID)> loaded repo into cache: $TESTTMP/cached2 (in  ...s)
+  YYYY/MM/DD HH:MM:SS (PID)> log -R cached
+  YYYY/MM/DD HH:MM:SS (PID)> loaded repo into cache: $TESTTMP/cached (in  ...s)