view mercurial/revlogutils/nodemap.py @ 44338:2ea6a67ff502

nodemap: write new data from the expected current data length If the amount of data in the file exceed the expect amount, we will overwrite the extra data. This is a simple way to be safer. Differential Revision: https://phab.mercurial-scm.org/D7891
author Pierre-Yves David <pierre-yves.david@octobus.net>
date Wed, 15 Jan 2020 15:50:43 +0100
parents 1d2b37def017
children c7eebdb15139
line wrap: on
line source

# nodemap.py - nodemap related code and utilities
#
# Copyright 2019 Pierre-Yves David <pierre-yves.david@octobus.net>
# Copyright 2019 George Racinet <georges.racinet@octobus.net>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import os
import re
import struct

from .. import (
    error,
    node as nodemod,
    util,
)


class NodeMap(dict):
    def __missing__(self, x):
        raise error.RevlogError(b'unknown node: %s' % x)


def persisted_data(revlog):
    """read the nodemap for a revlog from disk"""
    if revlog.nodemap_file is None:
        return None
    pdata = revlog.opener.tryread(revlog.nodemap_file)
    if not pdata:
        return None
    offset = 0
    (version,) = S_VERSION.unpack(pdata[offset : offset + S_VERSION.size])
    if version != ONDISK_VERSION:
        return None
    offset += S_VERSION.size
    headers = S_HEADER.unpack(pdata[offset : offset + S_HEADER.size])
    uid_size, tip_rev, data_length, data_unused = headers
    offset += S_HEADER.size
    docket = NodeMapDocket(pdata[offset : offset + uid_size])
    docket.tip_rev = tip_rev
    docket.data_length = data_length
    docket.data_unused = data_unused

    filename = _rawdata_filepath(revlog, docket)
    return docket, revlog.opener.tryread(filename)


def setup_persistent_nodemap(tr, revlog):
    """Install whatever is needed transaction side to persist a nodemap on disk

    (only actually persist the nodemap if this is relevant for this revlog)
    """
    if revlog._inline:
        return  # inlined revlog are too small for this to be relevant
    if revlog.nodemap_file is None:
        return  # we do not use persistent_nodemap on this revlog
    callback_id = b"revlog-persistent-nodemap-%s" % revlog.nodemap_file
    if tr.hasfinalize(callback_id):
        return  # no need to register again
    tr.addfinalize(callback_id, lambda tr: _persist_nodemap(tr, revlog))


def _persist_nodemap(tr, revlog):
    """Write nodemap data on disk for a given revlog
    """
    if getattr(revlog, 'filteredrevs', ()):
        raise error.ProgrammingError(
            "cannot persist nodemap of a filtered changelog"
        )
    if revlog.nodemap_file is None:
        msg = "calling persist nodemap on a revlog without the feature enableb"
        raise error.ProgrammingError(msg)

    can_incremental = util.safehasattr(revlog.index, "nodemap_data_incremental")
    ondisk_docket = revlog._nodemap_docket

    data = None
    # first attemp an incremental update of the data
    if can_incremental and ondisk_docket is not None:
        target_docket = revlog._nodemap_docket.copy()
        (
            src_docket,
            data_changed_count,
            data,
        ) = revlog.index.nodemap_data_incremental()
        if src_docket != target_docket:
            data = None
        else:
            datafile = _rawdata_filepath(revlog, target_docket)
            # EXP-TODO: if this is a cache, this should use a cache vfs, not a
            # store vfs
            with revlog.opener(datafile, b'r+') as fd:
                fd.seek(target_docket.data_length)
                fd.write(data)
            target_docket.data_length += len(data)
            target_docket.data_unused += data_changed_count

    if data is None:
        # otherwise fallback to a full new export
        target_docket = NodeMapDocket()
        datafile = _rawdata_filepath(revlog, target_docket)
        if util.safehasattr(revlog.index, "nodemap_data_all"):
            data = revlog.index.nodemap_data_all()
        else:
            data = persistent_data(revlog.index)
        # EXP-TODO: if this is a cache, this should use a cache vfs, not a
        # store vfs
        with revlog.opener(datafile, b'w') as fd:
            fd.write(data)
        target_docket.data_length = len(data)
    target_docket.tip_rev = revlog.tiprev()
    # EXP-TODO: if this is a cache, this should use a cache vfs, not a
    # store vfs
    with revlog.opener(revlog.nodemap_file, b'w', atomictemp=True) as fp:
        fp.write(target_docket.serialize())
    revlog._nodemap_docket = target_docket
    # EXP-TODO: if the transaction abort, we should remove the new data and
    # reinstall the old one.

    # search for old index file in all cases, some older process might have
    # left one behind.
    olds = _other_rawdata_filepath(revlog, target_docket)
    if olds:
        realvfs = getattr(revlog, '_realopener', revlog.opener)

        def cleanup(tr):
            for oldfile in olds:
                realvfs.tryunlink(oldfile)

        callback_id = b"revlog-cleanup-nodemap-%s" % revlog.nodemap_file
        tr.addpostclose(callback_id, cleanup)


