view tests/simplestorerepo.py @ 40326:fed697fa1734

sqlitestore: file storage backend using SQLite This commit provides an extension which uses SQLite to store file data (as opposed to revlogs). As the inline documentation describes, there are still several aspects to the extension that are incomplete. But it's a start. The extension does support basic clone, checkout, and commit workflows, which makes it suitable for simple use cases. One notable missing feature is support for "bundlerepos." This is probably responsible for the most test failures when the extension is activated as part of the test suite. All revision data is stored in SQLite. Data is stored as zstd compressed chunks (default if zstd is available), zlib compressed chunks (default if zstd is not available), or raw chunks (if configured or if a compressed delta is not smaller than the raw delta). This makes things very similar to revlogs. Unlike revlogs, the extension doesn't yet enforce a limit on delta chain length. This is an obvious limitation and should be addressed. This is somewhat mitigated by the use of zstd, which is much faster than zlib to decompress. There is a dedicated table for storing deltas. Deltas are stored by the SHA-1 hash of their uncompressed content. The "fileindex" table has columns that reference the delta for each revision and the base delta that delta should be applied against. A recursive SQL query is used to resolve the delta chain along with the delta data. By storing deltas by hash, we are able to de-duplicate delta storage! With revlogs, the same deltas in different revlogs would result in duplicate storage of that delta. In this scheme, inserting the duplicate delta is a no-op and delta chains simply reference the existing delta. When initially implementing this extension, I did not have content-indexed deltas and deltas could be duplicated across files (just like revlogs). When I implemented content-indexed deltas, the size of the SQLite database for a full clone of mozilla-unified dropped: before: 2,554,261,504 bytes after: 2,488,754,176 bytes Surprisingly, this is still larger than the bytes size of revlog files: revlog files: 2,104,861,230 bytes du -b: 2,254,381,614 I would have expected storage to be smaller since we're not limiting delta chain length and since we're using zstd instead of zlib. I suspect the SQLite indexes and per-column overhead account for the bulk of the differences. (Keep in mind that revlog uses a 64-byte packed struct for revision index data and deltas are stored without padding. Aside from the 12 unused bytes in the 32 byte node field, revlogs are pretty efficient.) Another source of overhead is file name storage. With revlogs, file names are stored in the filesystem. But with SQLite, we need to store file names in the database. This is roughly equivalent to the size of the fncache file, which for the mozilla-unified repository is ~34MB. Since the SQLite database isn't append-only and since delta chains can reference any delta, this opens some interesting possibilities. For example, we could store deltas in reverse, such that fulltexts are stored for newer revisions and deltas are applied to reconstruct older revisions. This is likely a more optimal storage strategy for version control, as new data tends to be more frequently accessed than old data. We would obviously need wire protocol support for transferring revision data from newest to oldest. And we would probably need some kind of mechanism for "re-encoding" stores. But it should be doable. This extension is very much experimental quality. There are a handful of features that don't work. It probably isn't suitable for day-to-day use. But it could be used in limited cases (e.g. read-only checkouts like in CI). And it is also a good proving ground for alternate storage backends. As we continue to define interfaces for all things storage, it will be useful to have a viable alternate storage backend to see how things shake out in practice. test-storage.py passes on Python 2 and introduces no new test failures on Python 3. Having the storage-level unit tests has proved to be insanely useful when developing this extension. Those tests caught numerous bugs during development and I'm convinced this style of testing is the way forward for ensuring alternate storage backends work as intended. Of course, test coverage isn't close to what it needs to be. But it is a start. And what coverage we have gives me confidence that basic store functionality is implemented properly. Differential Revision: https://phab.mercurial-scm.org/D4928
author Gregory Szorc <gregory.szorc@gmail.com>
date Tue, 09 Oct 2018 08:50:13 -0700
parents 8e398628a3f2
children 1b183edbb68e
line wrap: on
line source

# simplestorerepo.py - Extension that swaps in alternate repository storage.
#
# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

# To use this with the test suite:
#
#   $ HGREPOFEATURES="simplestore" ./run-tests.py \
#       --extra-config-opt extensions.simplestore=`pwd`/simplestorerepo.py

from __future__ import absolute_import

import stat

