Mercurial > hg
view tests/test-context.py @ 51482:32631657e9af
stream-clone-test: simplify the case testing phases
There are only two important things in this test:
- the number of files we send, to show we picked the phase roots.
- the resulting phases, to show we did not modify them.
The rest of the numbers are very fragile and consume a lot of time for little
value when adjusting formats, caches, and protocol.
author | Pierre-Yves David <pierre-yves.david@octobus.net> |
---|---|
date | Wed, 28 Feb 2024 22:43:07 +0100 |
parents | d41960df197e |
children |
line wrap: on
line source
"""Exercise mercurial.context objects against two scratch repositories.

The script creates the repositories 'test1' and 'test2' in the current
directory and prints what it observes to stdout: working file dates,
memctx commits, status/diff of in-memory contexts, and a commit performed
after the manifest log has been invalidated.
"""

import os
import stat
import sys
from mercurial.node import hex
from mercurial import (
    context,
    diffutil,
    encoding,
    hg,
    scmutil,
    ui as uimod,
)

# keep a handle on the builtin before it is shadowed by the wrapper below
print_ = print


def print(*args, **kwargs):
    """print() wrapper that flushes stdout buffers to avoid py3 buffer issues

    We could also just write directly to sys.stdout.buffer the way the
    ui object will, but this was easier for porting the test.
    """
    print_(*args, **kwargs)
    sys.stdout.flush()


def printb(data, end=b'\n'):
    """Write the bytes ``data`` followed by ``end`` to stdout and flush.

    Uses ``sys.stdout.buffer`` when that attribute exists and falls back
    to ``sys.stdout`` itself otherwise.
    """
    out = getattr(sys.stdout, 'buffer', sys.stdout)
    out.write(data + end)
    out.flush()


ui = uimod.ui.load()

repo = hg.repository(ui, b'test1', create=1)
os.chdir('test1')

# create 'foo' with fixed time stamp
# (context manager so the handle is closed even if the write fails)
with open('foo', 'wb') as f:
    f.write(b'foo\n')
os.utime('foo', (1000, 1000))

# add+commit 'foo'
with repo.wlock(), repo.lock(), repo.transaction(b'test-context'):
    with repo.dirstate.changing_files(repo):
        repo[None].add([b'foo'])
    repo.commit(text=b'commit1', date=b"0 0")

d = repo[None][b'foo'].date()
if os.name == 'nt':
    # only the first two fields are formatted below
    d = d[:2]
print("workingfilectx.date = (%d, %d)" % d)

# test memctx with non-ASCII commit message


def filectxfn(repo, memctx, path):
    """File callback for memctx: always return b"foo" with empty data.

    ``path`` is ignored; the returned memfilectx is built for b"foo".
    """
    return context.memfilectx(repo, memctx, b"foo", b"")


ctx = context.memctx(
    repo,
    [b'tip', None],
    encoding.tolocal(b"Gr\xc3\xbcezi!"),
    [b"foo"],
    filectxfn,
)
ctx.commit()
# print the tip description once per selected local encoding
for enc in "ASCII", "Latin-1", "UTF-8":
    encoding.encoding = enc
    printb(b"%-8s: %s" % (enc.encode('ascii'), repo[b"tip"].description()))

# test performing a status


def getfilectx(repo, memctx, f):
    """File callback for memctx: derive ``f`` from the first parent.

    The data and flags are read from ``memctx.p1()[f]``; for b'foo' the
    line b'bar\\n' is appended to the data. Whether b'l' and b'x' occur in
    the parent's flags is forwarded to memfilectx as two booleans.
    """
    fctx = memctx.p1()[f]
    data, flags = fctx.data(), fctx.flags()
    if f == b'foo':
        data += b'bar\n'
    return context.memfilectx(
        repo, memctx, f, data, b'l' in flags, b'x' in flags
    )


ctxa = repo[0]
ctxb = context.memctx(
    repo,
    [ctxa.node(), None],
    b"test diff",
    [b"foo"],
    getfilectx,
    ctxa.user(),
    ctxa.date(),
)

print(ctxb.status(ctxa))

# test performing a diff on a memctx
diffopts = diffutil.diffallopts(repo.ui, {b'git': True})
for d in ctxb.diff(ctxa, opts=diffopts):
    printb(d, end=b'')

# test safeness and correctness of "ctx.status()"
print('= checking context.status():')

# ancestor "wcctx ~ 2"
actx2 = repo[b'.']

repo.wwrite(b'bar-m', b'bar-m\n', b'')
repo.wwrite(b'bar-r', b'bar-r\n', b'')
with repo.wlock(), repo.lock(), repo.transaction(b'test-context'):
    with repo.dirstate.changing_files(repo):
        repo[None].add([b'bar-m', b'bar-r'])
    repo.commit(text=b'add bar-m, bar-r', date=b"0 0")

# ancestor "wcctx ~ 1"
actx1 = repo[b'.']

