tests/test-context.py
author Pierre-Yves David <pierre-yves.david@octobus.net>
Thu, 16 Feb 2023 04:02:36 +0100
changeset 50140 ff12f42415f5
parent 50096 d41960df197e
permissions -rw-r--r--
localrepo: stop doing special dirstate backup at transaction open Since the dirstate writes are already managed by the transaction, we already do a backup of the dirstate when necessary (and even trigger one to keep `hg rollback` happy). We needs some special code to deal with the initial empty checkout, but it is not too complicated. Managing variable filename (as dirstate-v2 uses) at the "journalfile" level, is complex and fragile (which is consistent with the fact these files are not journal…). If we no longer do it, our life is significantly simpler. In some sense, we apply the xkcd-1134¹ solution to our savebackup/restorebackup problem. [1] https://xkcd.com/1134/ (the change to test-hardlink are expect as decreasing the number of duplicated backup drive the hardlink count down)

import os
import stat
import sys
from mercurial.node import hex
from mercurial import (
    context,
    diffutil,
    encoding,
    hg,
    scmutil,
    ui as uimod,
)

print_ = print


def print(*args, **kwargs):
    """print() wrapper that flushes stdout buffers to avoid py3 buffer issues

    We could also just write directly to sys.stdout.buffer the way the
    ui object will, but this was easier for porting the test.
    """
    print_(*args, **kwargs)
    sys.stdout.flush()


def printb(data, end=b'\n'):
    out = getattr(sys.stdout, 'buffer', sys.stdout)
    out.write(data + end)
    out.flush()


ui = uimod.ui.load()

repo = hg.repository(ui, b'test1', create=1)
os.chdir('test1')

# create 'foo' with fixed time stamp
f = open('foo', 'wb')
f.write(b'foo\n')
f.close()
os.utime('foo', (1000, 1000))

# add+commit 'foo'
with repo.wlock(), repo.lock(), repo.transaction(b'test-context'):
    with repo.dirstate.changing_files(repo):
        repo[None].add([b'foo'])
    repo.commit(text=b'commit1', date=b"0 0")

d = repo[None][b'foo'].date()
if os.name == 'nt':
    d = d[:2]
print("workingfilectx.date = (%d, %d)" % d)

# test memctx with non-ASCII commit message


def filectxfn(repo, memctx, path):
    return context.memfilectx(repo, memctx, b"foo", b"")


ctx = context.memctx(
    repo,
    [b'tip', None],
    encoding.tolocal(b"Gr\xc3\xbcezi!"),
    [b"foo"],
    filectxfn,
)
ctx.commit()
for enc in "ASCII", "Latin-1", "UTF-8":
    encoding.encoding = enc
    printb(b"%-8s: %s" % (enc.encode('ascii'), repo[b"tip"].description()))

# test performing a status


def getfilectx(repo, memctx, f):
    fctx = memctx.p1()[f]
    data, flags = fctx.data(), fctx.flags()
    if f == b'foo':
        data += b'bar\n'
    return context.memfilectx(
        repo, memctx, f, data, b'l' in flags, b'x' in flags
    )


ctxa = repo[0]
ctxb = context.memctx(
    repo,
    [ctxa.node(), None],
    b"test diff",
    [b"foo"],
    getfilectx,
    ctxa.user(),
    ctxa.date(),
)

print(ctxb.status(ctxa))

# test performing a diff on a memctx
diffopts = diffutil.diffallopts(repo.ui, {b'git': True})
for d in ctxb.diff(ctxa, opts=diffopts):
    printb(d, end=b'')

# test safeness and correctness of "ctx.status()"
print('= checking context.status():')

# ancestor "wcctx ~ 2"
actx2 = repo[b'.']

repo.wwrite(b'bar-m', b'bar-m\n', b'')
repo.wwrite(b'bar-r', b'bar-r\n', b'')
with repo.wlock(), repo.lock(), repo.transaction(b'test-context'):
    with repo.dirstate.changing_files(repo):
        repo[None].add([b'bar-m', b'bar-r'])
    repo.commit(text=b'add bar-m, bar-r', date=b"0 0")

# ancestor "wcctx ~ 1"
actx1 = repo[b'.']

repo.wwrite(b'bar-m', b'bar-m bar-m\n', b'')
repo.wwrite(b'bar-a', b'bar-a\n', b'')
with repo.wlock(), repo.lock(), repo.transaction(b'test-context'):
    with repo.dirstate.changing_files(repo):
        repo[None].add([b'bar-a'])
        repo[None].forget([b'bar-r'])