from mercurial.i18n import _
from mercurial.node import (
    bin,
    hex,
    nullid,
    nullrev,
)
from mercurial.thirdparty import (
    attr,
    cbor,
)
from mercurial import (
    ancestor,
    bundlerepo,
    error,
    extensions,
    localrepo,
    mdiff,
    pycompat,
    repository,
    revlog,
    store,
    verify,
)
from mercurial.utils import (
    interfaceutil,
    storageutil,
)

# Note for extension authors: ONLY specify testedwith = 'ships-with-hg-core' for
# extensions which SHIP WITH MERCURIAL. Non-mainline extensions should
# be specifying the version(s) of Mercurial they are tested with, or
# leave the attribute unspecified.
testedwith = 'ships-with-hg-core'

REQUIREMENT = 'testonly-simplestore'

def validatenode(node):
    if isinstance(node, int):
        raise ValueError('expected node; got int')

    if len(node) != 20:
        raise ValueError('expected 20 byte node')

def validaterev(rev):
    if not isinstance(rev, int):
        raise ValueError('expected int')

class simplestoreerror(error.StorageError):
    pass

@interfaceutil.implementer(repository.irevisiondelta)
@attr.s(slots=True, frozen=True)
class simplestorerevisiondelta(object):
    node = attr.ib()
    p1node = attr.ib()
    p2node = attr.ib()
    basenode = attr.ib()
    linknode = attr.ib()
    flags = attr.ib()
    baserevisionsize = attr.ib()
    revision = attr.ib()
    delta = attr.ib()