### Nodemap docket file
#
# The nodemap data are stored on disk using 2 files:
#
# * a raw data files containing a persistent nodemap
#   (see `Nodemap Trie` section)
#
# * a small "docket" file containing medatadata
#
# While the nodemap data can be multiple tens of megabytes, the "docket" is
# small, it is easy to update it automatically or to duplicated its content
# during a transaction.
#
# Multiple raw data can exist at the same time (The currently valid one and a
# new one beind used by an in progress transaction). To accomodate this, the
# filename hosting the raw data has a variable parts. The exact filename is
# specified inside the "docket" file.
#
# The docket file contains information to find, qualify and validate the raw
# data. Its content is currently very light, but it will expand as the on disk
# nodemap gains the necessary features to be used in production.

# version 0 is experimental, no BC garantee, do no use outside of tests.
ONDISK_VERSION = 0
S_VERSION = struct.Struct(">B")
S_HEADER = struct.Struct(">BQQQ")

ID_SIZE = 8


def _make_uid():
    """return a new unique identifier.

    The identifier is random and composed of ascii characters."""
    return nodemod.hex(os.urandom(ID_SIZE))


class NodeMapDocket(object):
    """metadata associated with persistent nodemap data

    The persistent data may come from disk or be on their way to disk.
    """

    def __init__(self, uid=None):
        if uid is None:
            uid = _make_uid()
        self.uid = uid
        self.tip_rev = None
        self.data_length = None
        self.data_unused = 0

    def copy(self):
        new = NodeMapDocket(uid=self.uid)
        new.tip_rev = self.tip_rev
        new.data_length = self.data_length
        new.data_unused = self.data_unused
        return new

    def __cmp__(self, other):
        if self.uid < other.uid:
            return -1
        if self.uid > other.uid:
            return 1
        elif self.data_length < other.data_length:
            return -1
        elif self.data_length > other.data_length:
            return 1
        return 0

    def __eq__(self, other):
        return self.uid == other.uid and self.data_length == other.data_length

    def serialize(self):
        """return serialized bytes for a docket using the passed uid"""
        data = []
        data.append(S_VERSION.pack(ONDISK_VERSION))
        headers = (
            len(self.uid),
            self.tip_rev,
            self.data_length,
            self.data_unused,
        )
        data.append(S_HEADER.pack(*headers))
        data.append(self.uid)
        return b''.join(data)


def _rawdata_filepath(revlog, docket):
    """The (vfs relative) nodemap's rawdata file for a given uid"""
    prefix = revlog.nodemap_file[:-2]
    return b"%s-%s.nd" % (prefix, docket.uid)


def _other_rawdata_filepath(revlog, docket):
    prefix = revlog.nodemap_file[:-2]
    pattern = re.compile(b"(^|/)%s-[0-9a-f]+\.nd$" % prefix)
    new_file_path = _rawdata_filepath(revlog, docket)
    new_file_name = revlog.opener.basename(new_file_path)
    dirpath = revlog.opener.dirname(new_file_path)
    others = []
    for f in revlog.opener.listdir(dirpath):
        if pattern.match(f) and f != new_file_name:
            others.append(f)
    return others


