hgext/sqlitestore.py
       
# sqlitestore.py - Storage backend that uses SQLite
#
# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

"""store repository data in SQLite (EXPERIMENTAL)

The sqlitestore extension enables the storage of repository data in SQLite.

This extension is HIGHLY EXPERIMENTAL. There are NO BACKWARDS COMPATIBILITY
GUARANTEES. This means that repositories created with this extension may
only be usable with the exact version of this extension/Mercurial that was
used. The extension attempts to enforce this in order to prevent repository
corruption.

In addition, several features are not yet supported or have known bugs:

* Only some data is stored in SQLite. Changeset, manifest, and other repository
  data is not yet stored in SQLite.
* Transactions are not robust. If the process is aborted at the right time
  during transaction close/rollback, the repository could be in an inconsistent
  state. This problem will diminish once all repository data is tracked by
  SQLite.
* Bundle repositories do not work (the ability to use e.g.
  `hg -R <bundle-file> log` to automatically overlay a bundle on top of the
  existing repository).
* Various other features don't work.

This extension should work for basic clone/pull, update, and commit workflows.
Some history rewriting operations may fail due to lack of support for bundle
repositories.

To use, activate the extension and set the ``storage.new-repo-backend`` config
option to ``sqlite`` to enable new repositories to use SQLite for storage.
"""
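
# A minimal example configuration (illustrative; it simply applies the
# activation steps described in the docstring above):
#
#   [extensions]
#   sqlitestore =
#
#   [storage]
#   new-repo-backend = sqlite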
       
# To run the test suite with repos using SQLite by default, execute the
# following:
#
# HGREPOFEATURES="sqlitestore" run-tests.py \
#     --extra-config-opt extensions.sqlitestore= \
#     --extra-config-opt storage.new-repo-backend=sqlite

from __future__ import absolute_import

import hashlib
import sqlite3
import struct
import threading
import zlib

from mercurial.i18n import _
from mercurial.node import (
    nullid,
    nullrev,
    short,
)
from mercurial.thirdparty import (
    attr,
)
from mercurial import (
    ancestor,
    dagop,
    error,
    extensions,
    localrepo,
    mdiff,
    pycompat,
    registrar,
    repository,
    util,
    verify,
)
from mercurial.utils import (
    interfaceutil,
    storageutil,
)

try:
    from mercurial import zstd
    zstd.__version__
except ImportError:
    zstd = None

configtable = {}
configitem = registrar.configitem(configtable)

# experimental config: storage.sqlite.compression
configitem('storage', 'sqlite.compression',
           default='zstd' if zstd else 'zlib')

# Note for extension authors: ONLY specify testedwith = 'ships-with-hg-core' for
# extensions which SHIP WITH MERCURIAL. Non-mainline extensions should
# be specifying the version(s) of Mercurial they are tested with, or
# leave the attribute unspecified.
testedwith = 'ships-with-hg-core'

REQUIREMENT = b'exp-sqlite-001'
REQUIREMENT_ZSTD = b'exp-sqlite-comp-001=zstd'
REQUIREMENT_ZLIB = b'exp-sqlite-comp-001=zlib'
REQUIREMENT_NONE = b'exp-sqlite-comp-001=none'

CURRENT_SCHEMA_VERSION = 1

COMPRESSION_NONE = 1
COMPRESSION_ZSTD = 2
COMPRESSION_ZLIB = 3

FLAG_CENSORED = 1

CREATE_SCHEMA = [
    # Deltas are stored as content-indexed blobs.
    # compression column holds COMPRESSION_* constant for how the
    # delta is encoded.

    r'CREATE TABLE delta ('
    r'    id INTEGER PRIMARY KEY, '
    r'    compression INTEGER NOT NULL, '
    r'    hash BLOB UNIQUE ON CONFLICT ABORT, '
    r'    delta BLOB NOT NULL '
    r')',

    # Tracked paths are denormalized to integers to avoid redundant
    # storage of the path name.
    r'CREATE TABLE filepath ('
    r'    id INTEGER PRIMARY KEY, '
    r'    path BLOB NOT NULL '
    r')',

    r'CREATE UNIQUE INDEX filepath_path '
    r'    ON filepath (path)',

    # We have a single table for all file revision data.
    # Each file revision is uniquely described by a (path, rev) and
    # (path, node).
    #
    # Revision data is stored as a pointer to the delta producing this
    # revision and the file revision whose delta should be applied before
    # that one. One can reconstruct the delta chain by recursively following
    # the delta base revision pointers until one encounters NULL.
    #
    # flags column holds bitwise integer flags controlling storage options.
    # These flags are defined by the FLAG_* constants.
    r'CREATE TABLE fileindex ('
    r'    id INTEGER PRIMARY KEY, '
    r'    pathid INTEGER REFERENCES filepath(id), '
    r'    revnum INTEGER NOT NULL, '
    r'    p1rev INTEGER NOT NULL, '
    r'    p2rev INTEGER NOT NULL, '
    r'    linkrev INTEGER NOT NULL, '
    r'    flags INTEGER NOT NULL, '
    r'    deltaid INTEGER REFERENCES delta(id), '
    r'    deltabaseid INTEGER REFERENCES fileindex(id), '
    r'    node BLOB NOT NULL '
    r')',

    r'CREATE UNIQUE INDEX fileindex_pathrevnum '
    r'    ON fileindex (pathid, revnum)',

    r'CREATE UNIQUE INDEX fileindex_pathnode '
    r'    ON fileindex (pathid, node)',

    # Provide a view over all file data for convenience.
    r'CREATE VIEW filedata AS '
    r'SELECT '
    r'    fileindex.id AS id, '
    r'    filepath.id AS pathid, '
    r'    filepath.path AS path, '
    r'    fileindex.revnum AS revnum, '
    r'    fileindex.node AS node, '
    r'    fileindex.p1rev AS p1rev, '
    r'    fileindex.p2rev AS p2rev, '
    r'    fileindex.linkrev AS linkrev, '
    r'    fileindex.flags AS flags, '
    r'    fileindex.deltaid AS deltaid, '
    r'    fileindex.deltabaseid AS deltabaseid '
    r'FROM filepath, fileindex '
    r'WHERE fileindex.pathid=filepath.id',

    r'PRAGMA user_version=%d' % CURRENT_SCHEMA_VERSION,
]
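
# Illustrative sketch (comments only) of how a fulltext is reconstructed
# from the schema above: given hypothetical fileindex rows r2 -> r1 -> r0
# linked through deltabaseid, where r0 has a NULL deltabaseid and therefore
# stores a full snapshot, deltas are applied oldest-first:
#
#   fulltext(r0) = delta(r0)
#   fulltext(r1) = patch(fulltext(r0), delta(r1))
#   fulltext(r2) = patch(fulltext(r1), delta(r2))
#
# resolvedeltachain() below collects the chain with a single recursive SQL
# query and applies the deltas via mdiff.patches().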
       
def resolvedeltachain(db, pathid, node, revisioncache,
                      stoprids, zstddctx=None):
    """Resolve a delta chain for a file node."""

    # TODO the "not in ({stops})" here is possibly slowing down the query
    # because it needs to perform the lookup on every recursive invocation.
    # This could possibly be faster if we created a temporary query with
    # baseid "poisoned" to null and limited the recursive filter to
    # "is not null".
    res = db.execute(
        r'WITH RECURSIVE '
        r'    deltachain(deltaid, baseid) AS ('
        r'        SELECT deltaid, deltabaseid FROM fileindex '
        r'            WHERE pathid=? AND node=? '
        r'        UNION ALL '
        r'        SELECT fileindex.deltaid, deltabaseid '
        r'            FROM fileindex, deltachain '
        r'            WHERE '
        r'                fileindex.id=deltachain.baseid '
        r'                AND deltachain.baseid IS NOT NULL '
        r'                AND fileindex.id NOT IN ({stops}) '
        r'    ) '
        r'SELECT deltachain.baseid, compression, delta '
        r'FROM deltachain, delta '
        r'WHERE delta.id=deltachain.deltaid'.format(
            stops=r','.join([r'?'] * len(stoprids))),
        tuple([pathid, node] + list(stoprids.keys())))

    deltas = []
    lastdeltabaseid = None

    for deltabaseid, compression, delta in res:
        lastdeltabaseid = deltabaseid

        if compression == COMPRESSION_ZSTD:
            delta = zstddctx.decompress(delta)
        elif compression == COMPRESSION_NONE:
            # Stored uncompressed; use the blob as-is.
            pass
        elif compression == COMPRESSION_ZLIB:
            delta = zlib.decompress(delta)
        else:
            raise SQLiteStoreError('unhandled compression type: %d' %
                                   compression)

        deltas.append(delta)

    if lastdeltabaseid in stoprids:
        basetext = revisioncache[stoprids[lastdeltabaseid]]
    else:
        basetext = deltas.pop()

    deltas.reverse()
    fulltext = mdiff.patches(basetext, deltas)

    # SQLite returns buffer instances for blob columns on Python 2. This
    # type can propagate through the delta application layer. Because
    # downstream callers assume revisions are bytes, cast as needed.
    if not isinstance(fulltext, bytes):
        fulltext = bytes(fulltext)

    return fulltext
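
# Illustrative call (censorrevision() below uses this exact pattern):
# resolve a fulltext while bypassing any revision cache by "poisoning"
# stoprids with a sentinel id that cannot match a real fileindex row:
#
#   fulltext = resolvedeltachain(db, pathid, node, {}, {-1: None},
#                                zstddctx=dctx)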
       
def insertdelta(db, compression, hash, delta):
    # Deltas are content-addressed by their hash. The UNIQUE constraint on
    # the hash column aborts duplicate inserts, in which case we reuse the
    # id of the already-stored copy.
    try:
        return db.execute(
            r'INSERT INTO delta (compression, hash, delta) '
            r'VALUES (?, ?, ?)',
            (compression, hash, delta)).lastrowid
    except sqlite3.IntegrityError:
        return db.execute(
            r'SELECT id FROM delta WHERE hash=?',
            (hash,)).fetchone()[0]

class SQLiteStoreError(error.StorageError):
    pass

@attr.s
class revisionentry(object):
    rid = attr.ib()
    rev = attr.ib()
    node = attr.ib()
    p1rev = attr.ib()
    p2rev = attr.ib()
    p1node = attr.ib()
    p2node = attr.ib()
    linkrev = attr.ib()
    flags = attr.ib()

@interfaceutil.implementer(repository.irevisiondelta)
@attr.s(slots=True)
class sqliterevisiondelta(object):
    node = attr.ib()
    p1node = attr.ib()
    p2node = attr.ib()
    basenode = attr.ib()
    flags = attr.ib()
    baserevisionsize = attr.ib()
    revision = attr.ib()
    delta = attr.ib()
    linknode = attr.ib(default=None)

@interfaceutil.implementer(repository.iverifyproblem)
@attr.s(frozen=True)
class sqliteproblem(object):
    warning = attr.ib(default=None)
    error = attr.ib(default=None)
    node = attr.ib(default=None)
       
@interfaceutil.implementer(repository.ifilestorage)
class sqlitefilestore(object):
    """Implements storage for an individual tracked path."""

    def __init__(self, db, path, compression):
        self._db = db
        self._path = path

        self._pathid = None

        # revnum -> node
        self._revtonode = {}
        # node -> revnum
        self._nodetorev = {}
        # node -> data structure
        self._revisions = {}

        self._revisioncache = util.lrucachedict(10)

        self._compengine = compression

        if compression == 'zstd':
            self._cctx = zstd.ZstdCompressor(level=3)
            self._dctx = zstd.ZstdDecompressor()
        else:
            self._cctx = None
            self._dctx = None

        self._refreshindex()

    def _refreshindex(self):
        self._revtonode = {}
        self._nodetorev = {}
        self._revisions = {}

        res = list(self._db.execute(
            r'SELECT id FROM filepath WHERE path=?', (self._path,)))

        if not res:
            self._pathid = None
            return

        self._pathid = res[0][0]

        res = self._db.execute(
            r'SELECT id, revnum, node, p1rev, p2rev, linkrev, flags '
            r'FROM fileindex '
            r'WHERE pathid=? '
            r'ORDER BY revnum ASC',
            (self._pathid,))

        for i, row in enumerate(res):
            rid, rev, node, p1rev, p2rev, linkrev, flags = row

            if i != rev:
                raise SQLiteStoreError(_('sqlite database has inconsistent '
                                         'revision numbers'))

            if p1rev == nullrev:
                p1node = nullid
            else:
                p1node = self._revtonode[p1rev]

            if p2rev == nullrev:
                p2node = nullid
            else:
                p2node = self._revtonode[p2rev]

            entry = revisionentry(
                rid=rid,
                rev=rev,
                node=node,
                p1rev=p1rev,
                p2rev=p2rev,
                p1node=p1node,
                p2node=p2node,
                linkrev=linkrev,
                flags=flags)

            self._revtonode[rev] = node
            self._nodetorev[node] = rev
            self._revisions[node] = entry

    # Start of ifileindex interface.

    def __len__(self):
        return len(self._revisions)

    def __iter__(self):
        return iter(pycompat.xrange(len(self._revisions)))

    def revs(self, start=0, stop=None):
        return storageutil.iterrevs(len(self._revisions), start=start,
                                    stop=stop)

    def parents(self, node):
        if node == nullid:
            return nullid, nullid

        if node not in self._revisions:
            raise error.LookupError(node, self._path, _('no node'))

        entry = self._revisions[node]
        return entry.p1node, entry.p2node

    def parentrevs(self, rev):
        if rev == nullrev:
            return nullrev, nullrev

        if rev not in self._revtonode:
            raise IndexError(rev)

        entry = self._revisions[self._revtonode[rev]]
        return entry.p1rev, entry.p2rev

    def rev(self, node):
        if node == nullid:
            return nullrev

        if node not in self._nodetorev:
            raise error.LookupError(node, self._path, _('no node'))

        return self._nodetorev[node]

    def node(self, rev):
        if rev == nullrev:
            return nullid

        if rev not in self._revtonode:
            raise IndexError(rev)

        return self._revtonode[rev]

    def lookup(self, node):
        return storageutil.fileidlookup(self, node, self._path)

    def linkrev(self, rev):
        if rev == nullrev:
            return nullrev

        if rev not in self._revtonode:
            raise IndexError(rev)

        entry = self._revisions[self._revtonode[rev]]
        return entry.linkrev

    def iscensored(self, rev):
        if rev == nullrev:
            return False

        if rev not in self._revtonode:
            raise IndexError(rev)

        return self._revisions[self._revtonode[rev]].flags & FLAG_CENSORED

    def commonancestorsheads(self, node1, node2):
        rev1 = self.rev(node1)
        rev2 = self.rev(node2)

        ancestors = ancestor.commonancestorsheads(self.parentrevs, rev1, rev2)
        return pycompat.maplist(self.node, ancestors)

    def descendants(self, revs):
        # TODO we could implement this using a recursive SQL query, which
        # might be faster.
        return dagop.descendantrevs(revs, self.revs, self.parentrevs)

    def heads(self, start=None, stop=None):
        if start is None and stop is None:
            if not len(self):
                return [nullid]

        startrev = self.rev(start) if start is not None else nullrev
        stoprevs = {self.rev(n) for n in stop or []}

        revs = dagop.headrevssubset(self.revs, self.parentrevs,
                                    startrev=startrev, stoprevs=stoprevs)

        return [self.node(rev) for rev in revs]

    def children(self, node):
        rev = self.rev(node)

        res = self._db.execute(
            r'SELECT'
            r'  node '
            r'  FROM filedata '
            r'  WHERE path=? AND (p1rev=? OR p2rev=?) '
            r'  ORDER BY revnum ASC',
            (self._path, rev, rev))

        return [row[0] for row in res]

    # End of ifileindex interface.
       
    # Start of ifiledata interface.

    def size(self, rev):
        if rev == nullrev:
            return 0

        if rev not in self._revtonode:
            raise IndexError(rev)

        node = self._revtonode[rev]

        if self.renamed(node):
            return len(self.read(node))

        return len(self.revision(node))

    def revision(self, node, raw=False, _verifyhash=True):
        if node in (nullid, nullrev):
            return b''

        if isinstance(node, int):
            node = self.node(node)

        if node not in self._nodetorev:
            raise error.LookupError(node, self._path, _('no node'))

        if node in self._revisioncache:
            return self._revisioncache[node]

        # Because we have a fulltext revision cache, we are able to
        # short-circuit delta chain traversal and decompression as soon as
        # we encounter a revision in the cache.

        stoprids = {self._revisions[n].rid: n
                    for n in self._revisioncache}

        if not stoprids:
            stoprids[-1] = None

        fulltext = resolvedeltachain(self._db, self._pathid, node,
                                     self._revisioncache, stoprids,
                                     zstddctx=self._dctx)

        if _verifyhash:
            self._checkhash(fulltext, node)
            self._revisioncache[node] = fulltext

        return fulltext

    def read(self, node):
        return storageutil.filtermetadata(self.revision(node))

    def renamed(self, node):
        return storageutil.filerevisioncopied(self, node)

    def cmp(self, node, fulltext):
        return not storageutil.filedataequivalent(self, node, fulltext)

    def emitrevisions(self, nodes, nodesorder=None, revisiondata=False,
                      assumehaveparentrevisions=False, deltaprevious=False):
        if nodesorder not in ('nodes', 'storage', None):
            raise error.ProgrammingError('unhandled value for nodesorder: %s' %
                                         nodesorder)

        nodes = [n for n in nodes if n != nullid]

        if not nodes:
            return

        # TODO perform in a single query.
        res = self._db.execute(
            r'SELECT revnum, deltaid FROM fileindex '
            r'WHERE pathid=? '
            r'    AND node in (%s)' % (r','.join([r'?'] * len(nodes))),
            tuple([self._pathid] + nodes))

        deltabases = {}

        for rev, deltaid in res:
            res = self._db.execute(
                r'SELECT revnum from fileindex WHERE pathid=? AND deltaid=?',
                (self._pathid, deltaid))
            deltabases[rev] = res.fetchone()[0]

        # TODO define revdifffn so we can use delta from storage.
        for delta in storageutil.emitrevisions(
            self, nodes, nodesorder, sqliterevisiondelta,
            deltaparentfn=deltabases.__getitem__,
            revisiondata=revisiondata,
            assumehaveparentrevisions=assumehaveparentrevisions,
            deltaprevious=deltaprevious):

            yield delta

    # End of ifiledata interface.
       
    # Start of ifilemutation interface.

    def add(self, filedata, meta, transaction, linkrev, p1, p2):
        if meta or filedata.startswith(b'\x01\n'):
            filedata = storageutil.packmeta(meta, filedata)

        return self.addrevision(filedata, transaction, linkrev, p1, p2)

    def addrevision(self, revisiondata, transaction, linkrev, p1, p2, node=None,
                    flags=0, cachedelta=None):
        if flags:
            raise SQLiteStoreError(_('flags not supported on revisions'))

        validatehash = node is not None
        node = node or storageutil.hashrevisionsha1(revisiondata, p1, p2)

        if validatehash:
            self._checkhash(revisiondata, node, p1, p2)

        if node in self._nodetorev:
            return node

        node = self._addrawrevision(node, revisiondata, transaction, linkrev,
                                    p1, p2)

        self._revisioncache[node] = revisiondata
        return node

    def addgroup(self, deltas, linkmapper, transaction, addrevisioncb=None):
        nodes = []

        for node, p1, p2, linknode, deltabase, delta, wireflags in deltas:
            storeflags = 0

            if wireflags & repository.REVISION_FLAG_CENSORED:
                storeflags |= FLAG_CENSORED

            if wireflags & ~repository.REVISION_FLAG_CENSORED:
                raise SQLiteStoreError('unhandled revision flag')

            baserev = self.rev(deltabase)

            # If base is censored, delta must be full replacement in a single
            # patch operation.
            if baserev != nullrev and self.iscensored(baserev):
                hlen = struct.calcsize('>lll')
                oldlen = len(self.revision(deltabase, raw=True,
                                           _verifyhash=False))
                newlen = len(delta) - hlen

                if delta[:hlen] != mdiff.replacediffheader(oldlen, newlen):
                    raise error.CensoredBaseError(self._path,
                                                  deltabase)

            if (not (storeflags & FLAG_CENSORED)
                and storageutil.deltaiscensored(
                    delta, baserev, lambda x: len(self.revision(x, raw=True)))):
                storeflags |= FLAG_CENSORED

            linkrev = linkmapper(linknode)

            nodes.append(node)

            if node in self._revisions:
                continue

            if deltabase == nullid:
                text = mdiff.patch(b'', delta)
                storedelta = None
            else:
                text = None
                storedelta = (deltabase, delta)

            self._addrawrevision(node, text, transaction, linkrev, p1, p2,
                                 storedelta=storedelta, flags=storeflags)

            if addrevisioncb:
                addrevisioncb(self, node)

        return nodes

    def censorrevision(self, tr, censornode, tombstone=b''):
        tombstone = storageutil.packmeta({b'censored': tombstone}, b'')

        # This restriction is cargo culted from revlogs and makes no sense for
        # SQLite, since columns can be resized at will.
        if len(tombstone) > len(self.revision(censornode, raw=True)):
            raise error.Abort(_('censor tombstone must be no longer than '
                                'censored data'))

        # We need to replace the censored revision's data with the tombstone.
        # But replacing that data will have implications for delta chains that
        # reference it.
        #
        # While "better," more complex strategies are possible, we do something
        # simple: we find delta chain children of the censored revision and we
        # replace those incremental deltas with fulltexts of their corresponding
        # revision. Then we delete the now-unreferenced delta and original
        # revision and insert a replacement.

        # Find the delta to be censored.
        censoreddeltaid = self._db.execute(
            r'SELECT deltaid FROM fileindex WHERE id=?',
            (self._revisions[censornode].rid,)).fetchone()[0]

        # Find all its delta chain children.
        # TODO once we support storing deltas for !files, we'll need to look
        # for those delta chains too.
        rows = list(self._db.execute(
            r'SELECT id, pathid, node FROM fileindex '
            r'WHERE deltabaseid=? OR deltaid=?',
            (censoreddeltaid, censoreddeltaid)))

        for row in rows:
            rid, pathid, node = row

            fulltext = resolvedeltachain(self._db, pathid, node, {}, {-1: None},
                                         zstddctx=self._dctx)

            deltahash = hashlib.sha1(fulltext).digest()

            if self._compengine == 'zstd':
                deltablob = self._cctx.compress(fulltext)
                compression = COMPRESSION_ZSTD
            elif self._compengine == 'zlib':
                deltablob = zlib.compress(fulltext)
                compression = COMPRESSION_ZLIB
            elif self._compengine == 'none':
                deltablob = fulltext
                compression = COMPRESSION_NONE
            else:
                raise error.ProgrammingError('unhandled compression engine: %s'
                                             % self._compengine)

            if len(deltablob) >= len(fulltext):
                deltablob = fulltext
                compression = COMPRESSION_NONE

            deltaid = insertdelta(self._db, compression, deltahash, deltablob)

            self._db.execute(
                r'UPDATE fileindex SET deltaid=?, deltabaseid=NULL '
                r'WHERE id=?', (deltaid, rid))

        # Now create the tombstone delta and replace the delta on the censored
        # node.
        deltahash = hashlib.sha1(tombstone).digest()
        tombstonedeltaid = insertdelta(self._db, COMPRESSION_NONE,
                                       deltahash, tombstone)

        flags = self._revisions[censornode].flags
        flags |= FLAG_CENSORED

        self._db.execute(
            r'UPDATE fileindex SET flags=?, deltaid=?, deltabaseid=NULL '
            r'WHERE pathid=? AND node=?',
            (flags, tombstonedeltaid, self._pathid, censornode))

        self._db.execute(
            r'DELETE FROM delta WHERE id=?', (censoreddeltaid,))

        self._refreshindex()
        self._revisioncache.clear()

    def getstrippoint(self, minlink):
        return storageutil.resolvestripinfo(minlink, len(self) - 1,
                                            [self.rev(n) for n in self.heads()],
                                            self.linkrev,
                                            self.parentrevs)

    def strip(self, minlink, transaction):
        if not len(self):
            return

        rev, _ignored = self.getstrippoint(minlink)

        if rev == len(self):
            return

        for rev in self.revs(rev):
            self._db.execute(
                r'DELETE FROM fileindex WHERE pathid=? AND node=?',
                (self._pathid, self.node(rev)))

        # TODO how should we garbage collect data in delta table?

        self._refreshindex()

    # End of ifilemutation interface.
       
    # Start of ifilestorage interface.

    def files(self):
        return []

    def storageinfo(self, exclusivefiles=False, sharedfiles=False,
                    revisionscount=False, trackedsize=False,
                    storedsize=False):
        d = {}

        if exclusivefiles:
            d['exclusivefiles'] = []

        if sharedfiles:
            # TODO list sqlite file(s) here.
            d['sharedfiles'] = []

        if revisionscount:
            d['revisionscount'] = len(self)

        if trackedsize:
            d['trackedsize'] = sum(len(self.revision(node))
                                   for node in self._nodetorev)

        if storedsize:
            # TODO implement this?
            d['storedsize'] = None

        return d

    def verifyintegrity(self, state):
        state['skipread'] = set()

        for rev in self:
            node = self.node(rev)

            try:
                self.revision(node)
            except Exception as e:
                yield sqliteproblem(
                    error=_('unpacking %s: %s') % (short(node), e),
                    node=node)

                state['skipread'].add(node)

    # End of ifilestorage interface.

    def _checkhash(self, fulltext, node, p1=None, p2=None):
        if p1 is None and p2 is None:
            p1, p2 = self.parents(node)

        if node == storageutil.hashrevisionsha1(fulltext, p1, p2):
            return

        try:
            del self._revisioncache[node]
        except KeyError:
            pass

        if storageutil.iscensoredtext(fulltext):
            raise error.CensoredNodeError(self._path, node, fulltext)

        raise SQLiteStoreError(_('integrity check failed on %s') %
                               self._path)
       
    def _addrawrevision(self, node, revisiondata, transaction, linkrev,
                        p1, p2, storedelta=None, flags=0):
        if self._pathid is None:
            res = self._db.execute(
                r'INSERT INTO filepath (path) VALUES (?)', (self._path,))
            self._pathid = res.lastrowid

        # For simplicity, always store a delta against p1.
        # TODO we need a lot more logic here to make behavior reasonable.

        if storedelta:
            deltabase, delta = storedelta

            if isinstance(deltabase, int):
                deltabase = self.node(deltabase)

        else:
            assert revisiondata is not None
            deltabase = p1

            if deltabase == nullid:
                delta = revisiondata
            else:
                delta = mdiff.textdiff(self.revision(self.rev(deltabase)),
                                       revisiondata)

        # File index stores a pointer to its delta and the parent delta.
        # The parent delta is stored via a pointer to the fileindex PK.
        if deltabase == nullid:
            baseid = None
        else:
            baseid = self._revisions[deltabase].rid

        # Deltas are stored with a hash of their content. This allows
        # us to de-duplicate. The hash column is unique, so it is faster
        # to just insert and let insertdelta() handle the conflict by
        # reusing the existing row than to look first.
        deltahash = hashlib.sha1(delta).digest()

        if self._compengine == 'zstd':
            deltablob = self._cctx.compress(delta)
            compression = COMPRESSION_ZSTD
        elif self._compengine == 'zlib':
            deltablob = zlib.compress(delta)
            compression = COMPRESSION_ZLIB
        elif self._compengine == 'none':
            deltablob = delta
            compression = COMPRESSION_NONE
        else:
            raise error.ProgrammingError('unhandled compression engine: %s' %
                                         self._compengine)

        # Don't store compressed data if it isn't practical.
        if len(deltablob) >= len(delta):
            deltablob = delta
            compression = COMPRESSION_NONE

        deltaid = insertdelta(self._db, compression, deltahash, deltablob)

        rev = len(self)

        if p1 == nullid:
            p1rev = nullrev
        else:
            p1rev = self._nodetorev[p1]

        if p2 == nullid:
            p2rev = nullrev
        else:
            p2rev = self._nodetorev[p2]

        rid = self._db.execute(
            r'INSERT INTO fileindex ('
            r'    pathid, revnum, node, p1rev, p2rev, linkrev, flags, '
            r'    deltaid, deltabaseid) '
            r'    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)',
            (self._pathid, rev, node, p1rev, p2rev, linkrev, flags,
             deltaid, baseid)
        ).lastrowid

        entry = revisionentry(
            rid=rid,
            rev=rev,
            node=node,
            p1rev=p1rev,
            p2rev=p2rev,
            p1node=p1,
            p2node=p2,
            linkrev=linkrev,
            flags=flags)

        self._nodetorev[node] = rev
        self._revtonode[rev] = node
        self._revisions[node] = entry

        return node
       
class sqliterepository(localrepo.localrepository):
    def cancopy(self):
        return False

    def transaction(self, *args, **kwargs):
        current = self.currenttransaction()

        tr = super(sqliterepository, self).transaction(*args, **kwargs)

        if current:
            return tr

        self._dbconn.execute(r'BEGIN TRANSACTION')

        def committransaction(_):
            self._dbconn.commit()

        tr.addfinalize('sqlitestore', committransaction)

        return tr

    @property
    def _dbconn(self):
        # SQLite connections can only be used on the thread that created
        # them. In most cases, this "just works." However, hgweb uses
        # multiple threads.
        tid = threading.current_thread().ident

        if self._db:
            if self._db[0] == tid:
                return self._db[1]

        db = makedb(self.svfs.join('db.sqlite'))
        self._db = (tid, db)

        return db
       
def makedb(path):
    """Construct a database handle for a database at path."""

    db = sqlite3.connect(path)
    db.text_factory = bytes

    res = db.execute(r'PRAGMA user_version').fetchone()[0]

    # New database.
    if res == 0:
        for statement in CREATE_SCHEMA:
            db.execute(statement)

        db.commit()

    elif res == CURRENT_SCHEMA_VERSION:
        pass

    else:
        raise error.Abort(_('sqlite database has unrecognized version'))

    db.execute(r'PRAGMA journal_mode=WAL')

    return db
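
# Illustrative standalone use of makedb() for debugging (assumes the
# default store layout used by _dbconn above, i.e. .hg/store/db.sqlite):
#
#   db = makedb(r'/path/to/repo/.hg/store/db.sqlite')
#   for path, revnum in db.execute(
#           r'SELECT path, revnum FROM filedata ORDER BY path, revnum'):
#       print(path, revnum)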
       
def featuresetup(ui, supported):
    supported.add(REQUIREMENT)

    if zstd:
        supported.add(REQUIREMENT_ZSTD)

    supported.add(REQUIREMENT_ZLIB)
    supported.add(REQUIREMENT_NONE)

def newreporequirements(orig, ui, createopts):
    if createopts['backend'] != 'sqlite':
        return orig(ui, createopts)

    # This restriction can be lifted once we have more confidence.
    if 'sharedrepo' in createopts:
        raise error.Abort(_('shared repositories not supported with SQLite '
                            'store'))

    # This filtering is out of an abundance of caution: we want to ensure
    # we honor creation options and we do that by annotating exactly the
    # creation options we recognize.
    known = {
        'narrowfiles',
        'backend',
    }

    unsupported = set(createopts) - known
    if unsupported:
        raise error.Abort(_('SQLite store does not support repo creation '
                            'option: %s') % ', '.join(sorted(unsupported)))

    # Since we're a hybrid store that still relies on revlogs, we fall back
    # to using the revlogv1 backend's storage requirements then adding our
    # own requirement.
    createopts['backend'] = 'revlogv1'
    requirements = orig(ui, createopts)
    requirements.add(REQUIREMENT)

    compression = ui.config('storage', 'sqlite.compression')

    if compression == 'zstd' and not zstd:
        raise error.Abort(_('storage.sqlite.compression set to "zstd" but '
                            'zstandard compression not available to this '
                            'Mercurial install'))

    if compression == 'zstd':
        requirements.add(REQUIREMENT_ZSTD)
    elif compression == 'zlib':
        requirements.add(REQUIREMENT_ZLIB)
    elif compression == 'none':
        requirements.add(REQUIREMENT_NONE)
    else:
        raise error.Abort(_('unknown compression engine defined in '
                            'storage.sqlite.compression: %s') % compression)

    return requirements
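
# With the defaults above, a newly created SQLite-backed repo's requires
# file would contain the revlogv1 backend's entries plus lines like the
# following (illustrative; the compression line matches the engine chosen):
#
#   exp-sqlite-001
#   exp-sqlite-comp-001=zstd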
       
@interfaceutil.implementer(repository.ilocalrepositoryfilestorage)
class sqlitefilestorage(object):
    """Repository file storage backed by SQLite."""
    def file(self, path):
        if path.startswith(b'/'):
            path = path[1:]

        if REQUIREMENT_ZSTD in self.requirements:
            compression = 'zstd'
        elif REQUIREMENT_ZLIB in self.requirements:
            compression = 'zlib'
        elif REQUIREMENT_NONE in self.requirements:
            compression = 'none'
        else:
            raise error.Abort(_('unable to determine what compression engine '
                                'to use for SQLite storage'))

        return sqlitefilestore(self._dbconn, path, compression)

def makefilestorage(orig, requirements, **kwargs):
    """Produce a type conforming to ``ilocalrepositoryfilestorage``."""
    if REQUIREMENT in requirements:
        return sqlitefilestorage
    else:
        return orig(requirements=requirements, **kwargs)

def makemain(orig, ui, requirements, **kwargs):
    if REQUIREMENT in requirements:
        if REQUIREMENT_ZSTD in requirements and not zstd:
            raise error.Abort(_('repository uses zstandard compression, which '
                                'is not available to this Mercurial install'))

        return sqliterepository

    return orig(requirements=requirements, **kwargs)

def verifierinit(orig, self, *args, **kwargs):
    orig(self, *args, **kwargs)

    # We don't care that files in the store don't align with what is
    # advertised. So suppress these warnings.
    self.warnorphanstorefiles = False

def extsetup(ui):
    localrepo.featuresetupfuncs.add(featuresetup)
    extensions.wrapfunction(localrepo, 'newreporequirements',
                            newreporequirements)
    extensions.wrapfunction(localrepo, 'makefilestorage',
                            makefilestorage)
    extensions.wrapfunction(localrepo, 'makemain',
                            makemain)
    extensions.wrapfunction(verify.verifier, '__init__',
                            verifierinit)

def reposetup(ui, repo):
    if isinstance(repo, sqliterepository):
        repo._db = None

    # TODO check for bundlerepository?