# HG changeset patch # User Pierre-Yves David # Date 1513554007 -3600 # Node ID a1ab2588a628acf6cbb48a912605640464b065c1 # Parent b67e0f676a288d049326d21779509ee60a4d2579 stablerange: split pure algorithm part from the on disk cache The stable cache file is getting long, we split the logic about keeping data on disk into a dedicated file. diff -r b67e0f676a28 -r a1ab2588a628 hgext3rd/evolve/obsdiscovery.py --- a/hgext3rd/evolve/obsdiscovery.py Sun Dec 10 05:17:04 2017 +0100 +++ b/hgext3rd/evolve/obsdiscovery.py Mon Dec 18 00:40:07 2017 +0100 @@ -52,6 +52,7 @@ obscache, utility, stablerange, + stablerangecache, ) # prior to hg-4.2 there are not util.timer @@ -69,7 +70,7 @@ _calcsize = struct.calcsize eh = exthelper.exthelper() -eh.merge(stablerange.eh) +eh.merge(stablerangecache.eh) obsexcmsg = utility.obsexcmsg # Config diff -r b67e0f676a28 -r a1ab2588a628 hgext3rd/evolve/stablerange.py --- a/hgext3rd/evolve/stablerange.py Sun Dec 10 05:17:04 2017 +0100 +++ b/hgext3rd/evolve/stablerange.py Mon Dec 18 00:40:07 2017 +0100 @@ -11,13 +11,10 @@ import heapq import math import os -import sqlite3 import time -import weakref from mercurial import ( error, - localrepo, node as nodemod, pycompat, scmutil, @@ -791,225 +788,3 @@ top = (rangeid[0], globalindex) result.append(top) return result - -############################# -### simple sqlite caching ### -############################# - -_sqliteschema = [ - """CREATE TABLE meta(schemaversion INTEGER NOT NULL, - tiprev INTEGER NOT NULL, - tipnode BLOB NOT NULL - );""", - """CREATE TABLE range(rev INTEGER NOT NULL, - idx INTEGER NOT NULL, - PRIMARY KEY(rev, idx));""", - """CREATE TABLE subranges(listidx INTEGER NOT NULL, - suprev INTEGER NOT NULL, - supidx INTEGER NOT NULL, - subrev INTEGER NOT NULL, - subidx INTEGER NOT NULL, - PRIMARY KEY(listidx, suprev, supidx), - FOREIGN KEY (suprev, supidx) REFERENCES range(rev, idx), - FOREIGN KEY (subrev, subidx) REFERENCES range(rev, idx) - );""", - "CREATE INDEX subranges_index ON subranges (suprev, supidx);", - "CREATE INDEX range_index ON range (rev, idx);", -] -_newmeta = "INSERT INTO meta (schemaversion, tiprev, tipnode) VALUES (?,?,?);" -_updatemeta = "UPDATE meta SET tiprev = ?, tipnode = ?;" -_updaterange = "INSERT INTO range(rev, idx) VALUES (?,?);" -_updatesubranges = """INSERT - INTO subranges(listidx, suprev, supidx, subrev, subidx) - VALUES (?,?,?,?,?);""" -_queryexist = "SELECT name FROM sqlite_master WHERE type='table' AND name='meta';" -_querymeta = "SELECT schemaversion, tiprev, tipnode FROM meta;" -_queryrange = "SELECT * FROM range WHERE (rev = ? AND idx = ?);" -_querysubranges = """SELECT subrev, subidx - FROM subranges - WHERE (suprev = ? AND supidx = ?) - ORDER BY listidx;""" - -class sqlstablerange(stablerange): - - _schemaversion = 1 - - def __init__(self, repo): - lrusize = repo.ui.configint('experimental', 'obshashrange.lru-size', - 2000) - super(sqlstablerange, self).__init__(lrusize=lrusize) - self._vfs = repo.vfs - self._path = repo.vfs.join('cache/evoext_stablerange_v1.sqlite') - self._cl = repo.unfiltered().changelog # (okay to keep an old one) - self._ondisktiprev = None - self._ondisktipnode = None - self._unsavedsubranges = {} - - def warmup(self, repo, upto=None): - self._con # make sure the data base is loaded - try: - # samelessly lock the repo to ensure nobody will update the repo - # concurently. This should not be too much of an issue if we warm - # at the end of the transaction. - # - # XXX However, we lock even if we are up to date so we should check - # before locking - with repo.lock(): - super(sqlstablerange, self).warmup(repo, upto) - self._save(repo) - except error.LockError: - # Exceptionnally we are noisy about it since performance impact is - # large We should address that before using this more widely. - repo.ui.warn('stable-range cache: unable to lock repo while warming\n') - repo.ui.warn('(cache will not be saved)\n') - super(sqlstablerange, self).warmup(repo, upto) - - def _getsub(self, rangeid): - cache = self._subrangescache - if rangeid not in cache and rangeid[0] <= self._ondisktiprev and self._con is not None: - value = None - result = self._con.execute(_queryrange, rangeid).fetchone() - if result is not None: # database know about this node (skip in the future?) - value = self._con.execute(_querysubranges, rangeid).fetchall() - # in memory caching of the value - cache[rangeid] = value - return cache.get(rangeid) - - def _setsub(self, rangeid, value): - assert rangeid not in self._unsavedsubranges - self._unsavedsubranges[rangeid] = value - super(sqlstablerange, self)._setsub(rangeid, value) - - def _db(self): - try: - util.makedirs(self._vfs.dirname(self._path)) - except OSError: - return None - con = sqlite3.connect(self._path) - con.text_factory = str - return con - - @util.propertycache - def _con(self): - con = self._db() - if con is None: - return None - cur = con.execute(_queryexist) - if cur.fetchone() is None: - return None - meta = con.execute(_querymeta).fetchone() - if meta is None: - return None - if meta[0] != self._schemaversion: - return None - if len(self._cl) <= meta[1]: - return None - if self._cl.node(meta[1]) != meta[2]: - return None - self._ondisktiprev = meta[1] - self._ondisktipnode = meta[2] - if self._tiprev < self._ondisktiprev: - self._tiprev = self._ondisktiprev - self._tipnode = self._ondisktipnode - return con - - def _save(self, repo): - repo = repo.unfiltered() - repo.depthcache.save(repo) - if not self._unsavedsubranges: - return # no new data - - if self._con is None: - util.unlinkpath(self._path, ignoremissing=True) - if '_con' in vars(self): - del self._con - - con = self._db() - if con is None: - return - with con: - for req in _sqliteschema: - con.execute(req) - - meta = [self._schemaversion, - self._tiprev, - self._tipnode, - ] - con.execute(_newmeta, meta) - else: - con = self._con - meta = con.execute(_querymeta).fetchone() - if meta[2] != self._ondisktipnode or meta[1] != self._ondisktiprev: - # drifting is currently an issue because this means another - # process might have already added the cache line we are about - # to add. This will confuse sqlite - msg = _('stable-range cache: skipping write, ' - 'database drifted under my feet\n') - hint = _('(disk: %s-%s vs mem: %s%s)\n') - data = (meta[2], meta[1], self._ondisktiprev, self._ondisktipnode) - repo.ui.warn(msg) - repo.ui.warn(hint % data) - return - meta = [self._tiprev, - self._tipnode, - ] - con.execute(_updatemeta, meta) - - self._saverange(con, repo) - con.commit() - self._ondisktiprev = self._tiprev - self._ondisktipnode = self._tipnode - self._unsavedsubranges.clear() - - def _saverange(self, con, repo): - repo = repo.unfiltered() - data = [] - allranges = set() - for key, value in self._unsavedsubranges.items(): - allranges.add(key) - for idx, sub in enumerate(value): - data.append((idx, key[0], key[1], sub[0], sub[1])) - - con.executemany(_updaterange, allranges) - con.executemany(_updatesubranges, data) - - -@eh.reposetup -def setupcache(ui, repo): - - class stablerangerepo(repo.__class__): - - @localrepo.unfilteredpropertycache - def stablerange(self): - return sqlstablerange(repo) - - @localrepo.unfilteredmethod - def destroyed(self): - if 'stablerange' in vars(self): - del self.stablerange - super(stablerangerepo, self).destroyed() - - def transaction(self, *args, **kwargs): - tr = super(stablerangerepo, self).transaction(*args, **kwargs) - if not repo.ui.configbool('experimental', 'obshashrange', False): - return tr - if not repo.ui.configbool('experimental', 'obshashrange.warm-cache', - True): - return tr - maxrevs = self.ui.configint('experimental', 'obshashrange.max-revs', None) - if maxrevs is not None and maxrevs < len(self.unfiltered()): - return tr - reporef = weakref.ref(self) - - def _warmcache(tr): - repo = reporef() - if repo is None: - return - if 'node' in tr.hookargs: - # new nodes ! - repo.stablerange.warmup(repo) - - tr.addpostclose('warmcache-10-stablerange', _warmcache) - return tr - - repo.__class__ = stablerangerepo diff -r b67e0f676a28 -r a1ab2588a628 hgext3rd/evolve/stablerangecache.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/hgext3rd/evolve/stablerangecache.py Mon Dec 18 00:40:07 2017 +0100 @@ -0,0 +1,238 @@ +import sqlite3 +import weakref + +from mercurial import ( + error, + localrepo, + util, +) + +from . import ( + exthelper, + stablerange, +) + +from mercurial.i18n import _ + +eh = exthelper.exthelper() +eh.merge(stablerange.eh) +############################# +### simple sqlite caching ### +############################# + +_sqliteschema = [ + """CREATE TABLE meta(schemaversion INTEGER NOT NULL, + tiprev INTEGER NOT NULL, + tipnode BLOB NOT NULL + );""", + """CREATE TABLE range(rev INTEGER NOT NULL, + idx INTEGER NOT NULL, + PRIMARY KEY(rev, idx));""", + """CREATE TABLE subranges(listidx INTEGER NOT NULL, + suprev INTEGER NOT NULL, + supidx INTEGER NOT NULL, + subrev INTEGER NOT NULL, + subidx INTEGER NOT NULL, + PRIMARY KEY(listidx, suprev, supidx), + FOREIGN KEY (suprev, supidx) REFERENCES range(rev, idx), + FOREIGN KEY (subrev, subidx) REFERENCES range(rev, idx) + );""", + "CREATE INDEX subranges_index ON subranges (suprev, supidx);", + "CREATE INDEX range_index ON range (rev, idx);", +] +_newmeta = "INSERT INTO meta (schemaversion, tiprev, tipnode) VALUES (?,?,?);" +_updatemeta = "UPDATE meta SET tiprev = ?, tipnode = ?;" +_updaterange = "INSERT INTO range(rev, idx) VALUES (?,?);" +_updatesubranges = """INSERT + INTO subranges(listidx, suprev, supidx, subrev, subidx) + VALUES (?,?,?,?,?);""" +_queryexist = "SELECT name FROM sqlite_master WHERE type='table' AND name='meta';" +_querymeta = "SELECT schemaversion, tiprev, tipnode FROM meta;" +_queryrange = "SELECT * FROM range WHERE (rev = ? AND idx = ?);" +_querysubranges = """SELECT subrev, subidx + FROM subranges + WHERE (suprev = ? AND supidx = ?) + ORDER BY listidx;""" + +class sqlstablerange(stablerange.stablerange): + + _schemaversion = 1 + + def __init__(self, repo): + lrusize = repo.ui.configint('experimental', 'obshashrange.lru-size', + 2000) + super(sqlstablerange, self).__init__(lrusize=lrusize) + self._vfs = repo.vfs + self._path = repo.vfs.join('cache/evoext_stablerange_v1.sqlite') + self._cl = repo.unfiltered().changelog # (okay to keep an old one) + self._ondisktiprev = None + self._ondisktipnode = None + self._unsavedsubranges = {} + + def warmup(self, repo, upto=None): + self._con # make sure the data base is loaded + try: + # samelessly lock the repo to ensure nobody will update the repo + # concurently. This should not be too much of an issue if we warm + # at the end of the transaction. + # + # XXX However, we lock even if we are up to date so we should check + # before locking + with repo.lock(): + super(sqlstablerange, self).warmup(repo, upto) + self._save(repo) + except error.LockError: + # Exceptionnally we are noisy about it since performance impact is + # large We should address that before using this more widely. + repo.ui.warn('stable-range cache: unable to lock repo while warming\n') + repo.ui.warn('(cache will not be saved)\n') + super(sqlstablerange, self).warmup(repo, upto) + + def _getsub(self, rangeid): + cache = self._subrangescache + if rangeid not in cache and rangeid[0] <= self._ondisktiprev and self._con is not None: + value = None + result = self._con.execute(_queryrange, rangeid).fetchone() + if result is not None: # database know about this node (skip in the future?) + value = self._con.execute(_querysubranges, rangeid).fetchall() + # in memory caching of the value + cache[rangeid] = value + return cache.get(rangeid) + + def _setsub(self, rangeid, value): + assert rangeid not in self._unsavedsubranges + self._unsavedsubranges[rangeid] = value + super(sqlstablerange, self)._setsub(rangeid, value) + + def _db(self): + try: + util.makedirs(self._vfs.dirname(self._path)) + except OSError: + return None + con = sqlite3.connect(self._path) + con.text_factory = str + return con + + @util.propertycache + def _con(self): + con = self._db() + if con is None: + return None + cur = con.execute(_queryexist) + if cur.fetchone() is None: + return None + meta = con.execute(_querymeta).fetchone() + if meta is None: + return None + if meta[0] != self._schemaversion: + return None + if len(self._cl) <= meta[1]: + return None + if self._cl.node(meta[1]) != meta[2]: + return None + self._ondisktiprev = meta[1] + self._ondisktipnode = meta[2] + if self._tiprev < self._ondisktiprev: + self._tiprev = self._ondisktiprev + self._tipnode = self._ondisktipnode + return con + + def _save(self, repo): + repo = repo.unfiltered() + repo.depthcache.save(repo) + if not self._unsavedsubranges: + return # no new data + + if self._con is None: + util.unlinkpath(self._path, ignoremissing=True) + if '_con' in vars(self): + del self._con + + con = self._db() + if con is None: + return + with con: + for req in _sqliteschema: + con.execute(req) + + meta = [self._schemaversion, + self._tiprev, + self._tipnode, + ] + con.execute(_newmeta, meta) + else: + con = self._con + meta = con.execute(_querymeta).fetchone() + if meta[2] != self._ondisktipnode or meta[1] != self._ondisktiprev: + # drifting is currently an issue because this means another + # process might have already added the cache line we are about + # to add. This will confuse sqlite + msg = _('stable-range cache: skipping write, ' + 'database drifted under my feet\n') + hint = _('(disk: %s-%s vs mem: %s%s)\n') + data = (meta[2], meta[1], self._ondisktiprev, self._ondisktipnode) + repo.ui.warn(msg) + repo.ui.warn(hint % data) + return + meta = [self._tiprev, + self._tipnode, + ] + con.execute(_updatemeta, meta) + + self._saverange(con, repo) + con.commit() + self._ondisktiprev = self._tiprev + self._ondisktipnode = self._tipnode + self._unsavedsubranges.clear() + + def _saverange(self, con, repo): + repo = repo.unfiltered() + data = [] + allranges = set() + for key, value in self._unsavedsubranges.items(): + allranges.add(key) + for idx, sub in enumerate(value): + data.append((idx, key[0], key[1], sub[0], sub[1])) + + con.executemany(_updaterange, allranges) + con.executemany(_updatesubranges, data) + +@eh.reposetup +def setupcache(ui, repo): + + class stablerangerepo(repo.__class__): + + @localrepo.unfilteredpropertycache + def stablerange(self): + return sqlstablerange(repo) + + @localrepo.unfilteredmethod + def destroyed(self): + if 'stablerange' in vars(self): + del self.stablerange + super(stablerangerepo, self).destroyed() + + def transaction(self, *args, **kwargs): + tr = super(stablerangerepo, self).transaction(*args, **kwargs) + if not repo.ui.configbool('experimental', 'obshashrange', False): + return tr + if not repo.ui.configbool('experimental', 'obshashrange.warm-cache', + True): + return tr + maxrevs = self.ui.configint('experimental', 'obshashrange.max-revs', None) + if maxrevs is not None and maxrevs < len(self.unfiltered()): + return tr + reporef = weakref.ref(self) + + def _warmcache(tr): + repo = reporef() + if repo is None: + return + if 'node' in tr.hookargs: + # new nodes ! + repo.stablerange.warmup(repo) + + tr.addpostclose('warmcache-10-stablerange', _warmcache) + return tr + + repo.__class__ = stablerangerepo