comparison hgext/sqlitestore.py @ 40326:fed697fa1734

sqlitestore: file storage backend using SQLite

This commit provides an extension which uses SQLite to store file data (as opposed to revlogs).

As the inline documentation describes, there are still several aspects to the extension that are incomplete. But it's a start. The extension does support basic clone, checkout, and commit workflows, which makes it suitable for simple use cases.

One notable missing feature is support for "bundlerepos." This is probably responsible for most of the test failures when the extension is activated as part of the test suite.

All revision data is stored in SQLite. Data is stored as zstd compressed chunks (the default if zstd is available), zlib compressed chunks (the default if zstd is not available), or raw chunks (if configured, or if a compressed delta is not smaller than the raw delta). This makes things very similar to revlogs. Unlike revlogs, the extension doesn't yet enforce a limit on delta chain length. This is an obvious limitation and should be addressed. It is somewhat mitigated by the use of zstd, which is much faster than zlib to decompress.

There is a dedicated table for storing deltas. Deltas are stored by the SHA-1 hash of their uncompressed content. The "fileindex" table has columns that reference the delta for each revision and the base delta that delta should be applied against. A recursive SQL query is used to resolve the delta chain along with the delta data.

By storing deltas by hash, we are able to de-duplicate delta storage! With revlogs, the same delta appearing in different revlogs would result in duplicate storage of that delta. In this scheme, inserting a duplicate delta is a no-op and delta chains simply reference the existing delta.

When initially implementing this extension, I did not have content-indexed deltas and deltas could be duplicated across files (just like revlogs). When I implemented content-indexed deltas, the size of the SQLite database for a full clone of mozilla-unified dropped:

  before: 2,554,261,504 bytes
  after:  2,488,754,176 bytes

Surprisingly, this is still larger than the byte size of the revlog files:

  revlog files: 2,104,861,230 bytes
  du -b:        2,254,381,614

I would have expected storage to be smaller since we're not limiting delta chain length and since we're using zstd instead of zlib. I suspect the SQLite indexes and per-column overhead account for the bulk of the difference. (Keep in mind that revlog uses a 64-byte packed struct for revision index data and deltas are stored without padding. Aside from the 12 unused bytes in the 32-byte node field, revlogs are pretty efficient.)

Another source of overhead is file name storage. With revlogs, file names are stored in the filesystem. But with SQLite, we need to store file names in the database. This is roughly equivalent to the size of the fncache file, which for the mozilla-unified repository is ~34MB.

Since the SQLite database isn't append-only and since delta chains can reference any delta, this opens some interesting possibilities. For example, we could store deltas in reverse, such that fulltexts are stored for newer revisions and deltas are applied to reconstruct older revisions. This is likely a better storage strategy for version control, as new data tends to be accessed more frequently than old data. We would obviously need wire protocol support for transferring revision data from newest to oldest. And we would probably need some kind of mechanism for "re-encoding" stores. But it should be doable.

This extension is very much experimental quality. There are a handful of features that don't work. It probably isn't suitable for day-to-day use. But it could be used in limited cases (e.g. read-only checkouts like in CI). And it is also a good proving ground for alternate storage backends. As we continue to define interfaces for all things storage, it will be useful to have a viable alternate storage backend to see how things shake out in practice.

test-storage.py passes on Python 2 and introduces no new test failures on Python 3. Having the storage-level unit tests has proved insanely useful when developing this extension. Those tests caught numerous bugs during development, and I'm convinced this style of testing is the way forward for ensuring alternate storage backends work as intended. Of course, test coverage isn't close to what it needs to be. But it is a start, and the coverage we have gives me confidence that basic store functionality is implemented properly.

Differential Revision: https://phab.mercurial-scm.org/D4928
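
A minimal sketch of the insert-or-reuse pattern described above. It is not part of the extension: it mirrors the delta table and the insertdelta() helper defined later in this file, with the compression column omitted for brevity.

    import hashlib
    import sqlite3

    # Deltas are keyed by the SHA-1 of their uncompressed content, so
    # storing the same delta twice reuses the existing row.
    db = sqlite3.connect(':memory:')
    db.execute('CREATE TABLE delta ('
               ' id INTEGER PRIMARY KEY,'
               ' hash BLOB UNIQUE ON CONFLICT ABORT,'
               ' delta BLOB NOT NULL)')

    def storedelta(db, delta):
        deltahash = hashlib.sha1(delta).digest()
        try:
            return db.execute(
                'INSERT INTO delta (hash, delta) VALUES (?, ?)',
                (deltahash, delta)).lastrowid
        except sqlite3.IntegrityError:
            # Same content already stored; reference the existing row.
            return db.execute('SELECT id FROM delta WHERE hash=?',
                              (deltahash,)).fetchone()[0]

    assert storedelta(db, b'same delta') == storedelta(db, b'same delta')
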
author Gregory Szorc <gregory.szorc@gmail.com>
date Tue, 09 Oct 2018 08:50:13 -0700
parents
children f1a39128da95
1 # sqlitestore.py - Storage backend that uses SQLite
2 #
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 #
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
7
8 """store repository data in SQLite (EXPERIMENTAL)
9
10 The sqlitestore extension enables the storage of repository data in SQLite.
11
12 This extension is HIGHLY EXPERIMENTAL. There are NO BACKWARDS COMPATIBILITY
13 GUARANTEES. This means that repositories created with this extension may
14 only be usable with the exact version of this extension/Mercurial that was
15 used. The extension attempts to enforce this in order to prevent repository
16 corruption.
17
18 In addition, several features are not yet supported or have known bugs:
19
20 * Only some data is stored in SQLite. Changeset, manifest, and other repository
21 data is not yet stored in SQLite.
22 * Transactions are not robust. If the process is aborted at the right time
23 during transaction close/rollback, the repository could be in an inconsistent
24 state. This problem will diminish once all repository data is tracked by
25 SQLite.
26 * Bundle repositories do not work (the ability to use e.g.
27 `hg -R <bundle-file> log` to automatically overlay a bundle on top of the
28 existing repository).
29 * Various other features don't work.
30
31 This extension should work for basic clone/pull, update, and commit workflows.
32 Some history rewriting operations may fail due to lack of support for bundle
33 repositories.
34
35 To use, activate the extension and set the ``storage.new-repo-backend`` config
36 option to ``sqlite`` to enable new repositories to use SQLite for storage.
37 """
38
39 # To run the test suite with repos using SQLite by default, execute the
40 # following:
41 #
42 # HGREPOFEATURES="sqlitestore" run-tests.py \
43 # --extra-config-opt extensions.sqlitestore= \
44 # --extra-config-opt storage.new-repo-backend=sqlite
45
46 from __future__ import absolute_import
47
48 import hashlib
49 import sqlite3
50 import struct
51 import threading
52 import zlib
53
54 from mercurial.i18n import _
55 from mercurial.node import (
56 nullid,
57 nullrev,
58 short,
59 )
60 from mercurial.thirdparty import (
61 attr,
62 )
63 from mercurial import (
64 ancestor,
65 dagop,
66 error,
67 extensions,
68 localrepo,
69 mdiff,
70 pycompat,
71 registrar,
72 repository,
73 util,
74 verify,
75 )
76 from mercurial.utils import (
77 interfaceutil,
78 storageutil,
79 )
80
81 try:
82 from mercurial import zstd
83 zstd.__version__
84 except ImportError:
85 zstd = None
86
87 configtable = {}
88 configitem = registrar.configitem(configtable)
89
90 # experimental config: storage.sqlite.compression
91 configitem('storage', 'sqlite.compression',
92 default='zstd' if zstd else 'zlib')
93
94 # Note for extension authors: ONLY specify testedwith = 'ships-with-hg-core' for
95 # extensions which SHIP WITH MERCURIAL. Non-mainline extensions should
96 # be specifying the version(s) of Mercurial they are tested with, or
97 # leave the attribute unspecified.
98 testedwith = 'ships-with-hg-core'
99
100 REQUIREMENT = b'exp-sqlite-001'
101 REQUIREMENT_ZSTD = b'exp-sqlite-comp-001=zstd'
102 REQUIREMENT_ZLIB = b'exp-sqlite-comp-001=zlib'
103 REQUIREMENT_NONE = b'exp-sqlite-comp-001=none'
104
105 CURRENT_SCHEMA_VERSION = 1
106
107 COMPRESSION_NONE = 1
108 COMPRESSION_ZSTD = 2
109 COMPRESSION_ZLIB = 3
110
111 FLAG_CENSORED = 1
112
113 CREATE_SCHEMA = [
114 # Deltas are stored as content-indexed blobs.
115 # compression column holds COMPRESSION_* constant for how the
116 # delta is encoded.
117
118 r'CREATE TABLE delta ('
119 r' id INTEGER PRIMARY KEY, '
120 r' compression INTEGER NOT NULL, '
121 r' hash BLOB UNIQUE ON CONFLICT ABORT, '
122 r' delta BLOB NOT NULL '
123 r')',
124
125 # Tracked paths are denormalized to integers to avoid redundant
126 # storage of the path name.
127 r'CREATE TABLE filepath ('
128 r' id INTEGER PRIMARY KEY, '
129 r' path BLOB NOT NULL '
130 r')',
131
132 r'CREATE UNIQUE INDEX filepath_path '
133 r' ON filepath (path)',
134
135 # We have a single table for all file revision data.
136 # Each file revision is uniquely described by a (path, rev) and
137 # (path, node).
138 #
139 # Revision data is stored as a pointer to the delta producing this
140 # revision and the file revision whose delta should be applied before
141 # that one. One can reconstruct the delta chain by recursively following
142 # the delta base revision pointers until one encounters NULL.
143 #
144 # flags column holds bitwise integer flags controlling storage options.
145 # These flags are defined by the FLAG_* constants.
146 r'CREATE TABLE fileindex ('
147 r' id INTEGER PRIMARY KEY, '
148 r' pathid INTEGER REFERENCES filepath(id), '
149 r' revnum INTEGER NOT NULL, '
150 r' p1rev INTEGER NOT NULL, '
151 r' p2rev INTEGER NOT NULL, '
152 r' linkrev INTEGER NOT NULL, '
153 r' flags INTEGER NOT NULL, '
154 r' deltaid INTEGER REFERENCES delta(id), '
155 r' deltabaseid INTEGER REFERENCES fileindex(id), '
156 r' node BLOB NOT NULL '
157 r')',
158
159 r'CREATE UNIQUE INDEX fileindex_pathrevnum '
160 r' ON fileindex (pathid, revnum)',
161
162 r'CREATE UNIQUE INDEX fileindex_pathnode '
163 r' ON fileindex (pathid, node)',
164
165 # Provide a view over all file data for convenience.
166 r'CREATE VIEW filedata AS '
167 r'SELECT '
168 r' fileindex.id AS id, '
169 r' filepath.id AS pathid, '
170 r' filepath.path AS path, '
171 r' fileindex.revnum AS revnum, '
172 r' fileindex.node AS node, '
173 r' fileindex.p1rev AS p1rev, '
174 r' fileindex.p2rev AS p2rev, '
175 r' fileindex.linkrev AS linkrev, '
176 r' fileindex.flags AS flags, '
177 r' fileindex.deltaid AS deltaid, '
178 r' fileindex.deltabaseid AS deltabaseid '
179 r'FROM filepath, fileindex '
180 r'WHERE fileindex.pathid=filepath.id',
181
182 r'PRAGMA user_version=%d' % CURRENT_SCHEMA_VERSION,
183 ]
184
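# Illustration (hypothetical rows) of how a delta chain is encoded by the
# schema above:
#
#   fileindex: id=1 node=A deltaid=10 deltabaseid=NULL  (delta 10 is a fulltext)
#              id=2 node=B deltaid=11 deltabaseid=1     (delta against A)
#              id=3 node=C deltaid=12 deltabaseid=2     (delta against B)
#
# Reconstructing C follows the deltabaseid pointers 3 -> 2 -> 1 -> NULL,
# collects deltas 12 and 11 plus the base fulltext in delta 10, and applies
# them oldest first with mdiff.patches(). resolvedeltachain() below performs
# this walk in a single recursive SQL query.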
185 def resolvedeltachain(db, pathid, node, revisioncache,
186 stoprids, zstddctx=None):
187 """Resolve a delta chain for a file node."""
188
189 # TODO the "not in ({stops})" here is possibly slowing down the query
190 # because it needs to perform the lookup on every recursive invocation.
191 # This could possibly be faster if we created a temporary query with
192 # baseid "poisoned" to null and limited the recursive filter to
193 # "is not null".
194 res = db.execute(
195 r'WITH RECURSIVE '
196 r' deltachain(deltaid, baseid) AS ('
197 r' SELECT deltaid, deltabaseid FROM fileindex '
198 r' WHERE pathid=? AND node=? '
199 r' UNION ALL '
200 r' SELECT fileindex.deltaid, deltabaseid '
201 r' FROM fileindex, deltachain '
202 r' WHERE '
203 r' fileindex.id=deltachain.baseid '
204 r' AND deltachain.baseid IS NOT NULL '
205 r' AND fileindex.id NOT IN ({stops}) '
206 r' ) '
207 r'SELECT deltachain.baseid, compression, delta '
208 r'FROM deltachain, delta '
209 r'WHERE delta.id=deltachain.deltaid'.format(
210 stops=r','.join([r'?'] * len(stoprids))),
211 tuple([pathid, node] + list(stoprids.keys())))
212
213 deltas = []
214 lastdeltabaseid = None
215
216 for deltabaseid, compression, delta in res:
217 lastdeltabaseid = deltabaseid
218
219 if compression == COMPRESSION_ZSTD:
220 delta = zstddctx.decompress(delta)
221 elif compression == COMPRESSION_NONE:
222 delta = delta
223 elif compression == COMPRESSION_ZLIB:
224 delta = zlib.decompress(delta)
225 else:
226 raise SQLiteStoreError('unhandled compression type: %d' %
227 compression)
228
229 deltas.append(delta)
230
231 if lastdeltabaseid in stoprids:
232 basetext = revisioncache[stoprids[lastdeltabaseid]]
233 else:
234 basetext = deltas.pop()
235
236 deltas.reverse()
237 fulltext = mdiff.patches(basetext, deltas)
238
239 # SQLite returns buffer instances for blob columns on Python 2. This
240 # type can propagate through the delta application layer. Because
241 # downstream callers assume revisions are bytes, cast as needed.
242 if not isinstance(fulltext, bytes):
243 fulltext = bytes(fulltext)
244
245 return fulltext
246
247 def insertdelta(db, compression, hash, delta):
248 try:
249 return db.execute(
250 r'INSERT INTO delta (compression, hash, delta) '
251 r'VALUES (?, ?, ?)',
252 (compression, hash, delta)).lastrowid
253 except sqlite3.IntegrityError:
254 return db.execute(
255 r'SELECT id FROM delta WHERE hash=?',
256 (hash,)).fetchone()[0]
257
258 class SQLiteStoreError(error.StorageError):
259 pass
260
261 @attr.s
262 class revisionentry(object):
263 rid = attr.ib()
264 rev = attr.ib()
265 node = attr.ib()
266 p1rev = attr.ib()
267 p2rev = attr.ib()
268 p1node = attr.ib()
269 p2node = attr.ib()
270 linkrev = attr.ib()
271 flags = attr.ib()
272
273 @interfaceutil.implementer(repository.irevisiondelta)
274 @attr.s(slots=True)
275 class sqliterevisiondelta(object):
276 node = attr.ib()
277 p1node = attr.ib()
278 p2node = attr.ib()
279 basenode = attr.ib()
280 flags = attr.ib()
281 baserevisionsize = attr.ib()
282 revision = attr.ib()
283 delta = attr.ib()
284 linknode = attr.ib(default=None)
285
286 @interfaceutil.implementer(repository.iverifyproblem)
287 @attr.s(frozen=True)
288 class sqliteproblem(object):
289 warning = attr.ib(default=None)
290 error = attr.ib(default=None)
291 node = attr.ib(default=None)
292
293 @interfaceutil.implementer(repository.ifilestorage)
294 class sqlitefilestore(object):
295 """Implements storage for an individual tracked path."""
296
297 def __init__(self, db, path, compression):
298 self._db = db
299 self._path = path
300
301 self._pathid = None
302
303 # revnum -> node
304 self._revtonode = {}
305 # node -> revnum
306 self._nodetorev = {}
307 # node -> data structure
308 self._revisions = {}
309
310 self._revisioncache = util.lrucachedict(10)
311
312 self._compengine = compression
313
314 if compression == 'zstd':
315 self._cctx = zstd.ZstdCompressor(level=3)
316 self._dctx = zstd.ZstdDecompressor()
317 else:
318 self._cctx = None
319 self._dctx = None
320
321 self._refreshindex()
322
323 def _refreshindex(self):
324 self._revtonode = {}
325 self._nodetorev = {}
326 self._revisions = {}
327
328 res = list(self._db.execute(
329 r'SELECT id FROM filepath WHERE path=?', (self._path,)))
330
331 if not res:
332 self._pathid = None
333 return
334
335 self._pathid = res[0][0]
336
337 res = self._db.execute(
338 r'SELECT id, revnum, node, p1rev, p2rev, linkrev, flags '
339 r'FROM fileindex '
340 r'WHERE pathid=? '
341 r'ORDER BY revnum ASC',
342 (self._pathid,))
343
344 for i, row in enumerate(res):
345 rid, rev, node, p1rev, p2rev, linkrev, flags = row
346
347 if i != rev:
348 raise SQLiteStoreError(_('sqlite database has inconsistent '
349 'revision numbers'))
350
351 if p1rev == nullrev:
352 p1node = nullid
353 else:
354 p1node = self._revtonode[p1rev]
355
356 if p2rev == nullrev:
357 p2node = nullid
358 else:
359 p2node = self._revtonode[p2rev]
360
361 entry = revisionentry(
362 rid=rid,
363 rev=rev,
364 node=node,
365 p1rev=p1rev,
366 p2rev=p2rev,
367 p1node=p1node,
368 p2node=p2node,
369 linkrev=linkrev,
370 flags=flags)
371
372 self._revtonode[rev] = node
373 self._nodetorev[node] = rev
374 self._revisions[node] = entry
375
376 # Start of ifileindex interface.
377
378 def __len__(self):
379 return len(self._revisions)
380
381 def __iter__(self):
382 return iter(pycompat.xrange(len(self._revisions)))
383
384 def revs(self, start=0, stop=None):
385 return storageutil.iterrevs(len(self._revisions), start=start,
386 stop=stop)
387
388 def parents(self, node):
389 if node == nullid:
390 return nullid, nullid
391
392 if node not in self._revisions:
393 raise error.LookupError(node, self._path, _('no node'))
394
395 entry = self._revisions[node]
396 return entry.p1node, entry.p2node
397
398 def parentrevs(self, rev):
399 if rev == nullrev:
400 return nullrev, nullrev
401
402 if rev not in self._revtonode:
403 raise IndexError(rev)
404
405 entry = self._revisions[self._revtonode[rev]]
406 return entry.p1rev, entry.p2rev
407
408 def rev(self, node):
409 if node == nullid:
410 return nullrev
411
412 if node not in self._nodetorev:
413 raise error.LookupError(node, self._path, _('no node'))
414
415 return self._nodetorev[node]
416
417 def node(self, rev):
418 if rev == nullrev:
419 return nullid
420
421 if rev not in self._revtonode:
422 raise IndexError(rev)
423
424 return self._revtonode[rev]
425
426 def lookup(self, node):
427 return storageutil.fileidlookup(self, node, self._path)
428
429 def linkrev(self, rev):
430 if rev == nullrev:
431 return nullrev
432
433 if rev not in self._revtonode:
434 raise IndexError(rev)
435
436 entry = self._revisions[self._revtonode[rev]]
437 return entry.linkrev
438
439 def iscensored(self, rev):
440 if rev == nullrev:
441 return False
442
443 if rev not in self._revtonode:
444 raise IndexError(rev)
445
446 return self._revisions[self._revtonode[rev]].flags & FLAG_CENSORED
447
448 def commonancestorsheads(self, node1, node2):
449 rev1 = self.rev(node1)
450 rev2 = self.rev(node2)
451
452 ancestors = ancestor.commonancestorsheads(self.parentrevs, rev1, rev2)
453 return pycompat.maplist(self.node, ancestors)
454
455 def descendants(self, revs):
456 # TODO we could implement this using a recursive SQL query, which
457 # might be faster.
458 return dagop.descendantrevs(revs, self.revs, self.parentrevs)
459
460 def heads(self, start=None, stop=None):
461 if start is None and stop is None:
462 if not len(self):
463 return [nullid]
464
465 startrev = self.rev(start) if start is not None else nullrev
466 stoprevs = {self.rev(n) for n in stop or []}
467
468 revs = dagop.headrevssubset(self.revs, self.parentrevs,
469 startrev=startrev, stoprevs=stoprevs)
470
471 return [self.node(rev) for rev in revs]
472
473 def children(self, node):
474 rev = self.rev(node)
475
476 res = self._db.execute(
477 r'SELECT'
478 r' node '
479 r' FROM filedata '
480 r' WHERE path=? AND (p1rev=? OR p2rev=?) '
481 r' ORDER BY revnum ASC',
482 (self._path, rev, rev))
483
484 return [row[0] for row in res]
485
486 # End of ifileindex interface.
487
488 # Start of ifiledata interface.
489
490 def size(self, rev):
491 if rev == nullrev:
492 return 0
493
494 if rev not in self._revtonode:
495 raise IndexError(rev)
496
497 node = self._revtonode[rev]
498
499 if self.renamed(node):
500 return len(self.read(node))
501
502 return len(self.revision(node))
503
504 def revision(self, node, raw=False, _verifyhash=True):
505 if node in (nullid, nullrev):
506 return b''
507
508 if isinstance(node, int):
509 node = self.node(node)
510
511 if node not in self._nodetorev:
512 raise error.LookupError(node, self._path, _('no node'))
513
514 if node in self._revisioncache:
515 return self._revisioncache[node]
516
517 # Because we have a fulltext revision cache, we are able to
518 # short-circuit delta chain traversal and decompression as soon as
519 # we encounter a revision in the cache.
520
521 stoprids = {self._revisions[n].rid: n
522 for n in self._revisioncache}
523
524 if not stoprids:
525 stoprids[-1] = None
526
527 fulltext = resolvedeltachain(self._db, self._pathid, node,
528 self._revisioncache, stoprids,
529 zstddctx=self._dctx)
530
531 if _verifyhash:
532 self._checkhash(fulltext, node)
533 self._revisioncache[node] = fulltext
534
535 return fulltext
536
537 def read(self, node):
538 return storageutil.filtermetadata(self.revision(node))
539
540 def renamed(self, node):
541 return storageutil.filerevisioncopied(self, node)
542
543 def cmp(self, node, fulltext):
544 return not storageutil.filedataequivalent(self, node, fulltext)
545
546 def emitrevisions(self, nodes, nodesorder=None, revisiondata=False,
547 assumehaveparentrevisions=False, deltaprevious=False):
548 if nodesorder not in ('nodes', 'storage', None):
549 raise error.ProgrammingError('unhandled value for nodesorder: %s' %
550 nodesorder)
551
552 nodes = [n for n in nodes if n != nullid]
553
554 if not nodes:
555 return
556
557 # TODO perform in a single query.
558 res = self._db.execute(
559 r'SELECT revnum, deltaid FROM fileindex '
560 r'WHERE pathid=? '
561 r' AND node in (%s)' % (r','.join([r'?'] * len(nodes))),
562 tuple([self._pathid] + nodes))
563
564 deltabases = {}
565
566 for rev, deltaid in res:
567 res = self._db.execute(
568 r'SELECT revnum from fileindex WHERE pathid=? AND deltaid=?',
569 (self._pathid, deltaid))
570 deltabases[rev] = res.fetchone()[0]
571
572 # TODO define revdifffn so we can use delta from storage.
573 for delta in storageutil.emitrevisions(
574 self, nodes, nodesorder, sqliterevisiondelta,
575 deltaparentfn=deltabases.__getitem__,
576 revisiondata=revisiondata,
577 assumehaveparentrevisions=assumehaveparentrevisions,
578 deltaprevious=deltaprevious):
579
580 yield delta
581
582 # End of ifiledata interface.
583
584 # Start of ifilemutation interface.
585
586 def add(self, filedata, meta, transaction, linkrev, p1, p2):
587 if meta or filedata.startswith(b'\x01\n'):
588 filedata = storageutil.packmeta(meta, filedata)
589
590 return self.addrevision(filedata, transaction, linkrev, p1, p2)
591
592 def addrevision(self, revisiondata, transaction, linkrev, p1, p2, node=None,
593 flags=0, cachedelta=None):
594 if flags:
595 raise SQLiteStoreError(_('flags not supported on revisions'))
596
597 validatehash = node is not None
598 node = node or storageutil.hashrevisionsha1(revisiondata, p1, p2)
599
600 if validatehash:
601 self._checkhash(revisiondata, node, p1, p2)
602
603 if node in self._nodetorev:
604 return node
605
606 node = self._addrawrevision(node, revisiondata, transaction, linkrev,
607 p1, p2)
608
609 self._revisioncache[node] = revisiondata
610 return node
611
612 def addgroup(self, deltas, linkmapper, transaction, addrevisioncb=None):
613 nodes = []
614
615 for node, p1, p2, linknode, deltabase, delta, wireflags in deltas:
616 storeflags = 0
617
618 if wireflags & repository.REVISION_FLAG_CENSORED:
619 storeflags |= FLAG_CENSORED
620
621 if wireflags & ~repository.REVISION_FLAG_CENSORED:
622 raise SQLiteStoreError('unhandled revision flag')
623
624 baserev = self.rev(deltabase)
625
626 # If base is censored, delta must be full replacement in a single
627 # patch operation.
628 if baserev != nullrev and self.iscensored(baserev):
629 hlen = struct.calcsize('>lll')
630 oldlen = len(self.revision(deltabase, raw=True,
631 _verifyhash=False))
632 newlen = len(delta) - hlen
633
634 if delta[:hlen] != mdiff.replacediffheader(oldlen, newlen):
635 raise error.CensoredBaseError(self._path,
636 deltabase)
637
638 if (not (storeflags & FLAG_CENSORED)
639 and storageutil.deltaiscensored(
640 delta, baserev, lambda x: len(self.revision(x, raw=True)))):
641 storeflags |= FLAG_CENSORED
642
643 linkrev = linkmapper(linknode)
644
645 nodes.append(node)
646
647 if node in self._revisions:
648 continue
649
650 if deltabase == nullid:
651 text = mdiff.patch(b'', delta)
652 storedelta = None
653 else:
654 text = None
655 storedelta = (deltabase, delta)
656
657 self._addrawrevision(node, text, transaction, linkrev, p1, p2,
658 storedelta=storedelta, flags=storeflags)
659
660 if addrevisioncb:
661 addrevisioncb(self, node)
662
663 return nodes
664
665 def censorrevision(self, tr, censornode, tombstone=b''):
666 tombstone = storageutil.packmeta({b'censored': tombstone}, b'')
667
668 # This restriction is cargo culted from revlogs and makes no sense for
669 # SQLite, since columns can be resized at will.
670 if len(tombstone) > len(self.revision(censornode, raw=True)):
671 raise error.Abort(_('censor tombstone must be no longer than '
672 'censored data'))
673
674 # We need to replace the censored revision's data with the tombstone.
675 # But replacing that data will have implications for delta chains that
676 # reference it.
677 #
678 # While "better," more complex strategies are possible, we do something
679 # simple: we find delta chain children of the censored revision and we
680 # replace those incremental deltas with fulltexts of their corresponding
681 # revision. Then we delete the now-unreferenced delta and original
682 # revision and insert a replacement.
683
684 # Find the delta to be censored.
685 censoreddeltaid = self._db.execute(
686 r'SELECT deltaid FROM fileindex WHERE id=?',
687 (self._revisions[censornode].rid,)).fetchone()[0]
688
689 # Find all its delta chain children.
690 # TODO once we support storing deltas for !files, we'll need to look
691 # for those delta chains too.
692 rows = list(self._db.execute(
693 r'SELECT id, pathid, node FROM fileindex '
694 r'WHERE deltabaseid=? OR deltaid=?',
695 (censoreddeltaid, censoreddeltaid)))
696
697 for row in rows:
698 rid, pathid, node = row
699
700 fulltext = resolvedeltachain(self._db, pathid, node, {}, {-1: None},
701 zstddctx=self._dctx)
702
703 deltahash = hashlib.sha1(fulltext).digest()
704
705 if self._compengine == 'zstd':
706 deltablob = self._cctx.compress(fulltext)
707 compression = COMPRESSION_ZSTD
708 elif self._compengine == 'zlib':
709 deltablob = zlib.compress(fulltext)
710 compression = COMPRESSION_ZLIB
711 elif self._compengine == 'none':
712 deltablob = fulltext
713 compression = COMPRESSION_NONE
714 else:
715 raise error.ProgrammingError('unhandled compression engine: %s'
716 % self._compengine)
717
718 if len(deltablob) >= len(fulltext):
719 deltablob = fulltext
720 compression = COMPRESSION_NONE
721
722 deltaid = insertdelta(self._db, compression, deltahash, deltablob)
723
724 self._db.execute(
725 r'UPDATE fileindex SET deltaid=?, deltabaseid=NULL '
726 r'WHERE id=?', (deltaid, rid))
727
728 # Now create the tombstone delta and replace the delta on the censored
729 # node.
730 deltahash = hashlib.sha1(tombstone).digest()
731 tombstonedeltaid = insertdelta(self._db, COMPRESSION_NONE,
732 deltahash, tombstone)
733
734 flags = self._revisions[censornode].flags
735 flags |= FLAG_CENSORED
736
737 self._db.execute(
738 r'UPDATE fileindex SET flags=?, deltaid=?, deltabaseid=NULL '
739 r'WHERE pathid=? AND node=?',
740 (flags, tombstonedeltaid, self._pathid, censornode))
741
742 self._db.execute(
743 r'DELETE FROM delta WHERE id=?', (censoreddeltaid,))
744
745 self._refreshindex()
746 self._revisioncache.clear()
747
748 def getstrippoint(self, minlink):
749 return storageutil.resolvestripinfo(minlink, len(self) - 1,
750 [self.rev(n) for n in self.heads()],
751 self.linkrev,
752 self.parentrevs)
753
754 def strip(self, minlink, transaction):
755 if not len(self):
756 return
757
758 rev, _ignored = self.getstrippoint(minlink)
759
760 if rev == len(self):
761 return
762
763 for rev in self.revs(rev):
764 self._db.execute(
765 r'DELETE FROM fileindex WHERE pathid=? AND node=?',
766 (self._pathid, self.node(rev)))
767
768 # TODO how should we garbage collect data in delta table?
769
770 self._refreshindex()
771
772 # End of ifilemutation interface.
773
774 # Start of ifilestorage interface.
775
776 def files(self):
777 return []
778
779 def storageinfo(self, exclusivefiles=False, sharedfiles=False,
780 revisionscount=False, trackedsize=False,
781 storedsize=False):
782 d = {}
783
784 if exclusivefiles:
785 d['exclusivefiles'] = []
786
787 if sharedfiles:
788 # TODO list sqlite file(s) here.
789 d['sharedfiles'] = []
790
791 if revisionscount:
792 d['revisionscount'] = len(self)
793
794 if trackedsize:
795 d['trackedsize'] = sum(len(self.revision(node))
796 for node in self._nodetorev)
797
798 if storedsize:
799 # TODO implement this?
800 d['storedsize'] = None
801
802 return d
803
804 def verifyintegrity(self, state):
805 state['skipread'] = set()
806
807 for rev in self:
808 node = self.node(rev)
809
810 try:
811 self.revision(node)
812 except Exception as e:
813 yield sqliteproblem(
814 error=_('unpacking %s: %s') % (short(node), e),
815 node=node)
816
817 state['skipread'].add(node)
818
819 # End of ifilestorage interface.
820
821 def _checkhash(self, fulltext, node, p1=None, p2=None):
822 if p1 is None and p2 is None:
823 p1, p2 = self.parents(node)
824
825 if node == storageutil.hashrevisionsha1(fulltext, p1, p2):
826 return
827
828 try:
829 del self._revisioncache[node]
830 except KeyError:
831 pass
832
833 if storageutil.iscensoredtext(fulltext):
834 raise error.CensoredNodeError(self._path, node, fulltext)
835
836 raise SQLiteStoreError(_('integrity check failed on %s') %
837 self._path)
838
839 def _addrawrevision(self, node, revisiondata, transaction, linkrev,
840 p1, p2, storedelta=None, flags=0):
841 if self._pathid is None:
842 res = self._db.execute(
843 r'INSERT INTO filepath (path) VALUES (?)', (self._path,))
844 self._pathid = res.lastrowid
845
846 # For simplicity, always store a delta against p1.
847 # TODO we need a lot more logic here to make behavior reasonable.
848
849 if storedelta:
850 deltabase, delta = storedelta
851
852 if isinstance(deltabase, int):
853 deltabase = self.node(deltabase)
854
855 else:
856 assert revisiondata is not None
857 deltabase = p1
858
859 if deltabase == nullid:
860 delta = revisiondata
861 else:
862 delta = mdiff.textdiff(self.revision(self.rev(deltabase)),
863 revisiondata)
864
865 # File index stores a pointer to its delta and the parent delta.
866 # The parent delta is stored via a pointer to the fileindex PK.
867 if deltabase == nullid:
868 baseid = None
869 else:
870 baseid = self._revisions[deltabase].rid
871
872 # Deltas are stored with a hash of their content. This allows
873 # us to de-duplicate. The hash column is UNIQUE, so it is faster to just
874 # insert and let insertdelta() return the existing row's id on conflict
875 # than to look up the hash first.
876 deltahash = hashlib.sha1(delta).digest()
877
878 if self._compengine == 'zstd':
879 deltablob = self._cctx.compress(delta)
880 compression = COMPRESSION_ZSTD
881 elif self._compengine == 'zlib':
882 deltablob = zlib.compress(delta)
883 compression = COMPRESSION_ZLIB
884 elif self._compengine == 'none':
885 deltablob = delta
886 compression = COMPRESSION_NONE
887 else:
888 raise error.ProgrammingError('unhandled compression engine: %s' %
889 self._compengine)
890
891 # Don't store compressed data if it isn't practical.
892 if len(deltablob) >= len(delta):
893 deltablob = delta
894 compression = COMPRESSION_NONE
895
896 deltaid = insertdelta(self._db, compression, deltahash, deltablob)
897
898 rev = len(self)
899
900 if p1 == nullid:
901 p1rev = nullrev
902 else:
903 p1rev = self._nodetorev[p1]
904
905 if p2 == nullid:
906 p2rev = nullrev
907 else:
908 p2rev = self._nodetorev[p2]
909
910 rid = self._db.execute(
911 r'INSERT INTO fileindex ('
912 r' pathid, revnum, node, p1rev, p2rev, linkrev, flags, '
913 r' deltaid, deltabaseid) '
914 r' VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)',
915 (self._pathid, rev, node, p1rev, p2rev, linkrev, flags,
916 deltaid, baseid)
917 ).lastrowid
918
919 entry = revisionentry(
920 rid=rid,
921 rev=rev,
922 node=node,
923 p1rev=p1rev,
924 p2rev=p2rev,
925 p1node=p1,
926 p2node=p2,
927 linkrev=linkrev,
928 flags=flags)
929
930 self._nodetorev[node] = rev
931 self._revtonode[rev] = node
932 self._revisions[node] = entry
933
934 return node
935
936 class sqliterepository(localrepo.localrepository):
937 def cancopy(self):
938 return False
939
940 def transaction(self, *args, **kwargs):
941 current = self.currenttransaction()
942
943 tr = super(sqliterepository, self).transaction(*args, **kwargs)
944
945 if current:
946 return tr
947
948 self._dbconn.execute(r'BEGIN TRANSACTION')
949
950 def committransaction(_):
951 self._dbconn.commit()
952
953 tr.addfinalize('sqlitestore', committransaction)
954
955 return tr
956
957 @property
958 def _dbconn(self):
959 # SQLite connections can only be used on the thread that created
960 # them. In most cases, this "just works." However, hgweb uses
961 # multiple threads.
962 tid = threading.current_thread().ident
963
964 if self._db:
965 if self._db[0] == tid:
966 return self._db[1]
967
968 db = makedb(self.svfs.join('db.sqlite'))
969 self._db = (tid, db)
970
971 return db
972
973 def makedb(path):
974 """Construct a database handle for a database at path."""
975
976 db = sqlite3.connect(path)
977 db.text_factory = bytes
978
979 res = db.execute(r'PRAGMA user_version').fetchone()[0]
980
981 # New database.
982 if res == 0:
983 for statement in CREATE_SCHEMA:
984 db.execute(statement)
985
986 db.commit()
987
988 elif res == CURRENT_SCHEMA_VERSION:
989 pass
990
991 else:
992 raise error.Abort(_('sqlite database has unrecognized version'))
993
994 db.execute(r'PRAGMA journal_mode=WAL')
995
996 return db
997
998 def featuresetup(ui, supported):
999 supported.add(REQUIREMENT)
1000
1001 if zstd:
1002 supported.add(REQUIREMENT_ZSTD)
1003
1004 supported.add(REQUIREMENT_ZLIB)
1005 supported.add(REQUIREMENT_NONE)
1006
1007 def newreporequirements(orig, ui, createopts):
1008 if createopts['backend'] != 'sqlite':
1009 return orig(ui, createopts)
1010
1011 # This restriction can be lifted once we have more confidence.
1012 if 'sharedrepo' in createopts:
1013 raise error.Abort(_('shared repositories not supported with SQLite '
1014 'store'))
1015
1016 # This filtering is out of an abundance of caution: we want to ensure
1017 # we honor creation options and we do that by enumerating exactly the
1018 # creation options we recognize.
1019 known = {
1020 'narrowfiles',
1021 'backend',
1022 }
1023
1024 unsupported = set(createopts) - known
1025 if unsupported:
1026 raise error.Abort(_('SQLite store does not support repo creation '
1027 'option: %s') % ', '.join(sorted(unsupported)))
1028
1029 # Since we're a hybrid store that still relies on revlogs, we fall back
1030 # to using the revlogv1 backend's storage requirements then adding our
1031 # own requirement.
1032 createopts['backend'] = 'revlogv1'
1033 requirements = orig(ui, createopts)
1034 requirements.add(REQUIREMENT)
1035
1036 compression = ui.config('storage', 'sqlite.compression')
1037
1038 if compression == 'zstd' and not zstd:
1039 raise error.Abort(_('storage.sqlite.compression set to "zstd" but '
1040 'zstandard compression not available to this '
1041 'Mercurial install'))
1042
1043 if compression == 'zstd':
1044 requirements.add(REQUIREMENT_ZSTD)
1045 elif compression == 'zlib':
1046 requirements.add(REQUIREMENT_ZLIB)
1047 elif compression == 'none':
1048 requirements.add(REQUIREMENT_NONE)
1049 else:
1050 raise error.Abort(_('unknown compression engine defined in '
1051 'storage.sqlite.compression: %s') % compression)
1052
1053 return requirements
1054
1055 @interfaceutil.implementer(repository.ilocalrepositoryfilestorage)
1056 class sqlitefilestorage(object):
1057 """Repository file storage backed by SQLite."""
1058 def file(self, path):
1059 if path.startswith(b'/'):
1060 path = path[1:]
1061
1062 if REQUIREMENT_ZSTD in self.requirements:
1063 compression = 'zstd'
1064 elif REQUIREMENT_ZLIB in self.requirements:
1065 compression = 'zlib'
1066 elif REQUIREMENT_NONE in self.requirements:
1067 compression = 'none'
1068 else:
1069 raise error.Abort(_('unable to determine what compression engine '
1070 'to use for SQLite storage'))
1071
1072 return sqlitefilestore(self._dbconn, path, compression)
1073
1074 def makefilestorage(orig, requirements, **kwargs):
1075 """Produce a type conforming to ``ilocalrepositoryfilestorage``."""
1076 if REQUIREMENT in requirements:
1077 return sqlitefilestorage
1078 else:
1079 return orig(requirements=requirements, **kwargs)
1080
1081 def makemain(orig, ui, requirements, **kwargs):
1082 if REQUIREMENT in requirements:
1083 if REQUIREMENT_ZSTD in requirements and not zstd:
1084 raise error.Abort(_('repository uses zstandard compression, which '
1085 'is not available to this Mercurial install'))
1086
1087 return sqliterepository
1088
1089 return orig(requirements=requirements, **kwargs)
1090
1091 def verifierinit(orig, self, *args, **kwargs):
1092 orig(self, *args, **kwargs)
1093
1094 # We don't care that files in the store don't align with what is
1095 # advertised. So suppress these warnings.
1096 self.warnorphanstorefiles = False
1097
1098 def extsetup(ui):
1099 localrepo.featuresetupfuncs.add(featuresetup)
1100 extensions.wrapfunction(localrepo, 'newreporequirements',
1101 newreporequirements)
1102 extensions.wrapfunction(localrepo, 'makefilestorage',
1103 makefilestorage)
1104 extensions.wrapfunction(localrepo, 'makemain',
1105 makemain)
1106 extensions.wrapfunction(verify.verifier, '__init__',
1107 verifierinit)
1108
1109 def reposetup(ui, repo):
1110 if isinstance(repo, sqliterepository):
1111 repo._db = None
1112
1113 # TODO check for bundlerepository?