diff hgext/remotefilelog/basestore.py @ 40495:3a333a582d7b
remotefilelog: import pruned-down remotefilelog extension from hg-experimental
This is remotefilelog as of my recent patches for compatibility with
current tip of hg, minus support for old versions of Mercurial and
some FB-specific features like their treemanifest extension and
fetching linkrev data from a patched phabricator. The file extutil.py
moved from hgext3rd to remotefilelog.
This is not yet ready to be landed; consider it a preview for
now. Planned changes include:
* replace lz4 with zstd
* rename some capabilities, requirements and wireproto commands to mark
them as experimental
* consolidate bits of shallowutil with related functions (e.g. readfile)
I'm certainly open to other (small) changes, but my rough mission is
to land this largely as-is so we can use it as a model of the
functionality we need going forward for lazy-fetching of file contents
from a server.
# no-check-commit because of a few foo_bar functions
Differential Revision: https://phab.mercurial-scm.org/D4782
| author | Augie Fackler <augie@google.com> |
| --- | --- |
| date | Thu, 27 Sep 2018 13:03:19 -0400 |
| parents | |
| children | 13d4ad8d7801 |
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/hgext/remotefilelog/basestore.py	Thu Sep 27 13:03:19 2018 -0400
@@ -0,0 +1,423 @@
+from __future__ import absolute_import
+
+import errno
+import hashlib
+import os
+import shutil
+import stat
+import time
+
+from mercurial.i18n import _
+from mercurial.node import bin, hex
+from mercurial import (
+    error,
+    pycompat,
+    util,
+)
+from . import (
+    constants,
+    shallowutil,
+)
+
+class basestore(object):
+    def __init__(self, repo, path, reponame, shared=False):
+        """Creates a remotefilelog store object for the given repo name.
+
+        `path` - The file path where this store keeps its data
+        `reponame` - The name of the repo. This is used to partition data from
+        many repos.
+        `shared` - True if this store is a shared cache of data from the central
+        server, for many repos on this machine. False means this store is for
+        the local data for one repo.
+        """
+        self.repo = repo
+        self.ui = repo.ui
+        self._path = path
+        self._reponame = reponame
+        self._shared = shared
+        self._uid = os.getuid() if not pycompat.iswindows else None
+
+        self._validatecachelog = self.ui.config("remotefilelog",
+                                                "validatecachelog")
+        self._validatecache = self.ui.config("remotefilelog", "validatecache",
+                                             'on')
+        if self._validatecache not in ('on', 'strict', 'off'):
+            self._validatecache = 'on'
+        if self._validatecache == 'off':
+            self._validatecache = False
+
+        if shared:
+            shallowutil.mkstickygroupdir(self.ui, path)
+
+    def getmissing(self, keys):
+        missing = []
+        for name, node in keys:
+            filepath = self._getfilepath(name, node)
+            exists = os.path.exists(filepath)
+            if (exists and self._validatecache == 'strict' and
+                not self._validatekey(filepath, 'contains')):
+                exists = False
+            if not exists:
+                missing.append((name, node))
+
+        return missing
+
+    # BELOW THIS ARE IMPLEMENTATIONS OF REPACK SOURCE
+
+    def markledger(self, ledger, options=None):
+        if options and options.get(constants.OPTION_PACKSONLY):
+            return
+        if self._shared:
+            for filename, nodes in self._getfiles():
+                for node in nodes:
+                    ledger.markdataentry(self, filename, node)
+                    ledger.markhistoryentry(self, filename, node)
+
+    def cleanup(self, ledger):
+        ui = self.ui
+        entries = ledger.sources.get(self, [])
+        count = 0
+        for entry in entries:
+            if entry.gced or (entry.datarepacked and entry.historyrepacked):
+                ui.progress(_("cleaning up"), count, unit="files",
+                            total=len(entries))
+                path = self._getfilepath(entry.filename, entry.node)
+                util.tryunlink(path)
+            count += 1
+        ui.progress(_("cleaning up"), None)
+
+        # Clean up the repo cache directory.
+        self._cleanupdirectory(self._getrepocachepath())
+
+    # BELOW THIS ARE NON-STANDARD APIS
+
+    def _cleanupdirectory(self, rootdir):
+        """Removes the empty directories and unnecessary files within the root
+        directory recursively. Note that this method does not remove the root
+        directory itself. """
+
+        oldfiles = set()
+        otherfiles = set()
+        # osutil.listdir returns stat information which saves some rmdir/listdir
+        # syscalls.
+        for name, mode in util.osutil.listdir(rootdir):
+            if stat.S_ISDIR(mode):
+                dirpath = os.path.join(rootdir, name)
+                self._cleanupdirectory(dirpath)
+
+                # Now that the directory specified by dirpath is potentially
+                # empty, try and remove it.
+                try:
+                    os.rmdir(dirpath)
+                except OSError:
+                    pass
+
+            elif stat.S_ISREG(mode):
+                if name.endswith('_old'):
+                    oldfiles.add(name[:-4])
+                else:
+                    otherfiles.add(name)
+
+        # Remove the files which end with suffix '_old' and have no
+        # corresponding file without the suffix '_old'. See addremotefilelognode
+        # method for the generation/purpose of files with '_old' suffix.
+        for filename in oldfiles - otherfiles:
+            filepath = os.path.join(rootdir, filename + '_old')
+            util.tryunlink(filepath)
+
+    def _getfiles(self):
+        """Return a list of (filename, [node,...]) for all the revisions that
+        exist in the store.
+
+        This is useful for obtaining a list of all the contents of the store
+        when performing a repack to another store, since the store API requires
+        name+node keys and not namehash+node keys.
+        """
+        existing = {}
+        for filenamehash, node in self._listkeys():
+            existing.setdefault(filenamehash, []).append(node)
+
+        filenamemap = self._resolvefilenames(existing.keys())
+
+        for filename, sha in filenamemap.iteritems():
+            yield (filename, existing[sha])
+
+    def _resolvefilenames(self, hashes):
+        """Given a list of filename hashes that are present in the
+        remotefilelog store, return a mapping from filename->hash.
+
+        This is useful when converting remotefilelog blobs into other storage
+        formats.
+        """
+        if not hashes:
+            return {}
+
+        filenames = {}
+        missingfilename = set(hashes)
+
+        # Start with a full manifest, since it'll cover the majority of files
+        for filename in self.repo['tip'].manifest():
+            sha = hashlib.sha1(filename).digest()
+            if sha in missingfilename:
+                filenames[filename] = sha
+                missingfilename.discard(sha)
+
+        # Scan the changelog until we've found every file name
+        cl = self.repo.unfiltered().changelog
+        for rev in pycompat.xrange(len(cl) - 1, -1, -1):
+            if not missingfilename:
+                break
+            files = cl.readfiles(cl.node(rev))
+            for filename in files:
+                sha = hashlib.sha1(filename).digest()
+                if sha in missingfilename:
+                    filenames[filename] = sha
+                    missingfilename.discard(sha)
+
+        return filenames
+
+    def _getrepocachepath(self):
+        return os.path.join(
+            self._path, self._reponame) if self._shared else self._path
+
+    def _listkeys(self):
+        """List all the remotefilelog keys that exist in the store.
+
+        Returns an iterator of (filename hash, filecontent hash) tuples.
+        """
+
+        for root, dirs, files in os.walk(self._getrepocachepath()):
+            for filename in files:
+                if len(filename) != 40:
+                    continue
+                node = filename
+                if self._shared:
+                    # .../1a/85ffda..be21
+                    filenamehash = root[-41:-39] + root[-38:]
+                else:
+                    filenamehash = root[-40:]
+                yield (bin(filenamehash), bin(node))
+
+    def _getfilepath(self, name, node):
+        node = hex(node)
+        if self._shared:
+            key = shallowutil.getcachekey(self._reponame, name, node)
+        else:
+            key = shallowutil.getlocalkey(name, node)
+
+        return os.path.join(self._path, key)
+
+    def _getdata(self, name, node):
+        filepath = self._getfilepath(name, node)
+        try:
+            data = shallowutil.readfile(filepath)
+            if self._validatecache and not self._validatedata(data, filepath):
+                if self._validatecachelog:
+                    with open(self._validatecachelog, 'a+') as f:
+                        f.write("corrupt %s during read\n" % filepath)
+                os.rename(filepath, filepath + ".corrupt")
+                raise KeyError("corrupt local cache file %s" % filepath)
+        except IOError:
+            raise KeyError("no file found at %s for %s:%s" % (filepath, name,
+                                                              hex(node)))
+
+        return data
+
+    def addremotefilelognode(self, name, node, data):
+        filepath = self._getfilepath(name, node)
+
+        oldumask = os.umask(0o002)
+        try:
+            # if this node already exists, save the old version for
+            # recovery/debugging purposes.
+            if os.path.exists(filepath):
+                newfilename = filepath + '_old'
+                # newfilename can be read-only and shutil.copy will fail.
+                # Delete newfilename to avoid it
+                if os.path.exists(newfilename):
+                    shallowutil.unlinkfile(newfilename)
+                shutil.copy(filepath, newfilename)
+
+            shallowutil.mkstickygroupdir(self.ui, os.path.dirname(filepath))
+            shallowutil.writefile(filepath, data, readonly=True)
+
+            if self._validatecache:
+                if not self._validatekey(filepath, 'write'):
+                    raise error.Abort(_("local cache write was corrupted %s") %
+                                      filepath)
+        finally:
+            os.umask(oldumask)
+
+    def markrepo(self, path):
+        """Call this to add the given repo path to the store's list of
+        repositories that are using it. This is useful later when doing garbage
+        collection, since it allows us to inspect the repos to see what nodes
+        they want to be kept alive in the store.
+        """
+        repospath = os.path.join(self._path, "repos")
+        with open(repospath, 'a') as reposfile:
+            reposfile.write(os.path.dirname(path) + "\n")
+
+        repospathstat = os.stat(repospath)
+        if repospathstat.st_uid == self._uid:
+            os.chmod(repospath, 0o0664)
+
+    def _validatekey(self, path, action):
+        with open(path, 'rb') as f:
+            data = f.read()
+
+        if self._validatedata(data, path):
+            return True
+
+        if self._validatecachelog:
+            with open(self._validatecachelog, 'a+') as f:
+                f.write("corrupt %s during %s\n" % (path, action))
+
+        os.rename(path, path + ".corrupt")
+        return False
+
+    def _validatedata(self, data, path):
+        try:
+            if len(data) > 0:
+                # see remotefilelogserver.createfileblob for the format
+                offset, size, flags = shallowutil.parsesizeflags(data)
+                if len(data) <= size:
+                    # it is truncated
+                    return False
+
+                # extract the node from the metadata
+                offset += size
+                datanode = data[offset:offset + 20]
+
+                # and compare against the path
+                if os.path.basename(path) == hex(datanode):
+                    # Content matches the intended path
+                    return True
+                return False
+        except (ValueError, RuntimeError):
+            pass
+
+        return False
+
+    def gc(self, keepkeys):
+        ui = self.ui
+        cachepath = self._path
+        _removing = _("removing unnecessary files")
+        _truncating = _("enforcing cache limit")
+
+        # prune cache
+        import Queue
+        queue = Queue.PriorityQueue()
+        originalsize = 0
+        size = 0
+        count = 0
+        removed = 0
+
+        # keep files newer than a day even if they aren't needed
+        limit = time.time() - (60 * 60 * 24)
+
+        ui.progress(_removing, count, unit="files")
+        for root, dirs, files in os.walk(cachepath):
+            for file in files:
+                if file == 'repos':
+                    continue
+
+                # Don't delete pack files
+                if '/packs/' in root:
+                    continue
+
+                ui.progress(_removing, count, unit="files")
+                path = os.path.join(root, file)
+                key = os.path.relpath(path, cachepath)
+                count += 1
+                try:
+                    pathstat = os.stat(path)
+                except OSError as e:
+                    # errno.ENOENT = no such file or directory
+                    if e.errno != errno.ENOENT:
+                        raise
+                    msg = _("warning: file %s was removed by another process\n")
+                    ui.warn(msg % path)
+                    continue
+
+                originalsize += pathstat.st_size
+
+                if key in keepkeys or pathstat.st_atime > limit:
+                    queue.put((pathstat.st_atime, path, pathstat))
+                    size += pathstat.st_size
+                else:
+                    try:
+                        shallowutil.unlinkfile(path)
+                    except OSError as e:
+                        # errno.ENOENT = no such file or directory
+                        if e.errno != errno.ENOENT:
+                            raise
+                        msg = _("warning: file %s was removed by another "
+                                "process\n")
+                        ui.warn(msg % path)
+                        continue
+                    removed += 1
+        ui.progress(_removing, None)
+
+        # remove oldest files until under limit
+        limit = ui.configbytes("remotefilelog", "cachelimit")
+        if size > limit:
+            excess = size - limit
+            removedexcess = 0
+            while queue and size > limit and size > 0:
+                ui.progress(_truncating, removedexcess, unit="bytes",
+                            total=excess)
+                atime, oldpath, oldpathstat = queue.get()
+                try:
+                    shallowutil.unlinkfile(oldpath)
+                except OSError as e:
+                    # errno.ENOENT = no such file or directory
+                    if e.errno != errno.ENOENT:
+                        raise
+                    msg = _("warning: file %s was removed by another process\n")
+                    ui.warn(msg % oldpath)
+                size -= oldpathstat.st_size
+                removed += 1
+                removedexcess += oldpathstat.st_size
+        ui.progress(_truncating, None)
+
+        ui.status(_("finished: removed %s of %s files (%0.2f GB to %0.2f GB)\n")
+                  % (removed, count,
+                     float(originalsize) / 1024.0 / 1024.0 / 1024.0,
+                     float(size) / 1024.0 / 1024.0 / 1024.0))
+
+class baseunionstore(object):
+    def __init__(self, *args, **kwargs):
+        # If one of the functions that iterates all of the stores is about to
+        # throw a KeyError, try this many times with a full refresh between
+        # attempts. A repack operation may have moved data from one store to
+        # another while we were running.
+        self.numattempts = kwargs.get('numretries', 0) + 1
+        # If not None, call this function on every retry and if the attempts are
+        # exhausted.
+        self.retrylog = kwargs.get('retrylog', None)
+
+    def markforrefresh(self):
+        for store in self.stores:
+            if util.safehasattr(store, 'markforrefresh'):
+                store.markforrefresh()
+
+    @staticmethod
+    def retriable(fn):
+        def noop(*args):
+            pass
+        def wrapped(self, *args, **kwargs):
+            retrylog = self.retrylog or noop
+            funcname = fn.__name__
+            for i in pycompat.xrange(self.numattempts):
+                if i > 0:
+                    retrylog('re-attempting (n=%d) %s\n' % (i, funcname))
+                    self.markforrefresh()
+                try:
+                    return fn(self, *args, **kwargs)
+                except KeyError:
+                    pass
+            # retries exhausted
+            retrylog('retries exhausted in %s, raising KeyError\n' % funcname)
+            raise
+        return wrapped
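
For readers new to this file, the trickiest contract is `baseunionstore.retriable`: it wraps a lookup method so that a KeyError caused by a concurrent repack (data moved between stores mid-operation) triggers a `markforrefresh()` and a retry, up to `numretries` extra attempts. The sketch below shows how a subclass would consume it; it is illustrative only and not part of this change — the `unionexample` class, its `stores` argument, and its `getdata` method are hypothetical, while `retriable`, `markforrefresh`, `numretries`, and `retrylog` come from the code above.

    from hgext.remotefilelog import basestore

    class unionexample(basestore.baseunionstore):
        # Hypothetical union store reading from several basestore objects.
        def __init__(self, *stores, **kwargs):
            super(unionexample, self).__init__(**kwargs)
            self.stores = list(stores)  # iterated by markforrefresh()

        @basestore.baseunionstore.retriable
        def getdata(self, name, node):
            # Try each store in turn. If every store raises KeyError, the
            # retriable wrapper calls markforrefresh() and retries before
            # finally re-raising the KeyError.
            for store in self.stores:
                try:
                    return store._getdata(name, node)
                except KeyError:
                    pass
            raise KeyError((name, node))

    # Hypothetical usage: allow one retry, logging attempts to ui.debug:
    # union = unionexample(localstore, sharedcache,
    #                      numretries=1, retrylog=ui.debug)

Note that the decorator swallows the KeyError on every attempt and only re-raises after the last one, so a wrapped method must signal a miss exclusively via KeyError rather than a sentinel return value.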