Mercurial > hg
changeset 47214:906a7bcaac86
revlog: introduce a mandatory `_writing` context to update revlog content
Before this change, various revlog methods where managing the opening and
closing of the revlog files manually and passing the file descriptor alors the
call path. To simplify the tracking of the write operation by a future docket,
we need something more organised. As a result, we introduce a `revlog._writing`
context manager that will wrap each revlog update operation. The file
descriptor are kept in the existing `revlog._writinghandles` parameter that
was already used by the `addgroup` logic.
All this change is internal to the revlog only, the "public" interface is not
affected. The `addrevision` and `addgroup` logic are still responsible for
setup up this context. However this new context give us multiple benefits:
* all writer use a same, unified, logic,
* this context is programmatically enforced,
* each write "session" as a clearly identified start and end.
The post-pull sidedata update logic is still doing writing by end and will be
adjusted in a later changesets.
This change affect the concurrency checker test, because register the state of
the file in the transaction sooner in `addrevision` (about as early as what
`addgroup` would do), so the abort is rollbacking the other commit. I don't want
to weaken the current main logic.
Differential Revision: https://phab.mercurial-scm.org/D10605
author | Pierre-Yves David <pierre-yves.david@octobus.net> |
---|---|
date | Mon, 03 May 2021 12:27:42 +0200 |
parents | 100f061d88f6 |
children | 042388bba644 |
files | mercurial/changelog.py mercurial/revlog.py tests/test-racy-mutations.t tests/test-revlog-raw.py |
diffstat | 4 files changed, 112 insertions(+), 89 deletions(-) [+] |
line wrap: on
line diff
--- a/mercurial/changelog.py Mon May 03 12:27:31 2021 +0200 +++ b/mercurial/changelog.py Mon May 03 12:27:42 2021 +0200 @@ -506,9 +506,9 @@ return False - def _enforceinlinesize(self, tr, fp=None): + def _enforceinlinesize(self, tr): if not self._delayed: - revlog.revlog._enforceinlinesize(self, tr, fp) + revlog.revlog._enforceinlinesize(self, tr) def read(self, nodeorrev): """Obtain data from a parsed changelog revision.
--- a/mercurial/revlog.py Mon May 03 12:27:31 2021 +0200 +++ b/mercurial/revlog.py Mon May 03 12:27:42 2021 +0200 @@ -360,6 +360,8 @@ # 2-tuple of file handles being used for active writing. self._writinghandles = None + # prevent nesting of addgroup + self._adding_group = None self._loadindex() @@ -1955,7 +1957,7 @@ raise error.CensoredNodeError(self.display_id, node, text) raise - def _enforceinlinesize(self, tr, fp=None): + def _enforceinlinesize(self, tr): """Check if the revlog is too big for inline and convert if so. This should be called after revisions are added to the revlog. If the @@ -1975,21 +1977,27 @@ trindex = 0 tr.add(self._datafile, 0) - if fp: + existing_handles = False + if self._writinghandles is not None: + existing_handles = True + fp = self._writinghandles[0] fp.flush() fp.close() # We can't use the cached file handle after close(). So prevent # its usage. self._writinghandles = None - if True: - with self._indexfp(b'r') as ifh, self._datafp(b'w') as dfh: + new_dfh = self._datafp(b'w+') + new_dfh.truncate(0) # drop any potentially existing data + try: + with self._indexfp(b'r') as read_ifh: for r in self: - dfh.write(self._getsegmentforrevs(r, r, df=ifh)[1]) + new_dfh.write(self._getsegmentforrevs(r, r, df=read_ifh)[1]) if troffset <= self.start(r): trindex = r - - with self._indexfp(b'w') as fp: + new_dfh.flush() + + with self.opener(self._indexfile, mode=b'w', atomictemp=True) as fp: self._format_flags &= ~FLAG_INLINE_DATA self._inline = False for i in self: @@ -1999,7 +2007,6 @@ header = self.index.pack_header(header) e = header + e fp.write(e) - # the temp file replace the real index when we exit the context # manager @@ -2007,9 +2014,50 @@ nodemaputil.setup_persistent_nodemap(tr, self) self._chunkclear() + if existing_handles: + # switched from inline to conventional reopen the index + ifh = self._indexfp(b"a+") + self._writinghandles = (ifh, new_dfh) + new_dfh = None + finally: + if new_dfh is not None: + new_dfh.close() + def _nodeduplicatecallback(self, transaction, node): """called when trying to add a node already stored.""" + @contextlib.contextmanager + def _writing(self, transaction): + if self._writinghandles is not None: + yield + else: + r = len(self) + dsize = 0 + if r: + dsize = self.end(r - 1) + dfh = None + if not self._inline: + dfh = self._datafp(b"a+") + transaction.add(self._datafile, dsize) + try: + isize = r * self.index.entry_size + ifh = self._indexfp(b"a+") + if self._inline: + transaction.add(self._indexfile, dsize + isize) + else: + transaction.add(self._indexfile, isize) + try: + self._writinghandles = (ifh, dfh) + try: + yield + finally: + self._writinghandles = None + finally: + ifh.close() + finally: + if dfh is not None: + dfh.close() + def addrevision( self, text, @@ -2105,11 +2153,7 @@ useful when reusing a revision not stored in this revlog (ex: received over wire, or read from an external bundle). """ - dfh = None - if not self._inline: - dfh = self._datafp(b"a+") - ifh = self._indexfp(b"a+") - try: + with self._writing(transaction): return self._addrevision( node, rawtext, @@ -2119,15 +2163,9 @@ p2, flags, cachedelta, - ifh, - dfh, deltacomputer=deltacomputer, sidedata=sidedata, ) - finally: - if dfh: - dfh.close() - ifh.close() def compress(self, data): """Generate a possibly-compressed representation of data.""" @@ -2214,8 +2252,6 @@ p2, flags, cachedelta, - ifh, - dfh, alwayscache=False, deltacomputer=None, sidedata=None, @@ -2244,11 +2280,14 @@ raise error.RevlogError( _(b"%s: attempt to add wdir revision") % self.display_id ) + if self._writinghandles is None: + msg = b'adding revision outside `revlog._writing` context' + raise error.ProgrammingError(msg) if self._inline: - fh = ifh + fh = self._writinghandles[0] else: - fh = dfh + fh = self._writinghandles[1] btext = [rawtext] @@ -2258,6 +2297,7 @@ offset = self._get_data_offset(prev) if self._concurrencychecker: + ifh, dfh = self._writinghandles if self._inline: # offset is "as if" it were in the .d file, so we need to add on # the size of the entry metadata. @@ -2323,8 +2363,6 @@ entry = header + entry self._writeentry( transaction, - ifh, - dfh, entry, deltainfo.data, link, @@ -2362,9 +2400,7 @@ offset = max(self.end(rev), offset, sidedata_end) return offset - def _writeentry( - self, transaction, ifh, dfh, entry, data, link, offset, sidedata - ): + def _writeentry(self, transaction, entry, data, link, offset, sidedata): # Files opened in a+ mode have inconsistent behavior on various # platforms. Windows requires that a file positioning call be made # when the file handle transitions between reads and writes. See @@ -2377,6 +2413,10 @@ # Note: This is likely not necessary on Python 3. However, because # the file handle is reused for reads and may be seeked there, we need # to be careful before changing this. + if self._writinghandles is None: + msg = b'adding revision outside `revlog._writing` context' + raise error.ProgrammingError(msg) + ifh, dfh = self._writinghandles ifh.seek(0, os.SEEK_END) if dfh: dfh.seek(0, os.SEEK_END) @@ -2399,7 +2439,7 @@ ifh.write(data[1]) if sidedata: ifh.write(sidedata) - self._enforceinlinesize(transaction, ifh) + self._enforceinlinesize(transaction) nodemaputil.setup_persistent_nodemap(transaction, self) def addgroup( @@ -2422,28 +2462,13 @@ this revlog and the node that was added. """ - if self._writinghandles: + if self._adding_group: raise error.ProgrammingError(b'cannot nest addgroup() calls') - r = len(self) - end = 0 - if r: - end = self.end(r - 1) - ifh = self._indexfp(b"a+") - isize = r * self.index.entry_size - if self._inline: - transaction.add(self._indexfile, end + isize) - dfh = None - else: - transaction.add(self._indexfile, isize) - transaction.add(self._datafile, end) - dfh = self._datafp(b"a+") - - self._writinghandles = (ifh, dfh) + self._adding_group = True empty = True - try: - if True: + with self._writing(transaction): deltacomputer = deltautil.deltacomputer(self) # loop through our set of deltas for data in deltas: @@ -2514,8 +2539,6 @@ p2, flags, (baserev, delta), - ifh, - dfh, alwayscache=alwayscache, deltacomputer=deltacomputer, sidedata=sidedata, @@ -2524,20 +2547,8 @@ if addrevisioncb: addrevisioncb(self, rev) empty = False - - if not dfh and not self._inline: - # addrevision switched from inline to conventional - # reopen the index - ifh.close() - dfh = self._datafp(b"a+") - ifh = self._indexfp(b"a+") - self._writinghandles = (ifh, dfh) finally: - self._writinghandles = None - - if dfh: - dfh.close() - ifh.close() + self._adding_group = False return not empty def iscensored(self, rev): @@ -2868,13 +2879,7 @@ ) flags = flags | new_flags[0] & ~new_flags[1] - ifh = destrevlog.opener( - destrevlog._indexfile, b'a+', checkambig=False - ) - dfh = None - if not destrevlog._inline: - dfh = destrevlog.opener(destrevlog._datafile, b'a+') - try: + with destrevlog._writing(tr): destrevlog._addrevision( node, rawtext, @@ -2884,15 +2889,9 @@ p2, flags, cachedelta, - ifh, - dfh, deltacomputer=deltacomputer, sidedata=sidedata, ) - finally: - if dfh: - dfh.close() - ifh.close() if addrevisioncb: addrevisioncb(self, rev, node)
--- a/tests/test-racy-mutations.t Mon May 03 12:27:31 2021 +0200 +++ b/tests/test-racy-mutations.t Mon May 03 12:27:42 2021 +0200 @@ -91,7 +91,7 @@ $ hg debugrevlogindex -c rev linkrev nodeid p1 p2 0 0 222799e2f90b 000000000000 000000000000 - 1 1 6f124f6007a0 222799e2f90b 000000000000 + 1 1 6f124f6007a0 222799e2f90b 000000000000 (missing-correct-output !) And, because of transactions, there's none in the manifestlog either. $ hg debugrevlogindex -m rev linkrev nodeid p1 p2
--- a/tests/test-revlog-raw.py Mon May 03 12:27:31 2021 +0200 +++ b/tests/test-revlog-raw.py Mon May 03 12:27:42 2021 +0200 @@ -19,6 +19,32 @@ flagutil, ) + +class _NoTransaction(object): + """transaction like object to update the nodemap outside a transaction""" + + def __init__(self): + self._postclose = {} + + def addpostclose(self, callback_id, callback_func): + self._postclose[callback_id] = callback_func + + def registertmp(self, *args, **kwargs): + pass + + def addbackup(self, *args, **kwargs): + pass + + def add(self, *args, **kwargs): + pass + + def addabort(self, *args, **kwargs): + pass + + def _report(self, *args): + pass + + # TESTTMP is optional. This makes it convenient to run without run-tests.py tvfs = vfs.vfs(encoding.environ.get(b'TESTTMP', b'/tmp')) @@ -201,19 +227,17 @@ text = None cachedelta = (deltaparent, rlog.revdiff(deltaparent, r)) flags = rlog.flags(r) - ifh = dfh = None - try: - ifh = dlog.opener(dlog._indexfile, b'a+') - if not dlog._inline: - dfh = dlog.opener(dlog._datafile, b'a+') + with dlog._writing(_NoTransaction()): dlog._addrevision( - rlog.node(r), text, tr, r, p1, p2, flags, cachedelta, ifh, dfh + rlog.node(r), + text, + tr, + r, + p1, + p2, + flags, + cachedelta, ) - finally: - if dfh is not None: - dfh.close() - if ifh is not None: - ifh.close() return dlog