@interfaceutil.implementer(repository.ifilestorage)
class filestorage(object):
    """Implements storage for a tracked path.

    Data is stored in the VFS in a directory corresponding to the tracked
    path.

    Index data is stored in an ``index`` file using CBOR.

    Fulltext data is stored in files having names of the node.
    """

    def __init__(self, svfs, path):
        self._svfs = svfs
        self._path = path

        self._storepath = b'/'.join([b'data', path])
        self._indexpath = b'/'.join([self._storepath, b'index'])

        indexdata = self._svfs.tryread(self._indexpath)
        if indexdata:
            indexdata = cbor.loads(indexdata)

        self._indexdata = indexdata or []
        self._indexbynode = {}
        self._indexbyrev = {}
        self._index = []
        self._refreshindex()

    def _refreshindex(self):
        self._indexbynode.clear()
        self._indexbyrev.clear()
        self._index = []

        for i, entry in enumerate(self._indexdata):
            self._indexbynode[entry[b'node']] = entry
            self._indexbyrev[i] = entry

        self._indexbynode[nullid] = {
            b'node': nullid,
            b'p1': nullid,
            b'p2': nullid,
            b'linkrev': nullrev,
            b'flags': 0,
        }

        self._indexbyrev[nullrev] = {
            b'node': nullid,
            b'p1': nullid,
            b'p2': nullid,
            b'linkrev': nullrev,
            b'flags': 0,
        }

        for i, entry in enumerate(self._indexdata):
            p1rev, p2rev = self.parentrevs(self.rev(entry[b'node']))

            # start, length, rawsize, chainbase, linkrev, p1, p2, node
            self._index.append((0, 0, 0, -1, entry[b'linkrev'], p1rev, p2rev,
                                entry[b'node']))

        self._index.append((0, 0, 0, -1, -1, -1, -1, nullid))

    def __len__(self):
        return len(self._indexdata)

    def __iter__(self):
        return iter(range(len(self)))

    def revs(self, start=0, stop=None):
        step = 1
        if stop is not None:
            if start > stop:
                step = -1

            stop += step
        else:
            stop = len(self)

        return range(start, stop, step)

    def parents(self, node):
        validatenode(node)

        if node not in self._indexbynode:
            raise KeyError('unknown node')

        entry = self._indexbynode[node]

        return entry[b'p1'], entry[b'p2']

    def parentrevs(self, rev):
        p1, p2 = self.parents(self._indexbyrev[rev][b'node'])
        return self.rev(p1), self.rev(p2)

    def rev(self, node):
        validatenode(node)

        try:
            self._indexbynode[node]
        except KeyError:
            raise error.LookupError(node, self._indexpath, _('no node'))

        for rev, entry in self._indexbyrev.items():
            if entry[b'node'] == node:
                return rev

        raise error.ProgrammingError('this should not occur')

    def node(self, rev):
        validaterev(rev)

        return self._indexbyrev[rev][b'node']

    def lookup(self, node):
        if isinstance(node, int):
            return self.node(node)

        if len(node) == 20:
            self.rev(node)
            return node

        try:
            rev = int(node)
            if '%d' % rev != node:
                raise ValueError

            if rev < 0:
                rev = len(self) + rev
            if rev < 0 or rev >= len(self):
                raise ValueError

            return self.node(rev)
        except (ValueError, OverflowError):
            pass

        if len(node) == 40:
            try:
                rawnode = bin(node)
                self.rev(rawnode)
                return rawnode
            except TypeError:
                pass

        raise error.LookupError(node, self._path, _('invalid lookup input'))

    def linkrev(self, rev):
        validaterev(rev)

        return self._indexbyrev[rev][b'linkrev']

    def _flags(self, rev):
        validaterev(rev)

        return self._indexbyrev[rev][b'flags']

    def _candelta(self, baserev, rev):
        validaterev(baserev)
        validaterev(rev)

        if ((self._flags(baserev) & revlog.REVIDX_RAWTEXT_CHANGING_FLAGS)
            or (self._flags(rev) & revlog.REVIDX_RAWTEXT_CHANGING_FLAGS)):
            return False

        return True

    def _processflags(self, text, flags, operation, raw=False):
        if flags == 0:
            return text, True

        if flags & ~revlog.REVIDX_KNOWN_FLAGS:
            raise simplestoreerror(_("incompatible revision flag '%#x'") %
                                   (flags & ~revlog.REVIDX_KNOWN_FLAGS))

        validatehash = True
        # Depending on the operation (read or write), the order might be
        # reversed due to non-commutative transforms.
        orderedflags = revlog.REVIDX_FLAGS_ORDER
        if operation == 'write':
            orderedflags = reversed(orderedflags)

        for flag in orderedflags:
            # If a flagprocessor has been registered for a known flag, apply the
            # related operation transform and update result tuple.
            if flag & flags:
                vhash = True

                if flag not in revlog._flagprocessors:
                    message = _("missing processor for flag '%#x'") % (flag)
                    raise simplestoreerror(message)

                processor = revlog._flagprocessors[flag]
                if processor is not None:
                    readtransform, writetransform, rawtransform = processor

                    if raw:
                        vhash = rawtransform(self, text)
                    elif operation == 'read':
                        text, vhash = readtransform(self, text)
                    else:  # write operation
                        text, vhash = writetransform(self, text)
                validatehash = validatehash and vhash

        return text, validatehash

    def checkhash(self, text, node, p1=None, p2=None, rev=None):
        if p1 is None and p2 is None:
            p1, p2 = self.parents(node)
        if node != storageutil.hashrevisionsha1(text, p1, p2):
            raise simplestoreerror(_("integrity check failed on %s") %
                self._path)

    def revision(self, node, raw=False):
        validatenode(node)

        if node == nullid:
            return b''

        rev = self.rev(node)
        flags = self._flags(rev)

        path = b'/'.join([self._storepath, hex(node)])
        rawtext = self._svfs.read(path)

        text, validatehash = self._processflags(rawtext, flags, 'read', raw=raw)
        if validatehash:
            self.checkhash(text, node, rev=rev)

        return text

    def read(self, node):
        validatenode(node)

        revision = self.revision(node)

        if not revision.startswith(b'\1\n'):
            return revision

        start = revision.index(b'\1\n', 2)
        return revision[start + 2:]

    def renamed(self, node):
        validatenode(node)

        if self.parents(node)[0] != nullid:
            return False

        fulltext = self.revision(node)
        m = storageutil.parsemeta(fulltext)[0]

        if m and 'copy' in m:
            return m['copy'], bin(m['copyrev'])

        return False

    def cmp(self, node, text):
        validatenode(node)

        t = text

        if text.startswith(b'\1\n'):
            t = b'\1\n\1\n' + text

        p1, p2 = self.parents(node)

        if storageutil.hashrevisionsha1(t, p1, p2) == node:
            return False

        if self.iscensored(self.rev(node)):
            return text != b''

        if self.renamed(node):
            t2 = self.read(node)
            return t2 != text

        return True

    def size(self, rev):
        validaterev(rev)

        node = self._indexbyrev[rev][b'node']

        if self.renamed(node):
            return len(self.read(node))

        if self.iscensored(rev):
            return 0

        return len(self.revision(node))

    def iscensored(self, rev):
        validaterev(rev)

        return self._flags(rev) & repository.REVISION_FLAG_CENSORED

    def commonancestorsheads(self, a, b):
        validatenode(a)
        validatenode(b)

        a = self.rev(a)
        b = self.rev(b)

        ancestors = ancestor.commonancestorsheads(self.parentrevs, a, b)
        return pycompat.maplist(self.node, ancestors)

    def descendants(self, revs):
        # This is a copy of revlog.descendants()
        first = min(revs)
        if first == nullrev:
            for i in self:
                yield i
            return

        seen = set(revs)
        for i in self.revs(start=first + 1):
            for x in self.parentrevs(i):
                if x != nullrev and x in seen:
                    seen.add(i)
                    yield i
                    break

    # Required by verify.
    def files(self):
        entries = self._svfs.listdir(self._storepath)

        # Strip out undo.backup.* files created as part of transaction
        # recording.
        entries = [f for f in entries if not f.startswith('undo.backup.')]

        return [b'/'.join((self._storepath, f)) for f in entries]

    def add(self, text, meta, transaction, linkrev, p1, p2):
        if meta or text.startswith(b'\1\n'):
            text = storageutil.packmeta(meta, text)

        return self.addrevision(text, transaction, linkrev, p1, p2)

    def addrevision(self, text, transaction, linkrev, p1, p2, node=None,
                    flags=revlog.REVIDX_DEFAULT_FLAGS, cachedelta=None):
        validatenode(p1)
        validatenode(p2)

        if flags:
            node = node or storageutil.hashrevisionsha1(text, p1, p2)

        rawtext, validatehash = self._processflags(text, flags, 'write')

        node = node or storageutil.hashrevisionsha1(text, p1, p2)

        if node in self._indexbynode:
            return node

        if validatehash:
            self.checkhash(rawtext, node, p1=p1, p2=p2)

        return self._addrawrevision(node, rawtext, transaction, linkrev, p1, p2,
                                    flags)

    def _addrawrevision(self, node, rawtext, transaction, link, p1, p2, flags):
        transaction.addbackup(self._indexpath)

        path = b'/'.join([self._storepath, hex(node)])

        self._svfs.write(path, rawtext)

        self._indexdata.append({
            b'node': node,
            b'p1': p1,
            b'p2': p2,
            b'linkrev': link,
            b'flags': flags,
        })

        self._reflectindexupdate()

        return node

    def _reflectindexupdate(self):
        self._refreshindex()
        self._svfs.write(self._indexpath, cbor.dumps(self._indexdata))

    def addgroup(self, deltas, linkmapper, transaction, addrevisioncb=None):
        nodes = []

        transaction.addbackup(self._indexpath)

        for node, p1, p2, linknode, deltabase, delta, flags in deltas:
            linkrev = linkmapper(linknode)
            flags = flags or revlog.REVIDX_DEFAULT_FLAGS

            nodes.append(node)

            if node in self._indexbynode:
                continue

            # Need to resolve the fulltext from the delta base.
            if deltabase == nullid:
                text = mdiff.patch(b'', delta)
            else:
                text = mdiff.patch(self.revision(deltabase), delta)

            self._addrawrevision(node, text, transaction, linkrev, p1, p2,
                                 flags)

            if addrevisioncb:
                addrevisioncb(self, node)

        return nodes

    def heads(self, start=None, stop=None):
        # This is copied from revlog.py.
        if start is None and stop is None:
            if not len(self):
                return [nullid]
            return [self.node(r) for r in self.headrevs()]

        if start is None:
            start = nullid
        if stop is None:
            stop = []
        stoprevs = set([self.rev(n) for n in stop])
        startrev = self.rev(start)
        reachable = {startrev}
        heads = {startrev}

        parentrevs = self.parentrevs
        for r in self.revs(start=startrev + 1):
            for p in parentrevs(r):
                if p in reachable:
                    if r not in stoprevs:
                        reachable.add(r)
                    heads.add(r)
                if p in heads and p not in stoprevs:
                    heads.remove(p)

        return [self.node(r) for r in heads]

    def children(self, node):
        validatenode(node)

        # This is a copy of revlog.children().
        c = []
        p = self.rev(node)
        for r in self.revs(start=p + 1):
            prevs = [pr for pr in self.parentrevs(r) if pr != nullrev]
            if prevs:
                for pr in prevs:
                    if pr == p:
                        c.append(self.node(r))
            elif p == nullrev:
                c.append(self.node(r))
        return c

    def getstrippoint(self, minlink):

        # This is largely a copy of revlog.getstrippoint().
        brokenrevs = set()
        strippoint = len(self)

        heads = {}
        futurelargelinkrevs = set()
        for head in self.heads():
            headlinkrev = self.linkrev(self.rev(head))
            heads[head] = headlinkrev
            if headlinkrev >= minlink:
                futurelargelinkrevs.add(headlinkrev)

        # This algorithm involves walking down the rev graph, starting at the
        # heads. Since the revs are topologically sorted according to linkrev,
        # once all head linkrevs are below the minlink, we know there are
        # no more revs that could have a linkrev greater than minlink.
        # So we can stop walking.
        while futurelargelinkrevs:
            strippoint -= 1
            linkrev = heads.pop(strippoint)

            if linkrev < minlink:
                brokenrevs.add(strippoint)
            else:
                futurelargelinkrevs.remove(linkrev)

            for p in self.parentrevs(strippoint):
                if p != nullrev:
                    plinkrev = self.linkrev(p)
                    heads[p] = plinkrev
                    if plinkrev >= minlink:
                        futurelargelinkrevs.add(plinkrev)

        return strippoint, brokenrevs

    def strip(self, minlink, transaction):
        if not len(self):
            return

        rev, _ignored = self.getstrippoint(minlink)
        if rev == len(self):
            return

        # Purge index data starting at the requested revision.
        self._indexdata[rev:] = []
        self._reflectindexupdate()

