hgext/sqlitestore.py @ 40326:fed697fa1734
sqlitestore: file storage backend using SQLite
This commit provides an extension which uses SQLite to store file
data (as opposed to revlogs).
As the inline documentation describes, there are still several
aspects to the extension that are incomplete. But it's a start.
The extension does support basic clone, checkout, and commit
workflows, which makes it suitable for simple use cases.
One notable missing feature is support for "bundlerepos." This is
probably responsible for most of the test failures when the extension
is activated as part of the test suite.
All revision data is stored in SQLite. Data is stored as zstd
compressed chunks (default if zstd is available), zlib compressed
chunks (default if zstd is not available), or raw chunks (if
configured or if a compressed delta is not smaller than the raw
delta). This makes things very similar to revlogs.
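In rough Python terms, the storage policy amounts to the sketch below.
The encodedelta helper is illustrative only; the real logic lives in
_addrawrevision() in the code further down.

  import zlib

  try:
      from mercurial import zstd  # bundled python-zstandard, if available
  except ImportError:
      zstd = None

  COMPRESSION_NONE, COMPRESSION_ZSTD, COMPRESSION_ZLIB = 1, 2, 3

  def encodedelta(delta, engine='zstd' if zstd else 'zlib'):
      # Hypothetical helper mirroring the policy described above: compress
      # with the configured engine, but store the raw delta whenever
      # compression does not actually make it smaller.
      if engine == 'zstd':
          blob = zstd.ZstdCompressor(level=3).compress(delta)
          compression = COMPRESSION_ZSTD
      elif engine == 'zlib':
          blob = zlib.compress(delta)
          compression = COMPRESSION_ZLIB
      else:
          blob = delta
          compression = COMPRESSION_NONE

      if len(blob) >= len(delta):
          blob, compression = delta, COMPRESSION_NONE

      return compression, blob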
Unlike revlogs, the extension doesn't yet enforce a limit on delta
chain length. This is an obvious limitation and should be addressed.
This is somewhat mitigated by the use of zstd, which is much faster
than zlib to decompress.
There is a dedicated table for storing deltas. Deltas are stored
by the SHA-1 hash of their uncompressed content. The "fileindex" table
has columns that reference the delta for each revision and the base
delta that delta should be applied against. A recursive SQL query
is used to resolve the delta chain along with the delta data.
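Conceptually, chain resolution is a single recursive CTE joining the
"fileindex" and "delta" tables, roughly like the condensed sketch below
(the real resolvedeltachain() in the code also stops at revisions already
present in the fulltext cache and decompresses each delta):

  def deltachain(db, pathid, node):
      # Walk deltabaseid pointers from the requested revision back toward
      # its base; the caller reverses the rows before applying the deltas.
      return db.execute(
          'WITH RECURSIVE '
          '  chain(deltaid, baseid) AS ('
          '    SELECT deltaid, deltabaseid FROM fileindex '
          '    WHERE pathid=? AND node=? '
          '    UNION ALL '
          '    SELECT fileindex.deltaid, fileindex.deltabaseid '
          '    FROM fileindex, chain '
          '    WHERE fileindex.id=chain.baseid) '
          'SELECT compression, delta FROM chain, delta '
          'WHERE delta.id=chain.deltaid',
          (pathid, node)).fetchall()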
By storing deltas by hash, we are able to de-duplicate delta storage!
With revlogs, the same deltas in different revlogs would result in
duplicate storage of that delta. In this scheme, inserting the
duplicate delta is a no-op and delta chains simply reference the
existing delta.
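The de-duplicating insert leans entirely on the UNIQUE constraint on the
hash column: attempt the INSERT and, when the constraint fires, reuse the
existing row. A minimal sketch mirroring the insertdelta() helper below:

  import sqlite3

  def storedelta(db, compression, deltahash, blob):
      # Insert a delta keyed by the SHA-1 of its uncompressed content;
      # on a duplicate, reuse the existing row id instead of storing twice.
      try:
          return db.execute(
              'INSERT INTO delta (compression, hash, delta) '
              'VALUES (?, ?, ?)',
              (compression, deltahash, blob)).lastrowid
      except sqlite3.IntegrityError:
          return db.execute(
              'SELECT id FROM delta WHERE hash=?',
              (deltahash,)).fetchone()[0]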
When initially implementing this extension, I did not have
content-indexed deltas and deltas could be duplicated across files
(just like revlogs). When I implemented content-indexed deltas, the
size of the SQLite database for a full clone of mozilla-unified
dropped:
before: 2,554,261,504 bytes
after: 2,488,754,176 bytes
Surprisingly, this is still larger than the byte size of the revlog
files:
revlog files: 2,104,861,230 bytes
du -b: 2,254,381,614 bytes
I would have expected storage to be smaller since we're not limiting
delta chain length and since we're using zstd instead of zlib. I
suspect the SQLite indexes and per-column overhead account for the
bulk of the differences. (Keep in mind that revlog uses a 64-byte
packed struct for revision index data and deltas are stored without
padding. Aside from the 12 unused bytes in the 32-byte node field,
revlogs are pretty efficient.) Another source of overhead is file
name storage. With revlogs, file names are stored in the filesystem.
But with SQLite, we need to store file names in the database. This is
roughly equivalent to the size of the fncache file, which for the
mozilla-unified repository is ~34MB.
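For reference, the 64-byte revlog index entry mentioned above corresponds
to revlog's packed struct format (the format string as I understand it
from mercurial/revlog.py):

  import struct

  # revlog v1 index entry: 8-byte offset/flags, four 4-byte integers for
  # compressed/uncompressed length and base/link revs, two 4-byte parent
  # revs, a 20-byte SHA-1 node, and 12 bytes of padding = 64 bytes total.
  indexformatng = '>Qiiiiii20s12x'
  assert struct.calcsize(indexformatng) == 64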
Since the SQLite database isn't append-only and since delta chains
can reference any delta, this opens some interesting possibilities.
For example, we could store deltas in reverse, such that fulltexts
are stored for newer revisions and deltas are applied to reconstruct
older revisions. This is likely a more optimal storage strategy for
version control, as new data tends to be more frequently accessed
than old data. We would obviously need wire protocol support for
transferring revision data from newest to oldest. And we would
probably need some kind of mechanism for "re-encoding" stores. But
it should be doable.
This extension is very much of experimental quality. There are a handful
of features that don't work. It probably isn't suitable for day-to-day
use. But it could be used in limited cases (e.g. read-only checkouts
like in CI). And it is also a good proving ground for alternate
storage backends. As we continue to define interfaces for all things
storage, it will be useful to have a viable alternate storage backend
to see how things shake out in practice.
test-storage.py passes on Python 2 and introduces no new test failures on
Python 3. Having the storage-level unit tests has proved to be insanely
useful when developing this extension. Those tests caught numerous bugs
during development and I'm convinced this style of testing is the way
forward for ensuring alternate storage backends work as intended. Of
course, test coverage isn't close to what it needs to be. But it is
a start. And what coverage we have gives me confidence that basic store
functionality is implemented properly.
Differential Revision: https://phab.mercurial-scm.org/D4928
author    Gregory Szorc <gregory.szorc@gmail.com>
date      Tue, 09 Oct 2018 08:50:13 -0700
parents
children  f1a39128da95
comparison  40325:b0fbd1792e2d to 40326:fed697fa1734
# sqlitestore.py - Storage backend that uses SQLite
#
# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

"""store repository data in SQLite (EXPERIMENTAL)

The sqlitestore extension enables the storage of repository data in SQLite.

This extension is HIGHLY EXPERIMENTAL. There are NO BACKWARDS COMPATIBILITY
GUARANTEES. This means that repositories created with this extension may
only be usable with the exact version of this extension/Mercurial that was
used. The extension attempts to enforce this in order to prevent repository
corruption.

In addition, several features are not yet supported or have known bugs:

* Only some data is stored in SQLite. Changeset, manifest, and other repository
  data is not yet stored in SQLite.
* Transactions are not robust. If the process is aborted at the right time
  during transaction close/rollback, the repository could be in an inconsistent
  state. This problem will diminish once all repository data is tracked by
  SQLite.
* Bundle repositories do not work (the ability to use e.g.
  `hg -R <bundle-file> log` to automatically overlay a bundle on top of the
  existing repository).
* Various other features don't work.

This extension should work for basic clone/pull, update, and commit workflows.
Some history rewriting operations may fail due to lack of support for bundle
repositories.

To use, activate the extension and set the ``storage.new-repo-backend`` config
option to ``sqlite`` to enable new repositories to use SQLite for storage.
"""

# To run the test suite with repos using SQLite by default, execute the
# following:
#
# HGREPOFEATURES="sqlitestore" run-tests.py \
#     --extra-config-opt extensions.sqlitestore= \
#     --extra-config-opt storage.new-repo-backend=sqlite

from __future__ import absolute_import

import hashlib
import sqlite3
import struct
import threading
import zlib

from mercurial.i18n import _
from mercurial.node import (
    nullid,
    nullrev,
    short,
)
from mercurial.thirdparty import (
    attr,
)
from mercurial import (
    ancestor,
    dagop,
    error,
    extensions,
    localrepo,
    mdiff,
    pycompat,
    registrar,
    repository,
    util,
    verify,
)
from mercurial.utils import (
    interfaceutil,
    storageutil,
)

try:
    from mercurial import zstd
    zstd.__version__
except ImportError:
    zstd = None

configtable = {}
configitem = registrar.configitem(configtable)

# experimental config: storage.sqlite.compression
configitem('storage', 'sqlite.compression',
           default='zstd' if zstd else 'zlib')

# Note for extension authors: ONLY specify testedwith = 'ships-with-hg-core' for
# extensions which SHIP WITH MERCURIAL. Non-mainline extensions should
# be specifying the version(s) of Mercurial they are tested with, or
# leave the attribute unspecified.
testedwith = 'ships-with-hg-core'

REQUIREMENT = b'exp-sqlite-001'
REQUIREMENT_ZSTD = b'exp-sqlite-comp-001=zstd'
REQUIREMENT_ZLIB = b'exp-sqlite-comp-001=zlib'
REQUIREMENT_NONE = b'exp-sqlite-comp-001=none'

CURRENT_SCHEMA_VERSION = 1

COMPRESSION_NONE = 1
COMPRESSION_ZSTD = 2
COMPRESSION_ZLIB = 3

FLAG_CENSORED = 1

CREATE_SCHEMA = [
    # Deltas are stored as content-indexed blobs.
    # compression column holds COMPRESSION_* constant for how the
    # delta is encoded.

    r'CREATE TABLE delta ('
    r' id INTEGER PRIMARY KEY, '
    r' compression INTEGER NOT NULL, '
    r' hash BLOB UNIQUE ON CONFLICT ABORT, '
    r' delta BLOB NOT NULL '
    r')',

    # Tracked paths are denormalized to integers to avoid redundant
    # storage of the path name.
    r'CREATE TABLE filepath ('
    r' id INTEGER PRIMARY KEY, '
    r' path BLOB NOT NULL '
    r')',

    r'CREATE UNIQUE INDEX filepath_path '
    r' ON filepath (path)',

    # We have a single table for all file revision data.
    # Each file revision is uniquely described by a (path, rev) and
    # (path, node).
    #
    # Revision data is stored as a pointer to the delta producing this
    # revision and the file revision whose delta should be applied before
    # that one. One can reconstruct the delta chain by recursively following
    # the delta base revision pointers until one encounters NULL.
    #
    # flags column holds bitwise integer flags controlling storage options.
    # These flags are defined by the FLAG_* constants.
    r'CREATE TABLE fileindex ('
    r' id INTEGER PRIMARY KEY, '
    r' pathid INTEGER REFERENCES filepath(id), '
    r' revnum INTEGER NOT NULL, '
    r' p1rev INTEGER NOT NULL, '
    r' p2rev INTEGER NOT NULL, '
    r' linkrev INTEGER NOT NULL, '
    r' flags INTEGER NOT NULL, '
    r' deltaid INTEGER REFERENCES delta(id), '
    r' deltabaseid INTEGER REFERENCES fileindex(id), '
    r' node BLOB NOT NULL '
    r')',

    r'CREATE UNIQUE INDEX fileindex_pathrevnum '
    r' ON fileindex (pathid, revnum)',

    r'CREATE UNIQUE INDEX fileindex_pathnode '
    r' ON fileindex (pathid, node)',

    # Provide a view over all file data for convenience.
    r'CREATE VIEW filedata AS '
    r'SELECT '
    r' fileindex.id AS id, '
    r' filepath.id AS pathid, '
    r' filepath.path AS path, '
    r' fileindex.revnum AS revnum, '
    r' fileindex.node AS node, '
    r' fileindex.p1rev AS p1rev, '
    r' fileindex.p2rev AS p2rev, '
    r' fileindex.linkrev AS linkrev, '
    r' fileindex.flags AS flags, '
    r' fileindex.deltaid AS deltaid, '
    r' fileindex.deltabaseid AS deltabaseid '
    r'FROM filepath, fileindex '
    r'WHERE fileindex.pathid=filepath.id',

    r'PRAGMA user_version=%d' % CURRENT_SCHEMA_VERSION,
]

def resolvedeltachain(db, pathid, node, revisioncache,
                      stoprids, zstddctx=None):
    """Resolve a delta chain for a file node."""

    # TODO the "not in ({stops})" here is possibly slowing down the query
    # because it needs to perform the lookup on every recursive invocation.
    # This could possibly be faster if we created a temporary query with
    # baseid "poisoned" to null and limited the recursive filter to
    # "is not null".
    res = db.execute(
        r'WITH RECURSIVE '
        r' deltachain(deltaid, baseid) AS ('
        r' SELECT deltaid, deltabaseid FROM fileindex '
        r' WHERE pathid=? AND node=? '
        r' UNION ALL '
        r' SELECT fileindex.deltaid, deltabaseid '
        r' FROM fileindex, deltachain '
        r' WHERE '
        r' fileindex.id=deltachain.baseid '
        r' AND deltachain.baseid IS NOT NULL '
        r' AND fileindex.id NOT IN ({stops}) '
        r' ) '
        r'SELECT deltachain.baseid, compression, delta '
        r'FROM deltachain, delta '
        r'WHERE delta.id=deltachain.deltaid'.format(
            stops=r','.join([r'?'] * len(stoprids))),
        tuple([pathid, node] + list(stoprids.keys())))

    deltas = []
    lastdeltabaseid = None

    for deltabaseid, compression, delta in res:
        lastdeltabaseid = deltabaseid

        if compression == COMPRESSION_ZSTD:
            delta = zstddctx.decompress(delta)
        elif compression == COMPRESSION_NONE:
            delta = delta
        elif compression == COMPRESSION_ZLIB:
            delta = zlib.decompress(delta)
        else:
            raise SQLiteStoreError('unhandled compression type: %d' %
                                   compression)

        deltas.append(delta)

    if lastdeltabaseid in stoprids:
        basetext = revisioncache[stoprids[lastdeltabaseid]]
    else:
        basetext = deltas.pop()

    deltas.reverse()
    fulltext = mdiff.patches(basetext, deltas)

    # SQLite returns buffer instances for blob columns on Python 2. This
    # type can propagate through the delta application layer. Because
    # downstream callers assume revisions are bytes, cast as needed.
    if not isinstance(fulltext, bytes):
        fulltext = bytes(fulltext)

    return fulltext

def insertdelta(db, compression, hash, delta):
    try:
        return db.execute(
            r'INSERT INTO delta (compression, hash, delta) '
            r'VALUES (?, ?, ?)',
            (compression, hash, delta)).lastrowid
    except sqlite3.IntegrityError:
        return db.execute(
            r'SELECT id FROM delta WHERE hash=?',
            (hash,)).fetchone()[0]

class SQLiteStoreError(error.StorageError):
    pass

@attr.s
class revisionentry(object):
    rid = attr.ib()
    rev = attr.ib()
    node = attr.ib()
    p1rev = attr.ib()
    p2rev = attr.ib()
    p1node = attr.ib()
    p2node = attr.ib()
    linkrev = attr.ib()
    flags = attr.ib()

@interfaceutil.implementer(repository.irevisiondelta)
@attr.s(slots=True)
class sqliterevisiondelta(object):
    node = attr.ib()
    p1node = attr.ib()
    p2node = attr.ib()
    basenode = attr.ib()
    flags = attr.ib()
    baserevisionsize = attr.ib()
    revision = attr.ib()
    delta = attr.ib()
    linknode = attr.ib(default=None)

@interfaceutil.implementer(repository.iverifyproblem)
@attr.s(frozen=True)
class sqliteproblem(object):
    warning = attr.ib(default=None)
    error = attr.ib(default=None)
    node = attr.ib(default=None)

@interfaceutil.implementer(repository.ifilestorage)
class sqlitefilestore(object):
    """Implements storage for an individual tracked path."""

    def __init__(self, db, path, compression):
        self._db = db
        self._path = path

        self._pathid = None

        # revnum -> node
        self._revtonode = {}
        # node -> revnum
        self._nodetorev = {}
        # node -> data structure
        self._revisions = {}

        self._revisioncache = util.lrucachedict(10)

        self._compengine = compression

        if compression == 'zstd':
            self._cctx = zstd.ZstdCompressor(level=3)
            self._dctx = zstd.ZstdDecompressor()
        else:
            self._cctx = None
            self._dctx = None

        self._refreshindex()

    def _refreshindex(self):
        self._revtonode = {}
        self._nodetorev = {}
        self._revisions = {}

        res = list(self._db.execute(
            r'SELECT id FROM filepath WHERE path=?', (self._path,)))

        if not res:
            self._pathid = None
            return

        self._pathid = res[0][0]

        res = self._db.execute(
            r'SELECT id, revnum, node, p1rev, p2rev, linkrev, flags '
            r'FROM fileindex '
            r'WHERE pathid=? '
            r'ORDER BY revnum ASC',
            (self._pathid,))

        for i, row in enumerate(res):
            rid, rev, node, p1rev, p2rev, linkrev, flags = row

            if i != rev:
                raise SQLiteStoreError(_('sqlite database has inconsistent '
                                         'revision numbers'))

            if p1rev == nullrev:
                p1node = nullid
            else:
                p1node = self._revtonode[p1rev]

            if p2rev == nullrev:
                p2node = nullid
            else:
                p2node = self._revtonode[p2rev]

            entry = revisionentry(
                rid=rid,
                rev=rev,
                node=node,
                p1rev=p1rev,
                p2rev=p2rev,
                p1node=p1node,
                p2node=p2node,
                linkrev=linkrev,
                flags=flags)

            self._revtonode[rev] = node
            self._nodetorev[node] = rev
            self._revisions[node] = entry

    # Start of ifileindex interface.

    def __len__(self):
        return len(self._revisions)

    def __iter__(self):
        return iter(pycompat.xrange(len(self._revisions)))

    def revs(self, start=0, stop=None):
        return storageutil.iterrevs(len(self._revisions), start=start,
                                    stop=stop)

    def parents(self, node):
        if node == nullid:
            return nullid, nullid

        if node not in self._revisions:
            raise error.LookupError(node, self._path, _('no node'))

        entry = self._revisions[node]
        return entry.p1node, entry.p2node

    def parentrevs(self, rev):
        if rev == nullrev:
            return nullrev, nullrev

        if rev not in self._revtonode:
            raise IndexError(rev)

        entry = self._revisions[self._revtonode[rev]]
        return entry.p1rev, entry.p2rev

    def rev(self, node):
        if node == nullid:
            return nullrev

        if node not in self._nodetorev:
            raise error.LookupError(node, self._path, _('no node'))

        return self._nodetorev[node]

    def node(self, rev):
        if rev == nullrev:
            return nullid

        if rev not in self._revtonode:
            raise IndexError(rev)

        return self._revtonode[rev]

    def lookup(self, node):
        return storageutil.fileidlookup(self, node, self._path)

    def linkrev(self, rev):
        if rev == nullrev:
            return nullrev

        if rev not in self._revtonode:
            raise IndexError(rev)

        entry = self._revisions[self._revtonode[rev]]
        return entry.linkrev

    def iscensored(self, rev):
        if rev == nullrev:
            return False

        if rev not in self._revtonode:
            raise IndexError(rev)

        return self._revisions[self._revtonode[rev]].flags & FLAG_CENSORED

    def commonancestorsheads(self, node1, node2):
        rev1 = self.rev(node1)
        rev2 = self.rev(node2)

        ancestors = ancestor.commonancestorsheads(self.parentrevs, rev1, rev2)
        return pycompat.maplist(self.node, ancestors)

    def descendants(self, revs):
        # TODO we could implement this using a recursive SQL query, which
        # might be faster.
        return dagop.descendantrevs(revs, self.revs, self.parentrevs)

    def heads(self, start=None, stop=None):
        if start is None and stop is None:
            if not len(self):
                return [nullid]

        startrev = self.rev(start) if start is not None else nullrev
        stoprevs = {self.rev(n) for n in stop or []}

        revs = dagop.headrevssubset(self.revs, self.parentrevs,
                                    startrev=startrev, stoprevs=stoprevs)

        return [self.node(rev) for rev in revs]

    def children(self, node):
        rev = self.rev(node)

        res = self._db.execute(
            r'SELECT'
            r' node '
            r' FROM filedata '
            r' WHERE path=? AND (p1rev=? OR p2rev=?) '
            r' ORDER BY revnum ASC',
            (self._path, rev, rev))

        return [row[0] for row in res]

    # End of ifileindex interface.

    # Start of ifiledata interface.

    def size(self, rev):
        if rev == nullrev:
            return 0

        if rev not in self._revtonode:
            raise IndexError(rev)

        node = self._revtonode[rev]

        if self.renamed(node):
            return len(self.read(node))

        return len(self.revision(node))

    def revision(self, node, raw=False, _verifyhash=True):
        if node in (nullid, nullrev):
            return b''

        if isinstance(node, int):
            node = self.node(node)

        if node not in self._nodetorev:
            raise error.LookupError(node, self._path, _('no node'))

        if node in self._revisioncache:
            return self._revisioncache[node]

        # Because we have a fulltext revision cache, we are able to
        # short-circuit delta chain traversal and decompression as soon as
        # we encounter a revision in the cache.

        stoprids = {self._revisions[n].rid: n
                    for n in self._revisioncache}

        if not stoprids:
            stoprids[-1] = None

        fulltext = resolvedeltachain(self._db, self._pathid, node,
                                     self._revisioncache, stoprids,
                                     zstddctx=self._dctx)

        if _verifyhash:
            self._checkhash(fulltext, node)
            self._revisioncache[node] = fulltext

        return fulltext

    def read(self, node):
        return storageutil.filtermetadata(self.revision(node))

    def renamed(self, node):
        return storageutil.filerevisioncopied(self, node)

    def cmp(self, node, fulltext):
        return not storageutil.filedataequivalent(self, node, fulltext)

    def emitrevisions(self, nodes, nodesorder=None, revisiondata=False,
                      assumehaveparentrevisions=False, deltaprevious=False):
        if nodesorder not in ('nodes', 'storage', None):
            raise error.ProgrammingError('unhandled value for nodesorder: %s' %
                                         nodesorder)

        nodes = [n for n in nodes if n != nullid]

        if not nodes:
            return

        # TODO perform in a single query.
        res = self._db.execute(
            r'SELECT revnum, deltaid FROM fileindex '
            r'WHERE pathid=? '
            r' AND node in (%s)' % (r','.join([r'?'] * len(nodes))),
            tuple([self._pathid] + nodes))

        deltabases = {}

        for rev, deltaid in res:
            res = self._db.execute(
                r'SELECT revnum from fileindex WHERE pathid=? AND deltaid=?',
                (self._pathid, deltaid))
            deltabases[rev] = res.fetchone()[0]

        # TODO define revdifffn so we can use delta from storage.
        for delta in storageutil.emitrevisions(
            self, nodes, nodesorder, sqliterevisiondelta,
            deltaparentfn=deltabases.__getitem__,
            revisiondata=revisiondata,
            assumehaveparentrevisions=assumehaveparentrevisions,
            deltaprevious=deltaprevious):

            yield delta

    # End of ifiledata interface.

    # Start of ifilemutation interface.

    def add(self, filedata, meta, transaction, linkrev, p1, p2):
        if meta or filedata.startswith(b'\x01\n'):
            filedata = storageutil.packmeta(meta, filedata)

        return self.addrevision(filedata, transaction, linkrev, p1, p2)

    def addrevision(self, revisiondata, transaction, linkrev, p1, p2, node=None,
                    flags=0, cachedelta=None):
        if flags:
            raise SQLiteStoreError(_('flags not supported on revisions'))

        validatehash = node is not None
        node = node or storageutil.hashrevisionsha1(revisiondata, p1, p2)

        if validatehash:
            self._checkhash(revisiondata, node, p1, p2)

        if node in self._nodetorev:
            return node

        node = self._addrawrevision(node, revisiondata, transaction, linkrev,
                                    p1, p2)

        self._revisioncache[node] = revisiondata
        return node

    def addgroup(self, deltas, linkmapper, transaction, addrevisioncb=None):
        nodes = []

        for node, p1, p2, linknode, deltabase, delta, wireflags in deltas:
            storeflags = 0

            if wireflags & repository.REVISION_FLAG_CENSORED:
                storeflags |= FLAG_CENSORED

            if wireflags & ~repository.REVISION_FLAG_CENSORED:
                raise SQLiteStoreError('unhandled revision flag')

            baserev = self.rev(deltabase)

            # If base is censored, delta must be full replacement in a single
            # patch operation.
            if baserev != nullrev and self.iscensored(baserev):
                hlen = struct.calcsize('>lll')
                oldlen = len(self.revision(deltabase, raw=True,
                                           _verifyhash=False))
                newlen = len(delta) - hlen

                if delta[:hlen] != mdiff.replacediffheader(oldlen, newlen):
                    raise error.CensoredBaseError(self._path,
                                                  deltabase)

            if (not (storeflags & FLAG_CENSORED)
                and storageutil.deltaiscensored(
                    delta, baserev, lambda x: len(self.revision(x, raw=True)))):
                storeflags |= FLAG_CENSORED

            linkrev = linkmapper(linknode)

            nodes.append(node)

            if node in self._revisions:
                continue

            if deltabase == nullid:
                text = mdiff.patch(b'', delta)
                storedelta = None
            else:
                text = None
                storedelta = (deltabase, delta)

            self._addrawrevision(node, text, transaction, linkrev, p1, p2,
                                 storedelta=storedelta, flags=storeflags)

            if addrevisioncb:
                addrevisioncb(self, node)

        return nodes

    def censorrevision(self, tr, censornode, tombstone=b''):
        tombstone = storageutil.packmeta({b'censored': tombstone}, b'')

        # This restriction is cargo culted from revlogs and makes no sense for
        # SQLite, since columns can be resized at will.
        if len(tombstone) > len(self.revision(censornode, raw=True)):
            raise error.Abort(_('censor tombstone must be no longer than '
                                'censored data'))

        # We need to replace the censored revision's data with the tombstone.
        # But replacing that data will have implications for delta chains that
        # reference it.
        #
        # While "better," more complex strategies are possible, we do something
        # simple: we find delta chain children of the censored revision and we
        # replace those incremental deltas with fulltexts of their corresponding
        # revision. Then we delete the now-unreferenced delta and original
        # revision and insert a replacement.

        # Find the delta to be censored.
        censoreddeltaid = self._db.execute(
            r'SELECT deltaid FROM fileindex WHERE id=?',
            (self._revisions[censornode].rid,)).fetchone()[0]

        # Find all its delta chain children.
        # TODO once we support storing deltas for !files, we'll need to look
        # for those delta chains too.
        rows = list(self._db.execute(
            r'SELECT id, pathid, node FROM fileindex '
            r'WHERE deltabaseid=? OR deltaid=?',
            (censoreddeltaid, censoreddeltaid)))

        for row in rows:
            rid, pathid, node = row

            fulltext = resolvedeltachain(self._db, pathid, node, {}, {-1: None},
                                         zstddctx=self._dctx)

            deltahash = hashlib.sha1(fulltext).digest()

            if self._compengine == 'zstd':
                deltablob = self._cctx.compress(fulltext)
                compression = COMPRESSION_ZSTD
            elif self._compengine == 'zlib':
                deltablob = zlib.compress(fulltext)
                compression = COMPRESSION_ZLIB
            elif self._compengine == 'none':
                deltablob = fulltext
                compression = COMPRESSION_NONE
            else:
                raise error.ProgrammingError('unhandled compression engine: %s'
                                             % self._compengine)

            if len(deltablob) >= len(fulltext):
                deltablob = fulltext
                compression = COMPRESSION_NONE

            deltaid = insertdelta(self._db, compression, deltahash, deltablob)

            self._db.execute(
                r'UPDATE fileindex SET deltaid=?, deltabaseid=NULL '
                r'WHERE id=?', (deltaid, rid))

        # Now create the tombstone delta and replace the delta on the censored
        # node.
        deltahash = hashlib.sha1(tombstone).digest()
        tombstonedeltaid = insertdelta(self._db, COMPRESSION_NONE,
                                       deltahash, tombstone)

        flags = self._revisions[censornode].flags
        flags |= FLAG_CENSORED

        self._db.execute(
            r'UPDATE fileindex SET flags=?, deltaid=?, deltabaseid=NULL '
            r'WHERE pathid=? AND node=?',
            (flags, tombstonedeltaid, self._pathid, censornode))

        self._db.execute(
            r'DELETE FROM delta WHERE id=?', (censoreddeltaid,))

        self._refreshindex()
        self._revisioncache.clear()

    def getstrippoint(self, minlink):
        return storageutil.resolvestripinfo(minlink, len(self) - 1,
                                            [self.rev(n) for n in self.heads()],
                                            self.linkrev,
                                            self.parentrevs)

    def strip(self, minlink, transaction):
        if not len(self):
            return

        rev, _ignored = self.getstrippoint(minlink)

        if rev == len(self):
            return

        for rev in self.revs(rev):
            self._db.execute(
                r'DELETE FROM fileindex WHERE pathid=? AND node=?',
                (self._pathid, self.node(rev)))

        # TODO how should we garbage collect data in delta table?

        self._refreshindex()

    # End of ifilemutation interface.

    # Start of ifilestorage interface.

    def files(self):
        return []

    def storageinfo(self, exclusivefiles=False, sharedfiles=False,
                    revisionscount=False, trackedsize=False,
                    storedsize=False):
        d = {}

        if exclusivefiles:
            d['exclusivefiles'] = []

        if sharedfiles:
            # TODO list sqlite file(s) here.
            d['sharedfiles'] = []

        if revisionscount:
            d['revisionscount'] = len(self)

        if trackedsize:
            d['trackedsize'] = sum(len(self.revision(node))
                                   for node in self._nodetorev)

        if storedsize:
            # TODO implement this?
            d['storedsize'] = None

        return d

    def verifyintegrity(self, state):
        state['skipread'] = set()

        for rev in self:
            node = self.node(rev)

            try:
                self.revision(node)
            except Exception as e:
                yield sqliteproblem(
                    error=_('unpacking %s: %s') % (short(node), e),
                    node=node)

                state['skipread'].add(node)

    # End of ifilestorage interface.

    def _checkhash(self, fulltext, node, p1=None, p2=None):
        if p1 is None and p2 is None:
            p1, p2 = self.parents(node)

        if node == storageutil.hashrevisionsha1(fulltext, p1, p2):
            return

        try:
            del self._revisioncache[node]
        except KeyError:
            pass

        if storageutil.iscensoredtext(fulltext):
            raise error.CensoredNodeError(self._path, node, fulltext)

        raise SQLiteStoreError(_('integrity check failed on %s') %
                               self._path)

    def _addrawrevision(self, node, revisiondata, transaction, linkrev,
                        p1, p2, storedelta=None, flags=0):
        if self._pathid is None:
            res = self._db.execute(
                r'INSERT INTO filepath (path) VALUES (?)', (self._path,))
            self._pathid = res.lastrowid

        # For simplicity, always store a delta against p1.
        # TODO we need a lot more logic here to make behavior reasonable.

        if storedelta:
            deltabase, delta = storedelta

            if isinstance(deltabase, int):
                deltabase = self.node(deltabase)

        else:
            assert revisiondata is not None
            deltabase = p1

            if deltabase == nullid:
                delta = revisiondata
            else:
                delta = mdiff.textdiff(self.revision(self.rev(deltabase)),
                                       revisiondata)

        # File index stores a pointer to its delta and the parent delta.
        # The parent delta is stored via a pointer to the fileindex PK.
        if deltabase == nullid:
            baseid = None
        else:
            baseid = self._revisions[deltabase].rid

        # Deltas are stored with a hash of their content. This allows
        # us to de-duplicate. The table is configured to ignore conflicts
        # and it is faster to just insert and silently noop than to look
        # first.
        deltahash = hashlib.sha1(delta).digest()

        if self._compengine == 'zstd':
            deltablob = self._cctx.compress(delta)
            compression = COMPRESSION_ZSTD
        elif self._compengine == 'zlib':
            deltablob = zlib.compress(delta)
            compression = COMPRESSION_ZLIB
        elif self._compengine == 'none':
            deltablob = delta
            compression = COMPRESSION_NONE
        else:
            raise error.ProgrammingError('unhandled compression engine: %s' %
                                         self._compengine)

        # Don't store compressed data if it isn't practical.
        if len(deltablob) >= len(delta):
            deltablob = delta
            compression = COMPRESSION_NONE

        deltaid = insertdelta(self._db, compression, deltahash, deltablob)

        rev = len(self)

        if p1 == nullid:
            p1rev = nullrev
        else:
            p1rev = self._nodetorev[p1]

        if p2 == nullid:
            p2rev = nullrev
        else:
            p2rev = self._nodetorev[p2]

        rid = self._db.execute(
            r'INSERT INTO fileindex ('
            r' pathid, revnum, node, p1rev, p2rev, linkrev, flags, '
            r' deltaid, deltabaseid) '
            r' VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)',
            (self._pathid, rev, node, p1rev, p2rev, linkrev, flags,
             deltaid, baseid)
        ).lastrowid

        entry = revisionentry(
            rid=rid,
            rev=rev,
            node=node,
            p1rev=p1rev,
            p2rev=p2rev,
            p1node=p1,
            p2node=p2,
            linkrev=linkrev,
            flags=flags)

        self._nodetorev[node] = rev
        self._revtonode[rev] = node
        self._revisions[node] = entry

        return node

class sqliterepository(localrepo.localrepository):
    def cancopy(self):
        return False

    def transaction(self, *args, **kwargs):
        current = self.currenttransaction()

        tr = super(sqliterepository, self).transaction(*args, **kwargs)

        if current:
            return tr

        self._dbconn.execute(r'BEGIN TRANSACTION')

        def committransaction(_):
            self._dbconn.commit()

        tr.addfinalize('sqlitestore', committransaction)

        return tr

    @property
    def _dbconn(self):
        # SQLite connections can only be used on the thread that created
        # them. In most cases, this "just works." However, hgweb uses
        # multiple threads.
        tid = threading.current_thread().ident

        if self._db:
            if self._db[0] == tid:
                return self._db[1]

        db = makedb(self.svfs.join('db.sqlite'))
        self._db = (tid, db)

        return db

def makedb(path):
    """Construct a database handle for a database at path."""

    db = sqlite3.connect(path)
    db.text_factory = bytes

    res = db.execute(r'PRAGMA user_version').fetchone()[0]

    # New database.
    if res == 0:
        for statement in CREATE_SCHEMA:
            db.execute(statement)

        db.commit()

    elif res == CURRENT_SCHEMA_VERSION:
        pass

    else:
        raise error.Abort(_('sqlite database has unrecognized version'))

    db.execute(r'PRAGMA journal_mode=WAL')

    return db

def featuresetup(ui, supported):
    supported.add(REQUIREMENT)

    if zstd:
        supported.add(REQUIREMENT_ZSTD)

    supported.add(REQUIREMENT_ZLIB)
    supported.add(REQUIREMENT_NONE)

def newreporequirements(orig, ui, createopts):
    if createopts['backend'] != 'sqlite':
        return orig(ui, createopts)

    # This restriction can be lifted once we have more confidence.
    if 'sharedrepo' in createopts:
        raise error.Abort(_('shared repositories not supported with SQLite '
                            'store'))

    # This filtering is out of an abundance of caution: we want to ensure
    # we honor creation options and we do that by annotating exactly the
    # creation options we recognize.
    known = {
        'narrowfiles',
        'backend',
    }

    unsupported = set(createopts) - known
    if unsupported:
        raise error.Abort(_('SQLite store does not support repo creation '
                            'option: %s') % ', '.join(sorted(unsupported)))

    # Since we're a hybrid store that still relies on revlogs, we fall back
    # to using the revlogv1 backend's storage requirements then adding our
    # own requirement.
    createopts['backend'] = 'revlogv1'
    requirements = orig(ui, createopts)
    requirements.add(REQUIREMENT)

    compression = ui.config('storage', 'sqlite.compression')

    if compression == 'zstd' and not zstd:
        raise error.Abort(_('storage.sqlite.compression set to "zstd" but '
                            'zstandard compression not available to this '
                            'Mercurial install'))

    if compression == 'zstd':
        requirements.add(REQUIREMENT_ZSTD)
    elif compression == 'zlib':
        requirements.add(REQUIREMENT_ZLIB)
    elif compression == 'none':
        requirements.add(REQUIREMENT_NONE)
    else:
        raise error.Abort(_('unknown compression engine defined in '
                            'storage.sqlite.compression: %s') % compression)

    return requirements

@interfaceutil.implementer(repository.ilocalrepositoryfilestorage)
class sqlitefilestorage(object):
    """Repository file storage backed by SQLite."""
    def file(self, path):
        if path[0] == b'/':
            path = path[1:]

        if REQUIREMENT_ZSTD in self.requirements:
            compression = 'zstd'
        elif REQUIREMENT_ZLIB in self.requirements:
            compression = 'zlib'
        elif REQUIREMENT_NONE in self.requirements:
            compression = 'none'
        else:
            raise error.Abort(_('unable to determine what compression engine '
                                'to use for SQLite storage'))

        return sqlitefilestore(self._dbconn, path, compression)

def makefilestorage(orig, requirements, **kwargs):
    """Produce a type conforming to ``ilocalrepositoryfilestorage``."""
    if REQUIREMENT in requirements:
        return sqlitefilestorage
    else:
        return orig(requirements=requirements, **kwargs)

def makemain(orig, ui, requirements, **kwargs):
    if REQUIREMENT in requirements:
        if REQUIREMENT_ZSTD in requirements and not zstd:
            raise error.Abort(_('repository uses zstandard compression, which '
                                'is not available to this Mercurial install'))

        return sqliterepository

    return orig(requirements=requirements, **kwargs)

def verifierinit(orig, self, *args, **kwargs):
    orig(self, *args, **kwargs)

    # We don't care that files in the store don't align with what is
    # advertised. So suppress these warnings.
    self.warnorphanstorefiles = False

def extsetup(ui):
    localrepo.featuresetupfuncs.add(featuresetup)
    extensions.wrapfunction(localrepo, 'newreporequirements',
                            newreporequirements)
    extensions.wrapfunction(localrepo, 'makefilestorage',
                            makefilestorage)
    extensions.wrapfunction(localrepo, 'makemain',
                            makemain)
    extensions.wrapfunction(verify.verifier, '__init__',
                            verifierinit)

def reposetup(ui, repo):
    if isinstance(repo, sqliterepository):
        repo._db = None

    # TODO check for bundlerepository?