--- a/mercurial/revlog.py Thu Oct 26 05:37:37 2023 +0200
+++ b/mercurial/revlog.py Tue Oct 24 11:08:49 2023 +0200
@@ -369,6 +369,9 @@
self.delta_config = delta_config
self.feature_config = feature_config
+ # used during diverted write.
+ self._orig_index_file = None
+
self._default_compression_header = default_compression_header
# index
@@ -393,6 +396,8 @@
# 3-tuple of (node, rev, text) for a raw revision.
self._revisioncache = None
+ self._delay_buffer = None
+
@property
def index_file(self):
return self.__index_file
@@ -407,14 +412,27 @@
return len(self.index)
def clear_cache(self):
+ assert not self.is_delaying
self._revisioncache = None
self._segmentfile.clear_cache()
self._segmentfile_sidedata.clear_cache()
@property
def canonical_index_file(self):
+ if self._orig_index_file is not None:
+ return self._orig_index_file
return self.index_file
+ @property
+ def is_delaying(self):
+ """is the revlog is currently delaying the visibility of written data?
+
+ The delaying mechanism can be either in-memory or written on disk in a
+ side-file."""
+ return (self._delay_buffer is not None) or (
+ self._orig_index_file is not None
+ )
+
# Derived from index values.
def start(self, rev):
@@ -700,22 +718,36 @@
You should not use this directly and use `_writing` instead
"""
try:
- f = self.opener(
- self.index_file,
- mode=b"r+",
- checkambig=self.data_config.check_ambig,
- )
+ if self._delay_buffer is None:
+ f = self.opener(
+ self.index_file,
+ mode=b"r+",
+ checkambig=self.data_config.check_ambig,
+ )
+ else:
+ # check_ambig affect we way we open file for writing, however
+ # here, we do not actually open a file for writting as write
+ # will appened to a delay_buffer. So check_ambig is not
+ # meaningful and unneeded here.
+ f = randomaccessfile.appender(
+ self.opener, self.index_file, b"r+", self._delay_buffer
+ )
if index_end is None:
f.seek(0, os.SEEK_END)
else:
f.seek(index_end, os.SEEK_SET)
return f
except FileNotFoundError:
- return self.opener(
- self.index_file,
- mode=b"w+",
- checkambig=self.data_config.check_ambig,
- )
+ if self._delay_buffer is None:
+ return self.opener(
+ self.index_file,
+ mode=b"w+",
+ checkambig=self.data_config.check_ambig,
+ )
+ else:
+ return randomaccessfile.appender(
+ self.opener, self.index_file, b"w+", self._delay_buffer
+ )
def __index_new_fp(self):
"""internal method to create a new index file for writing
@@ -1044,20 +1076,101 @@
dfh.write(data[1])
if sidedata:
sdfh.write(sidedata)
- ifh.write(entry)
+ if self._delay_buffer is None:
+ ifh.write(entry)
+ else:
+ self._delay_buffer.append(entry)
else:
offset += curr * self.index.entry_size
transaction.add(self.canonical_index_file, offset)
- ifh.write(entry)
- ifh.write(data[0])
- ifh.write(data[1])
assert not sidedata
+ if self._delay_buffer is None:
+ ifh.write(entry)
+ ifh.write(data[0])
+ ifh.write(data[1])
+ else:
+ self._delay_buffer.append(entry)
+ self._delay_buffer.append(data[0])
+ self._delay_buffer.append(data[1])
return (
ifh.tell(),
dfh.tell() if dfh else None,
sdfh.tell() if sdfh else None,
)
+ def _divert_index(self):
+ return self.index_file + b'.a'
+
+ def delay(self):
+ assert not self.is_open
+ if self._delay_buffer is not None or self._orig_index_file is not None:
+ # delay or divert already in place
+ return None
+ elif len(self.index) == 0:
+ self._orig_index_file = self.index_file
+ self.index_file = self._divert_index()
+ self._segmentfile.filename = self.index_file
+ assert self._orig_index_file is not None
+ assert self.index_file is not None
+ if self.opener.exists(self.index_file):
+ self.opener.unlink(self.index_file)
+ return self.index_file
+ else:
+ self._segmentfile._delay_buffer = self._delay_buffer = []
+ return None
+
+ def write_pending(self):
+ assert not self.is_open
+ if self._orig_index_file is not None:
+ return None, True
+ any_pending = False
+ pending_index_file = self._divert_index()
+ if self.opener.exists(pending_index_file):
+ self.opener.unlink(pending_index_file)
+ util.copyfile(
+ self.opener.join(self.index_file),
+ self.opener.join(pending_index_file),
+ )
+ if self._delay_buffer:
+ with self.opener(pending_index_file, b'r+') as ifh:
+ ifh.seek(0, os.SEEK_END)
+ ifh.write(b"".join(self._delay_buffer))
+ any_pending = True
+ self._segmentfile._delay_buffer = self._delay_buffer = None
+ self._orig_index_file = self.index_file
+ self.index_file = pending_index_file
+ self._segmentfile.filename = self.index_file
+ return self.index_file, any_pending
+
+ def finalize_pending(self):
+ assert not self.is_open
+
+ delay = self._delay_buffer is not None
+ divert = self._orig_index_file is not None
+
+ if delay and divert:
+ assert False, "unreachable"
+ elif delay:
+ if self._delay_buffer:
+ with self.opener(self.index_file, b'r+') as ifh:
+ ifh.seek(0, os.SEEK_END)
+ ifh.write(b"".join(self._delay_buffer))
+ self._segmentfile._delay_buffer = self._delay_buffer = None
+ elif divert:
+ if self.opener.exists(self.index_file):
+ self.opener.rename(
+ self.index_file,
+ self._orig_index_file,
+ checkambig=True,
+ )
+ self.index_file = self._orig_index_file
+ self._orig_index_file = None
+ self._segmentfile.filename = self.index_file
+ else:
+ msg = b"not delay or divert found on this revlog"
+ raise error.ProgrammingError(msg)
+ return self.canonical_index_file
+
class revlog:
"""
@@ -2925,6 +3038,10 @@
if self._docket is not None:
self._write_docket(transaction)
+ @property
+ def is_delaying(self):
+ return self._inner.is_delaying
+
def _write_docket(self, transaction):
"""write the current docket on disk