# HG changeset patch # User Pierre-Yves David # Date 1620037662 -7200 # Node ID 906a7bcaac868397bd065b2b0beb199d540bbf72 # Parent 100f061d88f610c1952333c59e1a297b3922bdda revlog: introduce a mandatory `_writing` context to update revlog content Before this change, various revlog methods where managing the opening and closing of the revlog files manually and passing the file descriptor alors the call path. To simplify the tracking of the write operation by a future docket, we need something more organised. As a result, we introduce a `revlog._writing` context manager that will wrap each revlog update operation. The file descriptor are kept in the existing `revlog._writinghandles` parameter that was already used by the `addgroup` logic. All this change is internal to the revlog only, the "public" interface is not affected. The `addrevision` and `addgroup` logic are still responsible for setup up this context. However this new context give us multiple benefits: * all writer use a same, unified, logic, * this context is programmatically enforced, * each write "session" as a clearly identified start and end. The post-pull sidedata update logic is still doing writing by end and will be adjusted in a later changesets. This change affect the concurrency checker test, because register the state of the file in the transaction sooner in `addrevision` (about as early as what `addgroup` would do), so the abort is rollbacking the other commit. I don't want to weaken the current main logic. Differential Revision: https://phab.mercurial-scm.org/D10605 diff -r 100f061d88f6 -r 906a7bcaac86 mercurial/changelog.py --- a/mercurial/changelog.py Mon May 03 12:27:31 2021 +0200 +++ b/mercurial/changelog.py Mon May 03 12:27:42 2021 +0200 @@ -506,9 +506,9 @@ return False - def _enforceinlinesize(self, tr, fp=None): + def _enforceinlinesize(self, tr): if not self._delayed: - revlog.revlog._enforceinlinesize(self, tr, fp) + revlog.revlog._enforceinlinesize(self, tr) def read(self, nodeorrev): """Obtain data from a parsed changelog revision. diff -r 100f061d88f6 -r 906a7bcaac86 mercurial/revlog.py --- a/mercurial/revlog.py Mon May 03 12:27:31 2021 +0200 +++ b/mercurial/revlog.py Mon May 03 12:27:42 2021 +0200 @@ -360,6 +360,8 @@ # 2-tuple of file handles being used for active writing. self._writinghandles = None + # prevent nesting of addgroup + self._adding_group = None self._loadindex() @@ -1955,7 +1957,7 @@ raise error.CensoredNodeError(self.display_id, node, text) raise - def _enforceinlinesize(self, tr, fp=None): + def _enforceinlinesize(self, tr): """Check if the revlog is too big for inline and convert if so. This should be called after revisions are added to the revlog. If the @@ -1975,21 +1977,27 @@ trindex = 0 tr.add(self._datafile, 0) - if fp: + existing_handles = False + if self._writinghandles is not None: + existing_handles = True + fp = self._writinghandles[0] fp.flush() fp.close() # We can't use the cached file handle after close(). So prevent # its usage. self._writinghandles = None - if True: - with self._indexfp(b'r') as ifh, self._datafp(b'w') as dfh: + new_dfh = self._datafp(b'w+') + new_dfh.truncate(0) # drop any potentially existing data + try: + with self._indexfp(b'r') as read_ifh: for r in self: - dfh.write(self._getsegmentforrevs(r, r, df=ifh)[1]) + new_dfh.write(self._getsegmentforrevs(r, r, df=read_ifh)[1]) if troffset <= self.start(r): trindex = r - - with self._indexfp(b'w') as fp: + new_dfh.flush() + + with self.opener(self._indexfile, mode=b'w', atomictemp=True) as fp: self._format_flags &= ~FLAG_INLINE_DATA self._inline = False for i in self: @@ -1999,7 +2007,6 @@ header = self.index.pack_header(header) e = header + e fp.write(e) - # the temp file replace the real index when we exit the context # manager @@ -2007,9 +2014,50 @@ nodemaputil.setup_persistent_nodemap(tr, self) self._chunkclear() + if existing_handles: + # switched from inline to conventional reopen the index + ifh = self._indexfp(b"a+") + self._writinghandles = (ifh, new_dfh) + new_dfh = None + finally: + if new_dfh is not None: + new_dfh.close() + def _nodeduplicatecallback(self, transaction, node): """called when trying to add a node already stored.""" + @contextlib.contextmanager + def _writing(self, transaction): + if self._writinghandles is not None: + yield + else: + r = len(self) + dsize = 0 + if r: + dsize = self.end(r - 1) + dfh = None + if not self._inline: + dfh = self._datafp(b"a+") + transaction.add(self._datafile, dsize) + try: + isize = r * self.index.entry_size + ifh = self._indexfp(b"a+") + if self._inline: + transaction.add(self._indexfile, dsize + isize) + else: + transaction.add(self._indexfile, isize) + try: + self._writinghandles = (ifh, dfh) + try: + yield + finally: + self._writinghandles = None + finally: + ifh.close() + finally: + if dfh is not None: + dfh.close() + def addrevision( self, text, @@ -2105,11 +2153,7 @@ useful when reusing a revision not stored in this revlog (ex: received over wire, or read from an external bundle). """ - dfh = None - if not self._inline: - dfh = self._datafp(b"a+") - ifh = self._indexfp(b"a+") - try: + with self._writing(transaction): return self._addrevision( node, rawtext, @@ -2119,15 +2163,9 @@ p2, flags, cachedelta, - ifh, - dfh, deltacomputer=deltacomputer, sidedata=sidedata, ) - finally: - if dfh: - dfh.close() - ifh.close() def compress(self, data): """Generate a possibly-compressed representation of data.""" @@ -2214,8 +2252,6 @@ p2, flags, cachedelta, - ifh, - dfh, alwayscache=False, deltacomputer=None, sidedata=None, @@ -2244,11 +2280,14 @@ raise error.RevlogError( _(b"%s: attempt to add wdir revision") % self.display_id ) + if self._writinghandles is None: + msg = b'adding revision outside `revlog._writing` context' + raise error.ProgrammingError(msg) if self._inline: - fh = ifh + fh = self._writinghandles[0] else: - fh = dfh + fh = self._writinghandles[1] btext = [rawtext] @@ -2258,6 +2297,7 @@ offset = self._get_data_offset(prev) if self._concurrencychecker: + ifh, dfh = self._writinghandles if self._inline: # offset is "as if" it were in the .d file, so we need to add on # the size of the entry metadata. @@ -2323,8 +2363,6 @@ entry = header + entry self._writeentry( transaction, - ifh, - dfh, entry, deltainfo.data, link, @@ -2362,9 +2400,7 @@ offset = max(self.end(rev), offset, sidedata_end) return offset - def _writeentry( - self, transaction, ifh, dfh, entry, data, link, offset, sidedata - ): + def _writeentry(self, transaction, entry, data, link, offset, sidedata): # Files opened in a+ mode have inconsistent behavior on various # platforms. Windows requires that a file positioning call be made # when the file handle transitions between reads and writes. See @@ -2377,6 +2413,10 @@ # Note: This is likely not necessary on Python 3. However, because # the file handle is reused for reads and may be seeked there, we need # to be careful before changing this. + if self._writinghandles is None: + msg = b'adding revision outside `revlog._writing` context' + raise error.ProgrammingError(msg) + ifh, dfh = self._writinghandles ifh.seek(0, os.SEEK_END) if dfh: dfh.seek(0, os.SEEK_END) @@ -2399,7 +2439,7 @@ ifh.write(data[1]) if sidedata: ifh.write(sidedata) - self._enforceinlinesize(transaction, ifh) + self._enforceinlinesize(transaction) nodemaputil.setup_persistent_nodemap(transaction, self) def addgroup( @@ -2422,28 +2462,13 @@ this revlog and the node that was added. """ - if self._writinghandles: + if self._adding_group: raise error.ProgrammingError(b'cannot nest addgroup() calls') - r = len(self) - end = 0 - if r: - end = self.end(r - 1) - ifh = self._indexfp(b"a+") - isize = r * self.index.entry_size - if self._inline: - transaction.add(self._indexfile, end + isize) - dfh = None - else: - transaction.add(self._indexfile, isize) - transaction.add(self._datafile, end) - dfh = self._datafp(b"a+") - - self._writinghandles = (ifh, dfh) + self._adding_group = True empty = True - try: - if True: + with self._writing(transaction): deltacomputer = deltautil.deltacomputer(self) # loop through our set of deltas for data in deltas: @@ -2514,8 +2539,6 @@ p2, flags, (baserev, delta), - ifh, - dfh, alwayscache=alwayscache, deltacomputer=deltacomputer, sidedata=sidedata, @@ -2524,20 +2547,8 @@ if addrevisioncb: addrevisioncb(self, rev) empty = False - - if not dfh and not self._inline: - # addrevision switched from inline to conventional - # reopen the index - ifh.close() - dfh = self._datafp(b"a+") - ifh = self._indexfp(b"a+") - self._writinghandles = (ifh, dfh) finally: - self._writinghandles = None - - if dfh: - dfh.close() - ifh.close() + self._adding_group = False return not empty def iscensored(self, rev): @@ -2868,13 +2879,7 @@ ) flags = flags | new_flags[0] & ~new_flags[1] - ifh = destrevlog.opener( - destrevlog._indexfile, b'a+', checkambig=False - ) - dfh = None - if not destrevlog._inline: - dfh = destrevlog.opener(destrevlog._datafile, b'a+') - try: + with destrevlog._writing(tr): destrevlog._addrevision( node, rawtext, @@ -2884,15 +2889,9 @@ p2, flags, cachedelta, - ifh, - dfh, deltacomputer=deltacomputer, sidedata=sidedata, ) - finally: - if dfh: - dfh.close() - ifh.close() if addrevisioncb: addrevisioncb(self, rev, node) diff -r 100f061d88f6 -r 906a7bcaac86 tests/test-racy-mutations.t --- a/tests/test-racy-mutations.t Mon May 03 12:27:31 2021 +0200 +++ b/tests/test-racy-mutations.t Mon May 03 12:27:42 2021 +0200 @@ -91,7 +91,7 @@ $ hg debugrevlogindex -c rev linkrev nodeid p1 p2 0 0 222799e2f90b 000000000000 000000000000 - 1 1 6f124f6007a0 222799e2f90b 000000000000 + 1 1 6f124f6007a0 222799e2f90b 000000000000 (missing-correct-output !) And, because of transactions, there's none in the manifestlog either. $ hg debugrevlogindex -m rev linkrev nodeid p1 p2 diff -r 100f061d88f6 -r 906a7bcaac86 tests/test-revlog-raw.py --- a/tests/test-revlog-raw.py Mon May 03 12:27:31 2021 +0200 +++ b/tests/test-revlog-raw.py Mon May 03 12:27:42 2021 +0200 @@ -19,6 +19,32 @@ flagutil, ) + +class _NoTransaction(object): + """transaction like object to update the nodemap outside a transaction""" + + def __init__(self): + self._postclose = {} + + def addpostclose(self, callback_id, callback_func): + self._postclose[callback_id] = callback_func + + def registertmp(self, *args, **kwargs): + pass + + def addbackup(self, *args, **kwargs): + pass + + def add(self, *args, **kwargs): + pass + + def addabort(self, *args, **kwargs): + pass + + def _report(self, *args): + pass + + # TESTTMP is optional. This makes it convenient to run without run-tests.py tvfs = vfs.vfs(encoding.environ.get(b'TESTTMP', b'/tmp')) @@ -201,19 +227,17 @@ text = None cachedelta = (deltaparent, rlog.revdiff(deltaparent, r)) flags = rlog.flags(r) - ifh = dfh = None - try: - ifh = dlog.opener(dlog._indexfile, b'a+') - if not dlog._inline: - dfh = dlog.opener(dlog._datafile, b'a+') + with dlog._writing(_NoTransaction()): dlog._addrevision( - rlog.node(r), text, tr, r, p1, p2, flags, cachedelta, ifh, dfh + rlog.node(r), + text, + tr, + r, + p1, + p2, + flags, + cachedelta, ) - finally: - if dfh is not None: - dfh.close() - if ifh is not None: - ifh.close() return dlog