view mercurial/testing/__init__.py @ 51758:421c9b3f2f4e

commit: set whole manifest entries at once (node with its associated flags). Add a new function manifest.set that sets whole manifest entries at once, so the caller doesn't have to do two separate operations: `m[p] = n` followed by `m.set_flags(f)` becomes `m.set(p, n, f)`. This obviously saves an extra lookup by path, and it also lets the underlying manifest implementation be more efficient, as it doesn't have to deal with partially-specified entries. It makes the interaction conceptually simpler as well, since we don't have to go through an intermediate state of an incorrect, partially-written entry. (The real motivation for this change is an alternative manifest implementation where we batch pending writes; dealing with fully defined entries makes the batching logic much simpler while avoiding slowdown due to alternating writes and reads.)
author Arseniy Alekseyev <aalekseyev@janestreet.com>
date Thu, 01 Aug 2024 13:38:31 +0100
parents 8e0d823ef182
children f4733654f144
line wrap: on
line source

import os
import time


# Work around check-code complaints.
#
# This is a simple, low-level module doing simple test-related work; we can't
# import more things, and we do not need to.
#
# NOTE(review): `getattr` is presumably used so that the attribute access is
# not spelled out literally, which is what check-code objects to -- confirm
# before simplifying this line.
environ = getattr(os, 'environ')


def wait_on_cfg(ui, cfg, timeout=10):
    """synchronize on the `cfg` config path

    Use this to synchronize commands during race tests.

    The path to synchronize on is read from the `devel.sync.<cfg>` config.
    When that config is unset (`None`), this function does nothing.
    Otherwise an empty `<path>.waiting` file is written to signal that this
    process reached the synchronization point, and the call blocks until
    `<path>` itself exists.

    `timeout` is the number of seconds to wait. It is used as-is unless the
    `devel.sync.<cfg>-timeout` config provides a value, in which case the
    configured value takes precedence. Waiting is delegated to `wait_file`,
    which raises `RuntimeError` when the timeout expires.
    """
    full_config = b'sync.' + cfg
    wait_config = full_config + b'-timeout'
    sync_path = ui.config(b'devel', full_config)
    if sync_path is None:
        # synchronization was not requested for this config path
        return
    configured = ui.config(b'devel', wait_config)
    if configured is not None:
        # Only override the caller's timeout when a value is configured.
        # float() accepts numbers as well as textual values such as b'20',
        # so the arithmetic done in wait_file works whichever ui.config
        # returns.
        timeout = float(configured)
    ready_path = sync_path + b'.waiting'
    write_file(ready_path)
    wait_file(sync_path, timeout=timeout)


def _timeout_factor():
    """return the multiplier to apply to timeouts

    The factor is the ratio between the `HGTEST_TIMEOUT` and
    `HGTEST_TIMEOUT_DEFAULT` environment variables. The latter defaults to
    360 and the former defaults to the latter. A `HGTEST_TIMEOUT` of zero
    yields a factor of 1.
    """
    baseline = int(environ.get('HGTEST_TIMEOUT_DEFAULT', 360))
    requested = int(environ.get('HGTEST_TIMEOUT', baseline))
    if requested:
        return requested / float(baseline)
    # a zero timeout leaves the timeouts unscaled
    return 1


def wait_file(path, timeout=10):
    """block until `path` exists

    The existence of `path` is polled every 10 milliseconds.

    `timeout` is expressed in seconds and is scaled by `_timeout_factor()`
    before use. A scaled timeout of zero disables the limit: the call then
    waits for as long as it takes.

    Raises `RuntimeError` if the file did not appear before the timeout
    expired.
    """
    timeout *= _timeout_factor()
    # Use a monotonic clock: unlike time.time(), it cannot jump when the
    # system clock is adjusted, so the measured wait is always accurate.
    start = time.monotonic()
    while not os.path.exists(path):
        if timeout and time.monotonic() - start > timeout:
            # os.fsencode() returns bytes paths unchanged and encodes str
            # paths, so building the message cannot fail and hide the
            # timeout.
            raise RuntimeError(
                b"timed out waiting for file: %s" % os.fsencode(path)
            )
        time.sleep(0.01)


def write_file(path, content=b''):
    """write `content` to the file at `path`

    `path` is expected to be bytes (the temporary name is built with bytes
    formatting).

    With empty `content` (the default), `path` is simply created (or
    truncated). With non-empty `content`, the data is first written to
    `<path>.tmp` and then moved into place, so `path` never shows up with
    partial content.
    """
    if content:
        write_path = b'%s.tmp' % path
    else:
        write_path = path
    with open(write_path, 'wb') as f:
        f.write(content)
    if path != write_path:
        # os.replace() overwrites an existing destination on every platform,
        # while os.rename() fails on Windows when `path` already exists.
        os.replace(write_path, path)