commandserver: preload repository in master server and reuse its file cache
This greatly speeds up repository operation with lots of obsolete markers:
$ ls -lh .hg/store/obsstore
-rw-r--r-- 1 yuya yuya 21M Dec 2 17:55 .hg/store/obsstore
$ time hg log -G -l10 --pager no
(hg) 1.79s user 0.13s system 99% cpu 1.919 total
(chg uncached) 0.00s user 0.01s system 0% cpu 1.328 total
(chg cached) 0.00s user 0.00s system 3% cpu 0.180 total
As you can see, the implementation of the preloader function is highly
experimental. It works, but I'm yet to be sure how things can be organized.
So I don't want to formalize the API at this point.
--- a/mercurial/commandserver.py Wed Oct 31 22:19:03 2018 +0900
+++ b/mercurial/commandserver.py Wed Oct 31 22:43:08 2018 +0900
@@ -28,6 +28,7 @@
error,
loggingutil,
pycompat,
+ repocache,
util,
vfs as vfsmod,
)
@@ -511,6 +512,11 @@
self._oldsigchldhandler = None
self._workerpids = set() # updated by signal handler; do not iterate
self._socketunlinked = None
+ # experimental config: cmdserver.max-repo-cache
+ maxlen = ui.configint(b'cmdserver', b'max-repo-cache')
+ if maxlen < 0:
+ raise error.Abort(_('negative max-repo-cache size not allowed'))
+ self._repoloader = repocache.repoloader(ui, maxlen)
def init(self):
self._sock = socket.socket(socket.AF_UNIX)
@@ -525,6 +531,7 @@
o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
self._oldsigchldhandler = o
self._socketunlinked = False
+ self._repoloader.start()
def _unlinksocket(self):
if not self._socketunlinked:
@@ -537,6 +544,7 @@
self._mainipc.close()
self._workeripc.close()
self._unlinksocket()
+ self._repoloader.stop()
# don't kill child processes as they have active clients, just wait
self._reapworkers(0)
@@ -590,6 +598,10 @@
return
raise
+ # Future improvement: On Python 3.7, maybe gc.freeze() can be used
+ # to prevent COW memory from being touched by GC.
+ # https://instagram-engineering.com/
+ # copy-on-write-friendly-python-garbage-collection-ad6ed5233ddf
pid = os.fork()
if pid:
try:
@@ -622,8 +634,7 @@
if inst.args[0] == errno.EINTR:
return
raise
-
- self.ui.log(b'cmdserver', b'repository: %s\n', path)
+ self._repoloader.load(path)
def _sigchldhandler(self, signal, frame):
self._reapworkers(os.WNOHANG)
@@ -671,3 +682,9 @@
repo.__class__ = unixcmdserverrepo
repo._cmdserveripc = self._workeripc
+
+ cachedrepo = self._repoloader.get(repo.root)
+ if cachedrepo is None:
+ return
+ repo.ui.log(b'repocache', b'repo from cache: %s\n', repo.root)
+ repocache.copycache(cachedrepo, repo)
--- a/mercurial/configitems.py Wed Oct 31 22:19:03 2018 +0900
+++ b/mercurial/configitems.py Wed Oct 31 22:43:08 2018 +0900
@@ -179,11 +179,14 @@
coreconfigitem('cmdserver', 'max-log-size',
default='1 MB',
)
+coreconfigitem('cmdserver', 'max-repo-cache',
+ default=0,
+)
coreconfigitem('cmdserver', 'message-encodings',
default=list,
)
coreconfigitem('cmdserver', 'track-log',
- default=lambda: ['chgserver', 'cmdserver'],
+ default=lambda: ['chgserver', 'cmdserver', 'repocache'],
)
coreconfigitem('color', '.*',
default=None,
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/mercurial/repocache.py Wed Oct 31 22:43:08 2018 +0900
@@ -0,0 +1,131 @@
+# repocache.py - in-memory repository cache for long-running services
+#
+# Copyright 2018 Yuya Nishihara <yuya@tcha.org>
+#
+# This software may be used and distributed according to the terms of the
+# GNU General Public License version 2 or any later version.
+
+from __future__ import absolute_import
+
+import collections
+import gc
+import threading
+
+from . import (
+ error,
+ hg,
+ obsolete,
+ scmutil,
+ util,
+)
+
+class repoloader(object):
+ """Load repositories in background thread
+
+ This is designed for a forking server. A cached repo cannot be obtained
+ until the server fork()s a worker and the loader thread stops.
+ """
+
+ def __init__(self, ui, maxlen):
+ self._ui = ui.copy()
+ self._cache = util.lrucachedict(max=maxlen)
+ # use deque and Event instead of Queue since deque can discard
+ # old items to keep at most maxlen items.
+ self._inqueue = collections.deque(maxlen=maxlen)
+ self._accepting = False
+ self._newentry = threading.Event()
+ self._thread = None
+
+ def start(self):
+ assert not self._thread
+ if self._inqueue.maxlen == 0:
+ # no need to spawn loader thread as the cache is disabled
+ return
+ self._accepting = True
+ self._thread = threading.Thread(target=self._mainloop)
+ self._thread.start()
+
+ def stop(self):
+ if not self._thread:
+ return
+ self._accepting = False
+ self._newentry.set()
+ self._thread.join()
+ self._thread = None
+ self._cache.clear()
+ self._inqueue.clear()
+
+ def load(self, path):
+ """Request to load the specified repository in background"""
+ self._inqueue.append(path)
+ self._newentry.set()
+
+ def get(self, path):
+ """Return a cached repo if available
+
+ This function must be called after fork(), where the loader thread
+ is stopped. Otherwise, the returned repo might be updated by the
+ loader thread.
+ """
+ if self._thread and self._thread.is_alive():
+ raise error.ProgrammingError(b'cannot obtain cached repo while '
+ b'loader is active')
+ return self._cache.peek(path, None)
+
+ def _mainloop(self):
+ while self._accepting:
+ # Avoid heavy GC after fork(), which would cancel the benefit of
+ # COW. We assume that GIL is acquired while GC is underway in the
+ # loader thread. If that isn't true, we might have to move
+ # gc.collect() to the main thread so that fork() would never stop
+ # the thread where GC is in progress.
+ gc.collect()
+
+ self._newentry.wait()
+ while self._accepting:
+ self._newentry.clear()
+ try:
+ path = self._inqueue.popleft()
+ except IndexError:
+ break
+ scmutil.callcatch(self._ui, lambda: self._load(path))
+
+ def _load(self, path):
+ start = util.timer()
+ # TODO: repo should be recreated if storage configuration changed
+ try:
+ # pop before loading so inconsistent state wouldn't be exposed
+ repo = self._cache.pop(path)
+ except KeyError:
+ repo = hg.repository(self._ui, path).unfiltered()
+ _warmupcache(repo)
+ repo.ui.log(b'repocache', b'loaded repo into cache: %s (in %.3fs)\n',
+ path, util.timer() - start)
+ self._cache.insert(path, repo)
+
+# TODO: think about proper API of preloading cache
+def _warmupcache(repo):
+ repo.invalidateall()
+ repo.changelog
+ repo.obsstore._all
+ repo.obsstore.successors
+ repo.obsstore.predecessors
+ repo.obsstore.children
+ for name in obsolete.cachefuncs:
+ obsolete.getrevs(repo, name)
+ repo._phasecache.loadphaserevs(repo)
+
+# TODO: think about proper API of attaching preloaded attributes
+def copycache(srcrepo, destrepo):
+ """Copy cached attributes from srcrepo to destrepo"""
+ destfilecache = destrepo._filecache
+ srcfilecache = srcrepo._filecache
+ if 'changelog' in srcfilecache:
+ destfilecache['changelog'] = ce = srcfilecache['changelog']
+ ce.obj.opener = ce.obj._realopener = destrepo.svfs
+ if 'obsstore' in srcfilecache:
+ destfilecache['obsstore'] = ce = srcfilecache['obsstore']
+ ce.obj.svfs = destrepo.svfs
+ if '_phasecache' in srcfilecache:
+ destfilecache['_phasecache'] = ce = srcfilecache['_phasecache']
+ ce.obj.opener = destrepo.svfs
--- a/tests/test-chg.t Wed Oct 31 22:19:03 2018 +0900
+++ b/tests/test-chg.t Wed Oct 31 22:43:08 2018 +0900
@@ -1,6 +1,7 @@
#require chg
$ mkdir log
+ $ cp $HGRCPATH $HGRCPATH.unconfigured
$ cat <<'EOF' >> $HGRCPATH
> [cmdserver]
> log = $TESTTMP/log/server.log
@@ -13,6 +14,7 @@
> sed -e 's!^[0-9/]* [0-9:]* ([0-9]*)>!YYYY/MM/DD HH:MM:SS (PID)>!' \
> -e 's!\(setprocname\|received fds\|setenv\): .*!\1: ...!' \
> -e 's!\(confighash\|mtimehash\) = [0-9a-f]*!\1 = ...!g' \
+ > -e 's!\(in \)[0-9.]*s\b!\1 ...s!g' \
> -e 's!\(pid\)=[0-9]*!\1=...!g' \
> -e 's!\(/server-\)[0-9a-f]*!\1...!g'
> }
@@ -230,6 +232,7 @@
preserved:
$ cat log/server.log.1 log/server.log | tail -10 | filterlog
+ YYYY/MM/DD HH:MM:SS (PID)> forked worker process (pid=...)
YYYY/MM/DD HH:MM:SS (PID)> setprocname: ...
YYYY/MM/DD HH:MM:SS (PID)> received fds: ...
YYYY/MM/DD HH:MM:SS (PID)> chdir to '$TESTTMP/extreload'
@@ -237,6 +240,92 @@
YYYY/MM/DD HH:MM:SS (PID)> setenv: ...
YYYY/MM/DD HH:MM:SS (PID)> confighash = ... mtimehash = ...
YYYY/MM/DD HH:MM:SS (PID)> validate: []
- YYYY/MM/DD HH:MM:SS (PID)> repository: $TESTTMP/extreload
YYYY/MM/DD HH:MM:SS (PID)> worker process exited (pid=...)
YYYY/MM/DD HH:MM:SS (PID)> $TESTTMP/extreload/chgsock/server-... is not owned, exiting.
+
+repository cache
+----------------
+
+ $ rm log/server.log*
+ $ cp $HGRCPATH.unconfigured $HGRCPATH
+ $ cat <<'EOF' >> $HGRCPATH
+ > [cmdserver]
+ > log = $TESTTMP/log/server.log
+ > max-repo-cache = 1
+ > track-log = command, repocache
+ > EOF
+
+isolate socket directory for stable result:
+
+ $ OLDCHGSOCKNAME=$CHGSOCKNAME
+ $ mkdir chgsock
+ $ CHGSOCKNAME=`pwd`/chgsock/server
+
+create empty repo and cache it:
+
+ $ hg init cached
+ $ hg id -R cached
+ 000000000000 tip
+ $ sleep 1
+
+modify repo (and cache will be invalidated):
+
+ $ touch cached/a
+ $ hg ci -R cached -Am 'add a'
+ adding a
+ $ sleep 1
+
+read cached repo:
+
+ $ hg log -R cached
+ changeset: 0:ac82d8b1f7c4
+ tag: tip
+ user: test
+ date: Thu Jan 01 00:00:00 1970 +0000
+ summary: add a
+
+ $ sleep 1
+
+discard cached from LRU cache:
+
+ $ hg clone cached cached2
+ updating to branch default
+ 1 files updated, 0 files merged, 0 files removed, 0 files unresolved
+ $ hg id -R cached2
+ ac82d8b1f7c4 tip
+ $ sleep 1
+
+read uncached repo:
+
+ $ hg log -R cached
+ changeset: 0:ac82d8b1f7c4
+ tag: tip
+ user: test
+ date: Thu Jan 01 00:00:00 1970 +0000
+ summary: add a
+
+ $ sleep 1
+
+shut down servers and restore environment:
+
+ $ rm -R chgsock
+ $ sleep 2
+ $ CHGSOCKNAME=$OLDCHGSOCKNAME
+
+check server log:
+
+ $ cat log/server.log | filterlog
+ YYYY/MM/DD HH:MM:SS (PID)> init cached
+ YYYY/MM/DD HH:MM:SS (PID)> id -R cached
+ YYYY/MM/DD HH:MM:SS (PID)> loaded repo into cache: $TESTTMP/cached (in ...s)
+ YYYY/MM/DD HH:MM:SS (PID)> repo from cache: $TESTTMP/cached
+ YYYY/MM/DD HH:MM:SS (PID)> ci -R cached -Am 'add a'
+ YYYY/MM/DD HH:MM:SS (PID)> loaded repo into cache: $TESTTMP/cached (in ...s)
+ YYYY/MM/DD HH:MM:SS (PID)> repo from cache: $TESTTMP/cached
+ YYYY/MM/DD HH:MM:SS (PID)> log -R cached
+ YYYY/MM/DD HH:MM:SS (PID)> loaded repo into cache: $TESTTMP/cached (in ...s)
+ YYYY/MM/DD HH:MM:SS (PID)> clone cached cached2
+ YYYY/MM/DD HH:MM:SS (PID)> id -R cached2
+ YYYY/MM/DD HH:MM:SS (PID)> loaded repo into cache: $TESTTMP/cached2 (in ...s)
+ YYYY/MM/DD HH:MM:SS (PID)> log -R cached
+ YYYY/MM/DD HH:MM:SS (PID)> loaded repo into cache: $TESTTMP/cached (in ...s)