repo.wwrite(b'bar-m', b'bar-m bar-m\n', b'')
repo.wwrite(b'bar-a', b'bar-a\n', b'')
with repo.wlock(), repo.lock(), repo.transaction(b'test-context'):
    with repo.dirstate.changing_files(repo):
        repo[None].add([b'bar-a'])
        repo[None].forget([b'bar-r'])

# status at this point:
#   M bar-m
#   A bar-a
#   R bar-r
#   C foo

print('== checking workingctx.status:')

wctx = repo[None]
# wctx._status is printed after every status() call so that any change
# to it shows up in the output
print('wctx._status=%s' % (str(wctx._status)))

print('=== with "pattern match":')
print(
    actx1.status(other=wctx, match=scmutil.matchfiles(repo, [b'bar-m', b'foo']))
)
print('wctx._status=%s' % (str(wctx._status)))
print(
    actx2.status(other=wctx, match=scmutil.matchfiles(repo, [b'bar-m', b'foo']))
)
print('wctx._status=%s' % (str(wctx._status)))

print('=== with "always match" and "listclean=True":')
print(actx1.status(other=wctx, listclean=True))
print('wctx._status=%s' % (str(wctx._status)))
print(actx2.status(other=wctx, listclean=True))
print('wctx._status=%s' % (str(wctx._status)))

print("== checking workingcommitctx.status:")

wcctx = context.workingcommitctx(
    repo,
    scmutil.status([b'bar-m'], [b'bar-a'], [], [], [], [], []),
    text=b'',
    date=b'0 0',
)
print('wcctx._status=%s' % (str(wcctx._status)))

print('=== with "always match":')
print(actx1.status(other=wcctx))
print('wcctx._status=%s' % (str(wcctx._status)))
print(actx2.status(other=wcctx))
print('wcctx._status=%s' % (str(wcctx._status)))

print('=== with "always match" and "listclean=True":')
print(actx1.status(other=wcctx, listclean=True))
print('wcctx._status=%s' % (str(wcctx._status)))
print(actx2.status(other=wcctx, listclean=True))
print('wcctx._status=%s' % (str(wcctx._status)))

print('=== with "pattern match":')
print(
    actx1.status(
        other=wcctx, match=scmutil.matchfiles(repo, [b'bar-m', b'foo'])
    )
)
print('wcctx._status=%s' % (str(wcctx._status)))
print(
    actx2.status(
        other=wcctx, match=scmutil.matchfiles(repo, [b'bar-m', b'foo'])
    )
)
print('wcctx._status=%s' % (str(wcctx._status)))

print('=== with "pattern match" and "listclean=True":')
print(
    actx1.status(
        other=wcctx,
        match=scmutil.matchfiles(repo, [b'bar-r', b'foo']),
        listclean=True,
    )
)
print('wcctx._status=%s' % (str(wcctx._status)))
print(
    actx2.status(
        other=wcctx,
        match=scmutil.matchfiles(repo, [b'bar-r', b'foo']),
        listclean=True,
    )
)
print('wcctx._status=%s' % (str(wcctx._status)))

os.chdir('..')

# test manifestlog being changed
print('== commit with manifestlog invalidated')

repo = hg.repository(ui, b'test2', create=1)
os.chdir('test2')

# make some commits
for i in [b'1', b'2', b'3']:
    # each file is named after, and contains, the same single byte
    with open(i, 'wb') as f:
        f.write(i)
    status = scmutil.status([], [i], [], [], [], [], [])
    ctx = context.workingcommitctx(
        repo, status, text=i, user=b'test@test.com', date=(0, 0)
    )
    ctx.p1().manifest()  # side effect: cache manifestctx
    n = repo.commitctx(ctx)
    printb(b'commit %s: %s' % (i, hex(n)))

    # touch 00manifest.i mtime so storecache could expire.
    # repo.__dict__['manifestlog'] is deleted by transaction releasefn.
    st = repo.svfs.stat(b'00manifest.i')
    repo.svfs.utime(
        b'00manifest.i', (st[stat.ST_MTIME] + 1, st[stat.ST_MTIME] + 1)
    )

    # read the file just committed
    try:
        if repo[n][i].data() != i:
            print('data mismatch')
    except Exception as ex:
        # deliberately broad: any failure to read is reported in the
        # test output instead of aborting the script
        print('cannot read data: %r' % ex)

with repo.wlock(), repo.lock(), repo.transaction(b'test'):
    with open(b'4', 'wb') as f:
        f.write(b'4')
    with repo.dirstate.changing_files(repo):
        repo.dirstate.set_tracked(b'4')
    repo.commit(b'4')
    # the changelog length must be the same before and after invalidating
    # the repository caches
    revsbefore = len(repo.changelog)
    repo.invalidate(clearfilecache=True)
    revsafter = len(repo.changelog)
    if revsbefore != revsafter:
        print('changeset lost by repo.invalidate()')