diff hgext/remotefilelog/datapack.py @ 40545:3a333a582d7b
remotefilelog: import pruned-down remotefilelog extension from hg-experimental
This is remotefilelog as of my recent patches for compatibility with
current tip of hg, minus support for old versions of Mercurial and
some FB-specific features like their treemanifest extension and
fetching linkrev data from a patched phabricator. The file extutil.py
moved from hgext3rd to remotefilelog.
This is not yet ready to be landed; consider it a preview for
now. Planned changes include:
* replace lz4 with zstd
* rename some capabilities, requirements and wireproto commands to mark
them as experimental
* consolidate bits of shallowutil with related functions (e.g. readfile)
I'm certainly open to other (small) changes, but my rough mission is
to land this largely as-is so we can use it as a model of the
functionality we need going forward for lazy-fetching of file contents
from a server.
# no-check-commit because of a few foo_bar functions
Differential Revision: https://phab.mercurial-scm.org/D4782
| author | Augie Fackler <augie@google.com> |
|---|---|
| date | Thu, 27 Sep 2018 13:03:19 -0400 |
| parents | |
| children | 10c10da14c5d |
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/hgext/remotefilelog/datapack.py	Thu Sep 27 13:03:19 2018 -0400
@@ -0,0 +1,470 @@
+from __future__ import absolute_import
+
+import struct
+
+from mercurial.node import hex, nullid
+from mercurial.i18n import _
+from mercurial import (
+    error,
+    pycompat,
+    util,
+)
+from . import (
+    basepack,
+    constants,
+    lz4wrapper,
+    shallowutil,
+)
+
+NODELENGTH = 20
+
+# The indicator value in the index for a fulltext entry.
+FULLTEXTINDEXMARK = -1
+NOBASEINDEXMARK = -2
+
+INDEXSUFFIX = '.dataidx'
+PACKSUFFIX = '.datapack'
+
+class datapackstore(basepack.basepackstore):
+    INDEXSUFFIX = INDEXSUFFIX
+    PACKSUFFIX = PACKSUFFIX
+
+    def __init__(self, ui, path):
+        super(datapackstore, self).__init__(ui, path)
+
+    def getpack(self, path):
+        return datapack(path)
+
+    def get(self, name, node):
+        raise RuntimeError("must use getdeltachain with datapackstore")
+
+    def getmeta(self, name, node):
+        for pack in self.packs:
+            try:
+                return pack.getmeta(name, node)
+            except KeyError:
+                pass
+
+        for pack in self.refresh():
+            try:
+                return pack.getmeta(name, node)
+            except KeyError:
+                pass
+
+        raise KeyError((name, hex(node)))
+
+    def getdelta(self, name, node):
+        for pack in self.packs:
+            try:
+                return pack.getdelta(name, node)
+            except KeyError:
+                pass
+
+        for pack in self.refresh():
+            try:
+                return pack.getdelta(name, node)
+            except KeyError:
+                pass
+
+        raise KeyError((name, hex(node)))
+
+    def getdeltachain(self, name, node):
+        for pack in self.packs:
+            try:
+                return pack.getdeltachain(name, node)
+            except KeyError:
+                pass
+
+        for pack in self.refresh():
+            try:
+                return pack.getdeltachain(name, node)
+            except KeyError:
+                pass
+
+        raise KeyError((name, hex(node)))
+
+    def add(self, name, node, data):
+        raise RuntimeError("cannot add to datapackstore")
+
+class datapack(basepack.basepack):
+    INDEXSUFFIX = INDEXSUFFIX
+    PACKSUFFIX = PACKSUFFIX
+
+    # Format is <node><delta offset><pack data offset><pack data size>
+    # See the mutabledatapack doccomment for more details.
+    INDEXFORMAT = '!20siQQ'
+    INDEXENTRYLENGTH = 40
+
+    SUPPORTED_VERSIONS = [0, 1]
+
+    def getmissing(self, keys):
+        missing = []
+        for name, node in keys:
+            value = self._find(node)
+            if not value:
+                missing.append((name, node))
+
+        return missing
+
+    def get(self, name, node):
+        raise RuntimeError("must use getdeltachain with datapack (%s:%s)"
+                           % (name, hex(node)))
+
+    def getmeta(self, name, node):
+        value = self._find(node)
+        if value is None:
+            raise KeyError((name, hex(node)))
+
+        # version 0 does not support metadata
+        if self.VERSION == 0:
+            return {}
+
+        node, deltabaseoffset, offset, size = value
+        rawentry = self._data[offset:offset + size]
+
+        # see docstring of mutabledatapack for the format
+        offset = 0
+        offset += struct.unpack_from('!H', rawentry, offset)[0] + 2 # filename
+        offset += 40 # node, deltabase node
+        offset += struct.unpack_from('!Q', rawentry, offset)[0] + 8 # delta
+
+        metalen = struct.unpack_from('!I', rawentry, offset)[0]
+        offset += 4
+
+        meta = shallowutil.parsepackmeta(rawentry[offset:offset + metalen])
+
+        return meta
+
+    def getdelta(self, name, node):
+        value = self._find(node)
+        if value is None:
+            raise KeyError((name, hex(node)))
+
+        node, deltabaseoffset, offset, size = value
+        entry = self._readentry(offset, size, getmeta=True)
+        filename, node, deltabasenode, delta, meta = entry
+
+        # If we've read a lot of data from the mmap, free some memory.
+        self.freememory()
+
+        return delta, filename, deltabasenode, meta
+
+    def getdeltachain(self, name, node):
+        value = self._find(node)
+        if value is None:
+            raise KeyError((name, hex(node)))
+
+        params = self.params
+
+        # Precompute chains
+        chain = [value]
+        deltabaseoffset = value[1]
+        entrylen = self.INDEXENTRYLENGTH
+        while (deltabaseoffset != FULLTEXTINDEXMARK
+               and deltabaseoffset != NOBASEINDEXMARK):
+            loc = params.indexstart + deltabaseoffset
+            value = struct.unpack(self.INDEXFORMAT,
+                                  self._index[loc:loc + entrylen])
+            deltabaseoffset = value[1]
+            chain.append(value)
+
+        # Read chain data
+        deltachain = []
+        for node, deltabaseoffset, offset, size in chain:
+            filename, node, deltabasenode, delta = self._readentry(offset, size)
+            deltachain.append((filename, node, filename, deltabasenode, delta))
+
+        # If we've read a lot of data from the mmap, free some memory.
+        self.freememory()
+
+        return deltachain
+
+    def _readentry(self, offset, size, getmeta=False):
+        rawentry = self._data[offset:offset + size]
+        self._pagedin += len(rawentry)
+
+        # <2 byte len> + <filename>
+        lengthsize = 2
+        filenamelen = struct.unpack('!H', rawentry[:2])[0]
+        filename = rawentry[lengthsize:lengthsize + filenamelen]
+
+        # <20 byte node> + <20 byte deltabase>
+        nodestart = lengthsize + filenamelen
+        deltabasestart = nodestart + NODELENGTH
+        node = rawentry[nodestart:deltabasestart]
+        deltabasenode = rawentry[deltabasestart:deltabasestart + NODELENGTH]
+
+        # <8 byte len> + <delta>
+        deltastart = deltabasestart + NODELENGTH
+        rawdeltalen = rawentry[deltastart:deltastart + 8]
+        deltalen = struct.unpack('!Q', rawdeltalen)[0]
+
+        delta = rawentry[deltastart + 8:deltastart + 8 + deltalen]
+        delta = lz4wrapper.lz4decompress(delta)
+
+        if getmeta:
+            if self.VERSION == 0:
+                meta = {}
+            else:
+                metastart = deltastart + 8 + deltalen
+                metalen = struct.unpack_from('!I', rawentry, metastart)[0]
+
+                rawmeta = rawentry[metastart + 4:metastart + 4 + metalen]
+                meta = shallowutil.parsepackmeta(rawmeta)
+            return filename, node, deltabasenode, delta, meta
+        else:
+            return filename, node, deltabasenode, delta
+
+    def add(self, name, node, data):
+        raise RuntimeError("cannot add to datapack (%s:%s)" % (name, node))
+
+    def _find(self, node):
+        params = self.params
+        fanoutkey = struct.unpack(params.fanoutstruct,
+                                  node[:params.fanoutprefix])[0]
+        fanout = self._fanouttable
+
+        start = fanout[fanoutkey] + params.indexstart
+        indexend = self._indexend
+
+        # Scan forward to find the first non-same entry, which is the upper
+        # bound.
+        for i in pycompat.xrange(fanoutkey + 1, params.fanoutcount):
+            end = fanout[i] + params.indexstart
+            if end != start:
+                break
+        else:
+            end = indexend
+
+        # Bisect between start and end to find node
+        index = self._index
+        startnode = index[start:start + NODELENGTH]
+        endnode = index[end:end + NODELENGTH]
+        entrylen = self.INDEXENTRYLENGTH
+        if startnode == node:
+            entry = index[start:start + entrylen]
+        elif endnode == node:
+            entry = index[end:end + entrylen]
+        else:
+            while start < end - entrylen:
+                mid = start + (end - start) / 2
+                mid = mid - ((mid - params.indexstart) % entrylen)
+                midnode = index[mid:mid + NODELENGTH]
+                if midnode == node:
+                    entry = index[mid:mid + entrylen]
+                    break
+                if node > midnode:
+                    start = mid
+                    startnode = midnode
+                elif node < midnode:
+                    end = mid
+                    endnode = midnode
+            else:
+                return None
+
+        return struct.unpack(self.INDEXFORMAT, entry)
+
+    def markledger(self, ledger, options=None):
+        for filename, node in self:
+            ledger.markdataentry(self, filename, node)
+
+    def cleanup(self, ledger):
+        entries = ledger.sources.get(self, [])
+        allkeys = set(self)
+        repackedkeys = set((e.filename, e.node) for e in entries if
+                           e.datarepacked or e.gced)
+
+        if len(allkeys - repackedkeys) == 0:
+            if self.path not in ledger.created:
+                util.unlinkpath(self.indexpath, ignoremissing=True)
+                util.unlinkpath(self.packpath, ignoremissing=True)
+
+    def __iter__(self):
+        for f, n, deltabase, deltalen in self.iterentries():
+            yield f, n
+
+    def iterentries(self):
+        # Start at 1 to skip the header
+        offset = 1
+        data = self._data
+        while offset < self.datasize:
+            oldoffset = offset
+
+            # <2 byte len> + <filename>
+            filenamelen = struct.unpack('!H', data[offset:offset + 2])[0]
+            offset += 2
+            filename = data[offset:offset + filenamelen]
+            offset += filenamelen
+
+            # <20 byte node>
+            node = data[offset:offset + constants.NODESIZE]
+            offset += constants.NODESIZE
+            # <20 byte deltabase>
+            deltabase = data[offset:offset + constants.NODESIZE]
+            offset += constants.NODESIZE
+
+            # <8 byte len> + <delta>
+            rawdeltalen = data[offset:offset + 8]
+            deltalen = struct.unpack('!Q', rawdeltalen)[0]
+            offset += 8
+
+            # it has to be at least long enough for the lz4 header.
+            assert deltalen >= 4
+
+            # python-lz4 stores the length of the uncompressed field as a
+            # little-endian 32-bit integer at the start of the data.
+            uncompressedlen = struct.unpack('<I', data[offset:offset + 4])[0]
+            offset += deltalen
+
+            if self.VERSION == 1:
+                # <4 byte len> + <metadata-list>
+                metalen = struct.unpack_from('!I', data, offset)[0]
+                offset += 4 + metalen
+
+            yield (filename, node, deltabase, uncompressedlen)
+
+            # If we've read a lot of data from the mmap, free some memory.
+            self._pagedin += offset - oldoffset
+            if self.freememory():
+                data = self._data
+
+class mutabledatapack(basepack.mutablebasepack):
+    """A class for constructing and serializing a datapack file and index.
+
+    A datapack is a pair of files that contain the revision contents for
+    various file revisions in Mercurial. It contains only revision contents
+    (like file contents), not any history information.
+
+    It consists of two files, with the following format. All bytes are in
+    network byte order (big endian).
+
+    .datapack
+        The pack itself is a series of revision deltas with some basic header
+        information on each. A revision delta may be a fulltext, represented
+        by a deltabasenode equal to the nullid.
+
+        datapack = <version: 1 byte>
+                   [<revision>,...]
+        revision = <filename len: 2 byte unsigned int>
+                   <filename>
+                   <node: 20 byte>
+                   <deltabasenode: 20 byte>
+                   <delta len: 8 byte unsigned int>
+                   <delta>
+                   <metadata-list len: 4 byte unsigned int> [1]
+                   <metadata-list>                          [1]
+        metadata-list = [<metadata-item>, ...]
+        metadata-item = <metadata-key: 1 byte>
+                        <metadata-value len: 2 byte unsigned>
+                        <metadata-value>
+
+        metadata-key could be METAKEYFLAG or METAKEYSIZE or other single byte
+        value in the future.
+
+    .dataidx
+        The index file consists of two parts, the fanout and the index.
+
+        The index is a list of index entries, sorted by node (one per revision
+        in the pack). Each entry has:
+
+        - node (The 20 byte node of the entry; i.e. the commit hash, file node
+                hash, etc)
+        - deltabase index offset (The location in the index of the deltabase
+                                  for this entry. The deltabase is the next
+                                  delta in the chain, with the chain eventually
+                                  terminating in a full-text, represented by a
+                                  deltabase offset of -1. This lets us compute
+                                  delta chains from the index, then do
+                                  sequential reads from the pack if the
+                                  revisions are nearby on disk.)
+        - pack entry offset (The location of this entry in the datapack)
+        - pack content size (The on-disk length of this entry's pack data)
+
+        The fanout is a quick lookup table to reduce the number of steps for
+        bisecting the index. It is a series of 4 byte pointers to positions
+        within the index. It has 2^16 entries, which corresponds to hash
+        prefixes [0000, 0001,..., FFFE, FFFF]. Example: the pointer in slot
+        4F0A points to the index position of the first revision whose node
+        starts with 4F0A. This saves log(2^16)=16 bisect steps.
+
+        dataidx = <fanouttable>
+                  <index>
+        fanouttable = [<index offset: 4 byte unsigned int>,...] (2^16 entries)
+        index = [<index entry>,...]
+        indexentry = <node: 20 byte>
+                     <deltabase location: 4 byte signed int>
+                     <pack entry offset: 8 byte unsigned int>
+                     <pack entry size: 8 byte unsigned int>
+
+    [1]: new in version 1.
+    """
+    INDEXSUFFIX = INDEXSUFFIX
+    PACKSUFFIX = PACKSUFFIX
+
+    # v[01] index format: <node><delta offset><pack data offset><pack data size>
+    INDEXFORMAT = datapack.INDEXFORMAT
+    INDEXENTRYLENGTH = datapack.INDEXENTRYLENGTH
+
+    # v1 has metadata support
+    SUPPORTED_VERSIONS = [0, 1]
+
+    def add(self, name, node, deltabasenode, delta, metadata=None):
+        # metadata is a dict, ex. {METAKEYFLAG: flag}
+        if len(name) > 2**16:
+            raise RuntimeError(_("name too long %s") % name)
+        if len(node) != 20:
+            raise RuntimeError(_("node should be 20 bytes %s") % node)
+
+        if node in self.entries:
+            # The revision has already been added
+            return
+
+        # TODO: allow configurable compression
+        delta = lz4wrapper.lz4compress(delta)
+
+        rawdata = ''.join((
+            struct.pack('!H', len(name)), # unsigned 2 byte int
+            name,
+            node,
+            deltabasenode,
+            struct.pack('!Q', len(delta)), # unsigned 8 byte int
+            delta,
+        ))
+
+        if self.VERSION == 1:
+            # v1 supports metadata
+            rawmeta = shallowutil.buildpackmeta(metadata)
+            rawdata += struct.pack('!I', len(rawmeta)) # unsigned 4 byte
+            rawdata += rawmeta
+        else:
+            # v0 cannot store metadata, raise if metadata contains flag
+            if metadata and metadata.get(constants.METAKEYFLAG, 0) != 0:
+                raise error.ProgrammingError('v0 pack cannot store flags')
+
+        offset = self.packfp.tell()
+
+        size = len(rawdata)
+
+        self.entries[node] = (deltabasenode, offset, size)
+
+        self.writeraw(rawdata)
+
+    def createindex(self, nodelocations, indexoffset):
+        entries = sorted((n, db, o, s) for n, (db, o, s)
+                         in self.entries.iteritems())
+
+        rawindex = ''
+        fmt = self.INDEXFORMAT
+        for node, deltabase, offset, size in entries:
+            if deltabase == nullid:
+                deltabaselocation = FULLTEXTINDEXMARK
+            else:
+                # Instead of storing the deltabase node in the index, let's
+                # store a pointer directly to the index entry for the deltabase.
+                deltabaselocation = nodelocations.get(deltabase,
+                                                      NOBASEINDEXMARK)
+
+            entry = struct.pack(fmt, node, deltabaselocation, offset, size)
+            rawindex += entry
+
+        return rawindex
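
The mutabledatapack docstring above fully specifies the on-disk `revision` record. As a reading aid, here is a minimal, self-contained sketch of decoding a single version-1 record; it is not part of the patch, the helper name `parserevision` is invented, and it assumes the raw pack bytes are already in memory with the caller positioned past the 1-byte version header (real consumers should go through the `datapack` class above, which also handles lz4 decompression and mmap paging):

```python
import struct

def parserevision(data, offset):
    # Hypothetical helper: decode one version-1 datapack revision record
    # starting at `offset` (the pack begins with a 1-byte version header,
    # so a full walk starts at offset 1). All integers are big-endian.

    # <filename len: 2 byte unsigned int> + <filename>
    (filenamelen,) = struct.unpack_from('!H', data, offset)
    offset += 2
    filename = data[offset:offset + filenamelen]
    offset += filenamelen

    # <node: 20 byte> + <deltabasenode: 20 byte>; a deltabasenode of
    # twenty zero bytes (nullid) marks a fulltext rather than a delta.
    node = data[offset:offset + 20]
    deltabasenode = data[offset + 20:offset + 40]
    offset += 40

    # <delta len: 8 byte unsigned int> + <delta> (lz4-compressed on disk)
    (deltalen,) = struct.unpack_from('!Q', data, offset)
    offset += 8
    delta = data[offset:offset + deltalen]
    offset += deltalen

    # version 1 only: <metadata-list len: 4 byte unsigned int>, then items,
    # each <key: 1 byte> <value len: 2 byte unsigned> <value>
    (metalen,) = struct.unpack_from('!I', data, offset)
    offset += 4
    metadata = {}
    end = offset + metalen
    while offset < end:
        key = data[offset:offset + 1]
        (valuelen,) = struct.unpack_from('!H', data, offset + 1)
        offset += 3
        metadata[key] = data[offset:offset + valuelen]
        offset += valuelen

    return filename, node, deltabasenode, delta, metadata, offset
```

Returning the advanced offset lets a caller walk the entire pack record by record, the same way `iterentries` does.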
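The `.dataidx` fanout table is also worth a sketch, since it is what makes `_find` cheap: the first two bytes of a node select one of the 2^16 slots, and the slot's pointer jumps straight to that prefix's run of 40-byte index entries. The sketch below is hypothetical (`fanoutrange` is not in the patch) and assumes the fanout has already been decoded into a list of integers and that the pack uses the 2-byte prefix configuration described in the docstring:

```python
import struct

FANOUTCOUNT = 2**16     # one slot per 2-byte node prefix
ENTRYLEN = 40           # struct.calcsize('!20siQQ'), per INDEXFORMAT

def fanoutrange(fanout, indexstart, indexend, node):
    """Return the (start, end) byte range in the index file holding
    entries whose nodes share the first two bytes of ``node``."""
    (key,) = struct.unpack('!H', node[:2])
    start = fanout[key] + indexstart

    # Scan forward to the next slot that points somewhere different;
    # that pointer bounds this prefix's entries (mirrors datapack._find).
    for i in range(key + 1, FANOUTCOUNT):
        end = fanout[i] + indexstart
        if end != start:
            break
    else:
        end = indexend
    return start, end
```

A reader would then bisect within [start, end) in ENTRYLEN strides, exactly as `_find` does above; as the docstring notes, the fanout replaces the first log(2^16)=16 bisection steps.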