### Nodemap Trie
#
# This is a simple reference implementation to compute and persist a nodemap
# trie. This reference implementation is write only. The python version of this
# is not expected to be actually used, since it wont provide performance
# improvement over existing non-persistent C implementation.
#
# The nodemap is persisted as Trie using 4bits-address/16-entries block. each
# revision can be adressed using its node shortest prefix.
#
# The trie is stored as a sequence of block. Each block contains 16 entries
# (signed 64bit integer, big endian). Each entry can be one of the following:
#
#  * value >=  0 -> index of sub-block
#  * value == -1 -> no value
#  * value <  -1 -> a revision value: rev = -(value+10)
#
# The implementation focus on simplicity, not on performance. A Rust
# implementation should provide a efficient version of the same binary
# persistence. This reference python implementation is never meant to be
# extensively use in production.


def persistent_data(index):
    """return the persistent binary form for a nodemap for a given index
    """
    trie = _build_trie(index)
    return _persist_trie(trie)


def update_persistent_data(index, root, max_idx, last_rev):
    """return the incremental update for persistent nodemap from a given index
    """
    changed_block, trie = _update_trie(index, root, last_rev)
    return (
        changed_block * S_BLOCK.size,
        _persist_trie(trie, existing_idx=max_idx),
    )


S_BLOCK = struct.Struct(">" + ("l" * 16))

NO_ENTRY = -1
# rev 0 need to be -2 because 0 is used by block, -1 is a special value.
REV_OFFSET = 2


def _transform_rev(rev):
    """Return the number used to represent the rev in the tree.

    (or retrieve a rev number from such representation)

    Note that this is an involution, a function equal to its inverse (i.e.
    which gives the identity when applied to itself).
    """
    return -(rev + REV_OFFSET)


def _to_int(hex_digit):
    """turn an hexadecimal digit into a proper integer"""
    return int(hex_digit, 16)


class Block(dict):
    """represent a block of the Trie

    contains up to 16 entry indexed from 0 to 15"""

    def __init__(self):
        super(Block, self).__init__()
        # If this block exist on disk, here is its ID
        self.ondisk_id = None

    def __iter__(self):
        return iter(self.get(i) for i in range(16))


def _build_trie(index):
    """build a nodemap trie

    The nodemap stores revision number for each unique prefix.

    Each block is a dictionary with keys in `[0, 15]`. Values are either
    another block or a revision number.
    """
    root = Block()
    for rev in range(len(index)):
        hex = nodemod.hex(index[rev][7])
        _insert_into_block(index, 0, root, rev, hex)
    return root


def _update_trie(index, root, last_rev):
    """consume"""
    changed = 0
    for rev in range(last_rev + 1, len(index)):
        hex = nodemod.hex(index[rev][7])
        changed += _insert_into_block(index, 0, root, rev, hex)
    return changed, root


def _insert_into_block(index, level, block, current_rev, current_hex):
    """insert a new revision in a block

    index: the index we are adding revision for
    level: the depth of the current block in the trie
    block: the block currently being considered
    current_rev: the revision number we are adding
    current_hex: the hexadecimal representation of the of that revision
    """
    changed = 1
    if block.ondisk_id is not None:
        block.ondisk_id = None
    hex_digit = _to_int(current_hex[level : level + 1])
    entry = block.get(hex_digit)
    if entry is None:
        # no entry, simply store the revision number
        block[hex_digit] = current_rev
    elif isinstance(entry, dict):
        # need to recurse to an underlying block
        changed += _insert_into_block(
            index, level + 1, entry, current_rev, current_hex
        )
    else:
        # collision with a previously unique prefix, inserting new
        # vertices to fit both entry.
        other_hex = nodemod.hex(index[entry][7])
        other_rev = entry
        new = Block()
        block[hex_digit] = new
        _insert_into_block(index, level + 1, new, other_rev, other_hex)
        _insert_into_block(index, level + 1, new, current_rev, current_hex)
    return changed