# status at this point:
#   M bar-m
#   A bar-a
#   R bar-r
#   C foo

print('== checking workingctx.status:')

wctx = repo[None]
print('wctx._status=%s' % (str(wctx._status)))

print('=== with "pattern match":')
print(
    actx1.status(other=wctx, match=scmutil.matchfiles(repo, [b'bar-m', b'foo']))
)
print('wctx._status=%s' % (str(wctx._status)))
print(
    actx2.status(other=wctx, match=scmutil.matchfiles(repo, [b'bar-m', b'foo']))
)
print('wctx._status=%s' % (str(wctx._status)))

print('=== with "always match" and "listclean=True":')
print(actx1.status(other=wctx, listclean=True))
print('wctx._status=%s' % (str(wctx._status)))
print(actx2.status(other=wctx, listclean=True))
print('wctx._status=%s' % (str(wctx._status)))

print("== checking workingcommitctx.status:")

wcctx = context.workingcommitctx(
    repo,
    scmutil.status([b'bar-m'], [b'bar-a'], [], [], [], [], []),
    text=b'',
    date=b'0 0',
)
print('wcctx._status=%s' % (str(wcctx._status)))

print('=== with "always match":')
print(actx1.status(other=wcctx))
print('wcctx._status=%s' % (str(wcctx._status)))
print(actx2.status(other=wcctx))
print('wcctx._status=%s' % (str(wcctx._status)))

print('=== with "always match" and "listclean=True":')
print(actx1.status(other=wcctx, listclean=True))
print('wcctx._status=%s' % (str(wcctx._status)))
print(actx2.status(other=wcctx, listclean=True))
print('wcctx._status=%s' % (str(wcctx._status)))

print('=== with "pattern match":')
print(
    actx1.status(
        other=wcctx, match=scmutil.matchfiles(repo, [b'bar-m', b'foo'])
    )
)
print('wcctx._status=%s' % (str(wcctx._status)))
print(
    actx2.status(
        other=wcctx, match=scmutil.matchfiles(repo, [b'bar-m', b'foo'])
    )
)
print('wcctx._status=%s' % (str(wcctx._status)))

print('=== with "pattern match" and "listclean=True":')
print(
    actx1.status(
        other=wcctx,
        match=scmutil.matchfiles(repo, [b'bar-r', b'foo']),
        listclean=True,
    )
)
print('wcctx._status=%s' % (str(wcctx._status)))
print(
    actx2.status(
        other=wcctx,
        match=scmutil.matchfiles(repo, [b'bar-r', b'foo']),
        listclean=True,
    )
)
print('wcctx._status=%s' % (str(wcctx._status)))

os.chdir('..')

# test manifestlog being changed
print('== commit with manifestlog invalidated')

repo = hg.repository(ui, b'test2', create=1)
os.chdir('test2')

# make some commits
for i in [b'1', b'2', b'3']:
    with open(i, 'wb') as f:
        f.write(i)
    status = scmutil.status([], [i], [], [], [], [], [])
    ctx = context.workingcommitctx(
        repo, status, text=i, user=b'test@test.com', date=(0, 0)
    )
    ctx.p1().manifest()  # side effect: cache manifestctx
    n = repo.commitctx(ctx)
    printb(b'commit %s: %s' % (i, hex(n)))

    # touch 00manifest.i mtime so storecache could expire.
    # repo.__dict__['manifestlog'] is deleted by transaction releasefn.
    st = repo.svfs.stat(b'00manifest.i')
    repo.svfs.utime(
        b'00manifest.i', (st[stat.ST_MTIME] + 1, st[stat.ST_MTIME] + 1)
    )

    # read the file just committed
    try:
        if repo[n][i].data() != i:
            print('data mismatch')
    except Exception as ex:
        print('cannot read data: %r' % ex)

with repo.wlock(), repo.lock(), repo.transaction(b'test'):
    with open(b'4', 'wb') as f:
        f.write(b'4')
    with repo.dirstate.changing_files(repo):
        repo.dirstate.set_tracked(b'4')
    repo.commit(b'4')
    revsbefore = len(repo.changelog)
    repo.invalidate(clearfilecache=True)
    revsafter = len(repo.changelog)
    if revsbefore != revsafter:
        print('changeset lost by repo.invalidate()')