changelog-delay: move the delay/divert logic inside the (inner) revlog
Instead of hacking throught the vfs/opener, we implement the delay/divert logic
inside the `_InnerRevlog` and `randomaccessfile` object. This will allow to an
alternative implementation of the `_InnerRevlog` that does not need to use Python details.
As a result, the new implementation can use the transaction less agressively
and avoid some extra output since no data had been written yet. That seems like
a good side effect.
--- a/mercurial/changelog.py Thu Oct 26 05:37:37 2023 +0200
+++ b/mercurial/changelog.py Tue Oct 24 11:08:49 2023 +0200
@@ -27,7 +27,6 @@
from .revlogutils import (
constants as revlog_constants,
flagutil,
- randomaccessfile,
)
_defaultextra = {b'branch': b'default'}
@@ -92,38 +91,6 @@
return b'\n'.join([l.rstrip() for l in desc.splitlines()]).strip(b'\n')
-class _divertopener:
- def __init__(self, opener, target):
- self._opener = opener
- self._target = target
-
- def __call__(self, name, mode=b'r', checkambig=False, **kwargs):
- if name != self._target:
- return self._opener(name, mode, **kwargs)
- return self._opener(name + b".a", mode, **kwargs)
-
- def __getattr__(self, attr):
- return getattr(self._opener, attr)
-
-
-class _delayopener:
- """build an opener that stores chunks in 'buf' instead of 'target'"""
-
- def __init__(self, opener, target, buf):
- self._opener = opener
- self._target = target
- self._buf = buf
-
- def __call__(self, name, mode=b'r', checkambig=False, **kwargs):
- if name != self._target:
- return self._opener(name, mode, **kwargs)
- assert not kwargs
- return randomaccessfile.appender(self._opener, name, mode, self._buf)
-
- def __getattr__(self, attr):
- return getattr(self._opener, attr)
-
-
@attr.s
class _changelogrevision:
# Extensions might modify _defaultextra, so let the constructor below pass
@@ -354,10 +321,7 @@
# chains.
self._storedeltachains = False
- self._realopener = opener
- self._delayed = False
- self._delaybuf = None
- self._divert = False
+ self._v2_delayed = False
self._filteredrevs = frozenset()
self._filteredrevs_hashcache = {}
self._copiesstorage = opener.options.get(b'copies-storage')
@@ -374,90 +338,47 @@
self._filteredrevs_hashcache = {}
def _write_docket(self, tr):
- if not self.is_delaying:
+ if not self._v2_delayed:
super(changelog, self)._write_docket(tr)
- @property
- def is_delaying(self):
- return self._delayed
-
def delayupdate(self, tr):
"""delay visibility of index updates to other readers"""
assert not self._inner.is_open
- if self._docket is None and not self.is_delaying:
- if len(self) == 0:
- self._divert = True
- if self._realopener.exists(self._indexfile + b'.a'):
- self._realopener.unlink(self._indexfile + b'.a')
- self.opener = _divertopener(self._realopener, self._indexfile)
- else:
- self._delaybuf = []
- self.opener = _delayopener(
- self._realopener, self._indexfile, self._delaybuf
- )
- self._inner.opener = self.opener
- self._inner._segmentfile.opener = self.opener
- self._inner._segmentfile_sidedata.opener = self.opener
- self._delayed = True
+ if self._docket is not None:
+ self._v2_delayed = True
+ else:
+ new_index = self._inner.delay()
+ if new_index is not None:
+ self._indexfile = new_index
+ tr.registertmp(new_index)
tr.addpending(b'cl-%i' % id(self), self._writepending)
tr.addfinalize(b'cl-%i' % id(self), self._finalize)
def _finalize(self, tr):
"""finalize index updates"""
assert not self._inner.is_open
- self._delayed = False
- self.opener = self._realopener
- self._inner.opener = self.opener
- self._inner._segmentfile.opener = self.opener
- self._inner._segmentfile_sidedata.opener = self.opener
- # move redirected index data back into place
if self._docket is not None:
- self._write_docket(tr)
- elif self._divert:
- assert not self._delaybuf
- tmpname = self._indexfile + b".a"
- nfile = self.opener.open(tmpname)
- nfile.close()
- self.opener.rename(tmpname, self._indexfile, checkambig=True)
- elif self._delaybuf:
- fp = self.opener(self._indexfile, b'a', checkambig=True)
- fp.write(b"".join(self._delaybuf))
- fp.close()
- self._delaybuf = None
- self._divert = False
- # split when we're done
- self._enforceinlinesize(tr, side_write=False)
+ self._docket.write(tr)
+ self._v2_delayed = False
+ else:
+ new_index_file = self._inner.finalize_pending()
+ self._indexfile = new_index_file
+ # split when we're done
+ self._enforceinlinesize(tr, side_write=False)
def _writepending(self, tr):
"""create a file containing the unfinalized state for
pretxnchangegroup"""
assert not self._inner.is_open
if self._docket:
- return self._docket.write(tr, pending=True)
- if self._delaybuf:
- # make a temporary copy of the index
- fp1 = self._realopener(self._indexfile)
- pendingfilename = self._indexfile + b".a"
- # register as a temp file to ensure cleanup on failure
- tr.registertmp(pendingfilename)
- # write existing data
- fp2 = self._realopener(pendingfilename, b"w")
- fp2.write(fp1.read())
- # add pending data
- fp2.write(b"".join(self._delaybuf))
- fp2.close()
- # switch modes so finalize can simply rename
- self._delaybuf = None
- self._divert = True
- self.opener = _divertopener(self._realopener, self._indexfile)
- self._inner.opener = self.opener
- self._inner._segmentfile.opener = self.opener
- self._inner._segmentfile_sidedata.opener = self.opener
-
- if self._divert:
- return True
-
- return False
+ any_pending = self._docket.write(tr, pending=True)
+ self._v2_delayed = False
+ else:
+ new_index, any_pending = self._inner.write_pending()
+ if new_index is not None:
+ self._indexfile = new_index
+ tr.registertmp(new_index)
+ return any_pending
def _enforceinlinesize(self, tr, side_write=True):
if not self.is_delaying:
--- a/mercurial/repocache.py Thu Oct 26 05:37:37 2023 +0200
+++ b/mercurial/repocache.py Tue Oct 24 11:08:49 2023 +0200
@@ -129,7 +129,7 @@
srcfilecache = srcrepo._filecache
if b'changelog' in srcfilecache:
destfilecache[b'changelog'] = ce = srcfilecache[b'changelog']
- ce.obj.opener = ce.obj._realopener = destrepo.svfs
+ ce.obj.opener = ce.obj._inner.opener = destrepo.svfs
if b'obsstore' in srcfilecache:
destfilecache[b'obsstore'] = ce = srcfilecache[b'obsstore']
ce.obj.svfs = destrepo.svfs
--- a/mercurial/revlog.py Thu Oct 26 05:37:37 2023 +0200
+++ b/mercurial/revlog.py Tue Oct 24 11:08:49 2023 +0200
@@ -369,6 +369,9 @@
self.delta_config = delta_config
self.feature_config = feature_config
+ # used during diverted write.
+ self._orig_index_file = None
+
self._default_compression_header = default_compression_header
# index
@@ -393,6 +396,8 @@
# 3-tuple of (node, rev, text) for a raw revision.
self._revisioncache = None
+ self._delay_buffer = None
+
@property
def index_file(self):
return self.__index_file
@@ -407,14 +412,27 @@
return len(self.index)
def clear_cache(self):
+ assert not self.is_delaying
self._revisioncache = None
self._segmentfile.clear_cache()
self._segmentfile_sidedata.clear_cache()
@property
def canonical_index_file(self):
+ if self._orig_index_file is not None:
+ return self._orig_index_file
return self.index_file
+ @property
+ def is_delaying(self):
+ """is the revlog is currently delaying the visibility of written data?
+
+ The delaying mechanism can be either in-memory or written on disk in a
+ side-file."""
+ return (self._delay_buffer is not None) or (
+ self._orig_index_file is not None
+ )
+
# Derived from index values.
def start(self, rev):
@@ -700,22 +718,36 @@
You should not use this directly and use `_writing` instead
"""
try:
- f = self.opener(
- self.index_file,
- mode=b"r+",
- checkambig=self.data_config.check_ambig,
- )
+ if self._delay_buffer is None:
+ f = self.opener(
+ self.index_file,
+ mode=b"r+",
+ checkambig=self.data_config.check_ambig,
+ )
+ else:
+ # check_ambig affect we way we open file for writing, however
+ # here, we do not actually open a file for writting as write
+ # will appened to a delay_buffer. So check_ambig is not
+ # meaningful and unneeded here.
+ f = randomaccessfile.appender(
+ self.opener, self.index_file, b"r+", self._delay_buffer
+ )
if index_end is None:
f.seek(0, os.SEEK_END)
else:
f.seek(index_end, os.SEEK_SET)
return f
except FileNotFoundError:
- return self.opener(
- self.index_file,
- mode=b"w+",
- checkambig=self.data_config.check_ambig,
- )
+ if self._delay_buffer is None:
+ return self.opener(
+ self.index_file,
+ mode=b"w+",
+ checkambig=self.data_config.check_ambig,
+ )
+ else:
+ return randomaccessfile.appender(
+ self.opener, self.index_file, b"w+", self._delay_buffer
+ )
def __index_new_fp(self):
"""internal method to create a new index file for writing
@@ -1044,20 +1076,101 @@
dfh.write(data[1])
if sidedata:
sdfh.write(sidedata)
- ifh.write(entry)
+ if self._delay_buffer is None:
+ ifh.write(entry)
+ else:
+ self._delay_buffer.append(entry)
else:
offset += curr * self.index.entry_size
transaction.add(self.canonical_index_file, offset)
- ifh.write(entry)
- ifh.write(data[0])
- ifh.write(data[1])
assert not sidedata
+ if self._delay_buffer is None:
+ ifh.write(entry)
+ ifh.write(data[0])
+ ifh.write(data[1])
+ else:
+ self._delay_buffer.append(entry)
+ self._delay_buffer.append(data[0])
+ self._delay_buffer.append(data[1])
return (
ifh.tell(),
dfh.tell() if dfh else None,
sdfh.tell() if sdfh else None,
)
+ def _divert_index(self):
+ return self.index_file + b'.a'
+
+ def delay(self):
+ assert not self.is_open
+ if self._delay_buffer is not None or self._orig_index_file is not None:
+ # delay or divert already in place
+ return None
+ elif len(self.index) == 0:
+ self._orig_index_file = self.index_file
+ self.index_file = self._divert_index()
+ self._segmentfile.filename = self.index_file
+ assert self._orig_index_file is not None
+ assert self.index_file is not None
+ if self.opener.exists(self.index_file):
+ self.opener.unlink(self.index_file)
+ return self.index_file
+ else:
+ self._segmentfile._delay_buffer = self._delay_buffer = []
+ return None
+
+ def write_pending(self):
+ assert not self.is_open
+ if self._orig_index_file is not None:
+ return None, True
+ any_pending = False
+ pending_index_file = self._divert_index()
+ if self.opener.exists(pending_index_file):
+ self.opener.unlink(pending_index_file)
+ util.copyfile(
+ self.opener.join(self.index_file),
+ self.opener.join(pending_index_file),
+ )
+ if self._delay_buffer:
+ with self.opener(pending_index_file, b'r+') as ifh:
+ ifh.seek(0, os.SEEK_END)
+ ifh.write(b"".join(self._delay_buffer))
+ any_pending = True
+ self._segmentfile._delay_buffer = self._delay_buffer = None
+ self._orig_index_file = self.index_file
+ self.index_file = pending_index_file
+ self._segmentfile.filename = self.index_file
+ return self.index_file, any_pending
+
+ def finalize_pending(self):
+ assert not self.is_open
+
+ delay = self._delay_buffer is not None
+ divert = self._orig_index_file is not None
+
+ if delay and divert:
+ assert False, "unreachable"
+ elif delay:
+ if self._delay_buffer:
+ with self.opener(self.index_file, b'r+') as ifh:
+ ifh.seek(0, os.SEEK_END)
+ ifh.write(b"".join(self._delay_buffer))
+ self._segmentfile._delay_buffer = self._delay_buffer = None
+ elif divert:
+ if self.opener.exists(self.index_file):
+ self.opener.rename(
+ self.index_file,
+ self._orig_index_file,
+ checkambig=True,
+ )
+ self.index_file = self._orig_index_file
+ self._orig_index_file = None
+ self._segmentfile.filename = self.index_file
+ else:
+ msg = b"not delay or divert found on this revlog"
+ raise error.ProgrammingError(msg)
+ return self.canonical_index_file
+
class revlog:
"""
@@ -2925,6 +3038,10 @@
if self._docket is not None:
self._write_docket(transaction)
+ @property
+ def is_delaying(self):
+ return self._inner.is_delaying
+
def _write_docket(self, transaction):
"""write the current docket on disk
--- a/mercurial/revlogutils/randomaccessfile.py Thu Oct 26 05:37:37 2023 +0200
+++ b/mercurial/revlogutils/randomaccessfile.py Tue Oct 24 11:08:49 2023 +0200
@@ -116,6 +116,8 @@
if initial_cache:
self._cached_chunk_position, self._cached_chunk = initial_cache
+ self._delay_buffer = None
+
def clear_cache(self):
self._cached_chunk = b''
self._cached_chunk_position = 0
@@ -131,7 +133,12 @@
def _open(self, mode=b'r'):
"""Return a file object"""
- return self.opener(self.filename, mode=mode)
+ if self._delay_buffer is None:
+ return self.opener(self.filename, mode=mode)
+ else:
+ return appender(
+ self.opener, self.filename, mode, self._delay_buffer
+ )
@contextlib.contextmanager
def _read_handle(self):
--- a/tests/test-bundle2-exchange.t Thu Oct 26 05:37:37 2023 +0200
+++ b/tests/test-bundle2-exchange.t Tue Oct 24 11:08:49 2023 +0200
@@ -1042,8 +1042,6 @@
adding changesets
remote: abort: incompatible Mercurial client; bundle2 required
remote: (see https://www.mercurial-scm.org/wiki/IncompatibleClient)
- transaction abort!
- rollback completed
abort: stream ended unexpectedly (got 0 bytes, expected 4)
[255]
--- a/tests/test-http-bad-server.t Thu Oct 26 05:37:37 2023 +0200
+++ b/tests/test-http-bad-server.t Tue Oct 24 11:08:49 2023 +0200
@@ -725,8 +725,6 @@
$ hg clone http://localhost:$HGPORT/ clone
requesting all changes
adding changesets
- transaction abort!
- rollback completed
abort: HTTP request error (incomplete response)
(this may be an intermittent network failure; if the error persists, consider contacting the network or server operator)
[255]
@@ -759,8 +757,6 @@
$ hg clone http://localhost:$HGPORT/ clone
requesting all changes
adding changesets
- transaction abort!
- rollback completed
abort: HTTP request error (incomplete response*) (glob)
(this may be an intermittent network failure; if the error persists, consider contacting the network or server operator)
[255]
@@ -795,8 +791,6 @@
$ hg clone http://localhost:$HGPORT/ clone
requesting all changes
adding changesets
- transaction abort!
- rollback completed
abort: HTTP request error (incomplete response)
(this may be an intermittent network failure; if the error persists, consider contacting the network or server operator)
[255]