def issimplestorefile(f, kind, st):
    if kind != stat.S_IFREG:
        return False

    if store.isrevlog(f, kind, st):
        return False

    # Ignore transaction undo files.
    if f.startswith('undo.'):
        return False

    # Otherwise assume it belongs to the simple store.
    return True

class simplestore(store.encodedstore):
    def datafiles(self):
        for x in super(simplestore, self).datafiles():
            yield x

        # Supplement with non-revlog files.
        extrafiles = self._walk('data', True, filefilter=issimplestorefile)

        for unencoded, encoded, size in extrafiles:
            try:
                unencoded = store.decodefilename(unencoded)
            except KeyError:
                unencoded = None

            yield unencoded, encoded, size

def reposetup(ui, repo):
    if not repo.local():
        return

    if isinstance(repo, bundlerepo.bundlerepository):
        raise error.Abort(_('cannot use simple store with bundlerepo'))

    class simplestorerepo(repo.__class__):
        def file(self, f):
            return filestorage(self.svfs, f)

    repo.__class__ = simplestorerepo

def featuresetup(ui, supported):
    supported.add(REQUIREMENT)

def newreporequirements(orig, ui):
    """Modifies default requirements for new repos to use the simple store."""
    requirements = orig(ui)

    # These requirements are only used to affect creation of the store
    # object. We have our own store. So we can remove them.
    # TODO do this once we feel like taking the test hit.
    #if 'fncache' in requirements:
    #    requirements.remove('fncache')
    #if 'dotencode' in requirements:
    #    requirements.remove('dotencode')

    requirements.add(REQUIREMENT)

    return requirements

def makestore(orig, requirements, path, vfstype):
    if REQUIREMENT not in requirements:
        return orig(requirements, path, vfstype)

    return simplestore(path, vfstype)

def verifierinit(orig, self, *args, **kwargs):
    orig(self, *args, **kwargs)

    # We don't care that files in the store don't align with what is
    # advertised. So suppress these warnings.
    self.warnorphanstorefiles = False

def extsetup(ui):
    localrepo.featuresetupfuncs.add(featuresetup)

    extensions.wrapfunction(localrepo, 'newreporequirements',
                            newreporequirements)
    extensions.wrapfunction(store, 'store', makestore)
    extensions.wrapfunction(verify.verifier, '__init__', verifierinit)