stream: prefer keeping an open file handle to volatile file instead of copy
We will still make a copy if too many file handles are open. Currently, we have
fewer than 10 volatile files in typical usage, so we should be fine.
See inline documentation for details.
--- a/mercurial/streamclone.py Tue Oct 01 15:55:49 2024 +0200
+++ b/mercurial/streamclone.py Tue Oct 01 16:07:51 2024 +0200
@@ -567,53 +567,113 @@
class VolatileManager:
- """Manage temporary backup of volatile file during stream clone
+ """Manage temporary backups of volatile files during stream clone.
- This should be used as a Python context, the copies will be discarded when
- exiting the context.
+ This class will keep open file handles for the volatile files, writing the
+ smaller ones on disk if the number of open file handles grows too much.
- A copy can be done by calling the object on the real path (encoded full
- path)
+ This should be used as a Python context, the file handles and copies will
+ be discarded when exiting the context.
- The backup path can be retrieved using the __getitem__ protocol, obj[path].
- On file without backup, it will return the unmodified path. (equivalent to
- `dict.get(x, x)`)
+ The preservation can be done by calling the object on the real path
+ (encoded full path).
+
+ Valid filehandles for any file should be retrieved by calling `open(path)`.
"""
+ # arbitrarily picked as "it seemed fine" and much higher than the current
+ # usage.
+ MAX_OPEN = 100
+
def __init__(self):
+ self._counter = 0
+ self._volatile_fps = None
self._copies = None
self._dst_dir = None
def __enter__(self):
- if self._copies is not None:
- msg = "Copies context already open"
- raise error.ProgrammingError(msg)
+ if self._counter == 0:
+ assert self._volatile_fps is None
+ self._volatile_fps = {}
+ self._counter += 1
+ return self
+
+ def __exit__(self, *args, **kwars):
+ """discard all backups"""
+ self._counter -= 1
+ if self._counter == 0:
+ for _size, fp in self._volatile_fps.values():
+ fp.close()
+ self._volatile_fps = None
+ if self._copies is not None:
+ for tmp in self._copies.values():
+ util.tryunlink(tmp)
+ util.tryrmdir(self._dst_dir)
+ self._copies = None
+ self._dst_dir = None
+ assert self._volatile_fps is None
+ assert self._copies is None
+ assert self._dst_dir is None
+
+ def _init_tmp_copies(self):
+ """prepare a temporary directory to save volatile files
+
+ This will be used as backup if we have too many files open"""
+ assert 0 < self._counter
+ assert self._copies is None
+ assert self._dst_dir is None
self._copies = {}
self._dst_dir = pycompat.mkdtemp(prefix=b'hg-clone-')
- return self
+
+ def _flush_some_on_disk(self):
+ """move some of the open files to temporary files on disk"""
+ if self._copies is None:
+ self._init_tmp_copies()
+ flush_count = self.MAX_OPEN // 2
+ for src, (size, fp) in sorted(self._volatile_fps.items())[:flush_count]:
+ prefix = os.path.basename(src)
+ fd, dst = pycompat.mkstemp(prefix=prefix, dir=self._dst_dir)
+ self._copies[src] = dst
+ os.close(fd)
+ # we no longer hardlink, but on the other hand we rarely do this,
+ # and we do it for the smallest file only and not at all in the
+ # common case.
+ with open(dst, 'wb') as bck:
+ fp.seek(0)
+ bck.write(fp.read())
+ del self._volatile_fps[src]
+ fp.close()
+
+ def _keep_one(self, src):
+ """preserve an open file handle for a given path"""
+ # store the file quickly to ensure we close it if any error happens
+ _, fp = self._volatile_fps[src] = (None, open(src, 'rb'))
+ fp.seek(0, os.SEEK_END)
+ size = fp.tell()
+ self._volatile_fps[src] = (size, fp)
def __call__(self, src):
- """create a backup of the file at src"""
- prefix = os.path.basename(src)
- fd, dst = pycompat.mkstemp(prefix=prefix, dir=self._dst_dir)
- os.close(fd)
- self._copies[src] = dst
- util.copyfiles(src, dst, hardlink=True)
- return dst
+ """preserve the volatile file at src"""
+ assert 0 < self._counter
+ if len(self._volatile_fps) >= (self.MAX_OPEN - 1):
+ self._flush_some_on_disk()
+ self._keep_one(src)
@contextlib.contextmanager
def open(self, src):
- actual_path = self._copies.get(src, src)
- with open(actual_path, 'rb') as fp:
+ assert 0 < self._counter
+ entry = self._volatile_fps.get(src)
+ if entry is not None:
+ _size, fp = entry
+ fp.seek(0)
yield fp
-
- def __exit__(self, *args, **kwars):
- """discard all backups"""
- for tmp in self._copies.values():
- util.tryunlink(tmp)
- util.tryrmdir(self._dst_dir)
- self._copies = None
- self._dst_dir = None
+ else:
+ if self._copies is None:
+ actual_path = src
+ else:
+ actual_path = self._copies.get(src, src)
+ with open(actual_path, 'rb') as fp:
+ yield fp
def _makemap(repo):