def _persist_trie(root, existing_idx=None):
    """turn a nodemap trie into persistent binary data

    See `_build_trie` for nodemap trie structure"""
    block_map = {}
    if existing_idx is not None:
        base_idx = existing_idx + 1
    else:
        base_idx = 0
    chunks = []
    for tn in _walk_trie(root):
        if tn.ondisk_id is not None:
            block_map[id(tn)] = tn.ondisk_id
        else:
            block_map[id(tn)] = len(chunks) + base_idx
            chunks.append(_persist_block(tn, block_map))
    return b''.join(chunks)


def _walk_trie(block):
    """yield all the block in a trie

    Children blocks are always yield before their parent block.
    """
    for (_, item) in sorted(block.items()):
        if isinstance(item, dict):
            for sub_block in _walk_trie(item):
                yield sub_block
    yield block


def _persist_block(block_node, block_map):
    """produce persistent binary data for a single block

    Children block are assumed to be already persisted and present in
    block_map.
    """
    data = tuple(_to_value(v, block_map) for v in block_node)
    return S_BLOCK.pack(*data)


def _to_value(item, block_map):
    """persist any value as an integer"""
    if item is None:
        return NO_ENTRY
    elif isinstance(item, dict):
        return block_map[id(item)]
    else:
        return _transform_rev(item)


def parse_data(data):
    """parse parse nodemap data into a nodemap Trie"""
    if (len(data) % S_BLOCK.size) != 0:
        msg = "nodemap data size is not a multiple of block size (%d): %d"
        raise error.Abort(msg % (S_BLOCK.size, len(data)))
    if not data:
        return Block(), None
    block_map = {}
    new_blocks = []
    for i in range(0, len(data), S_BLOCK.size):
        block = Block()
        block.ondisk_id = len(block_map)
        block_map[block.ondisk_id] = block
        block_data = data[i : i + S_BLOCK.size]
        values = S_BLOCK.unpack(block_data)
        new_blocks.append((block, values))
    for b, values in new_blocks:
        for idx, v in enumerate(values):
            if v == NO_ENTRY:
                continue
            elif v >= 0:
                b[idx] = block_map[v]
            else:
                b[idx] = _transform_rev(v)
    return block, i // S_BLOCK.size


# debug utility


def check_data(ui, index, data):
    """verify that the provided nodemap data are valid for the given idex"""
    ret = 0
    ui.status((b"revision in index:   %d\n") % len(index))
    root, __ = parse_data(data)
    all_revs = set(_all_revisions(root))
    ui.status((b"revision in nodemap: %d\n") % len(all_revs))
    for r in range(len(index)):
        if r not in all_revs:
            msg = b"  revision missing from nodemap: %d\n" % r
            ui.write_err(msg)
            ret = 1
        else:
            all_revs.remove(r)
        nm_rev = _find_node(root, nodemod.hex(index[r][7]))
        if nm_rev is None:
            msg = b"  revision node does not match any entries: %d\n" % r
            ui.write_err(msg)
            ret = 1
        elif nm_rev != r:
            msg = (
                b"  revision node does not match the expected revision: "
                b"%d != %d\n" % (r, nm_rev)
            )
            ui.write_err(msg)
            ret = 1

    if all_revs:
        for r in sorted(all_revs):
            msg = b"  extra revision in  nodemap: %d\n" % r
            ui.write_err(msg)
        ret = 1
    return ret


def _all_revisions(root):
    """return all revisions stored in a Trie"""
    for block in _walk_trie(root):
        for v in block:
            if v is None or isinstance(v, Block):
                continue
            yield v


def _find_node(block, node):
    """find the revision associated with a given node"""
    entry = block.get(_to_int(node[0:1]))
    if isinstance(entry, dict):
        return _find_node(entry, node[1:])
    return entry