view tests/test-storage.py @ 40033:5e5b06087ec5

keepalive: track number of bytes received from an HTTP response We also bubble the byte count up to the HTTPConnection instance and its parent opener at read time. Unlike sending, there isn't a clear "end of response" signal we can intercept to defer updating the accounting. Differential Revision: https://phab.mercurial-scm.org/D4857
author Gregory Szorc <gregory.szorc@gmail.com>
date Mon, 01 Oct 2018 12:30:32 -0700
parents a3a9b93bff80
children cdf61ab1f54c
line wrap: on
line source

# This test verifies the conformance of various classes to various
# storage interfaces.
from __future__ import absolute_import

import silenttestrunner

from mercurial import (
    filelog,
    transaction,
    ui as uimod,
    vfs as vfsmod,
)

from mercurial.testing import (
    storage as storagetesting,
)

STATE = {
    'lastindex': 0,
    'ui': uimod.ui(),
    'vfs': vfsmod.vfs(b'.', realpath=True),
}

def makefilefn(self):
    """Factory for filelog instances."""
    fl = filelog.filelog(STATE['vfs'], b'filelog-%d' % STATE['lastindex'])
    STATE['lastindex'] += 1
    return fl

def maketransaction(self):
    vfsmap = {'plain': STATE['vfs']}

    return transaction.transaction(STATE['ui'].warn, STATE['vfs'], vfsmap,
                                   b'journal', b'undo')

# Assigning module-level attributes that inherit from unittest.TestCase
# is all that is needed to register tests.
filelogindextests = storagetesting.makeifileindextests(makefilefn,
                                                       maketransaction)
filelogdatatests = storagetesting.makeifiledatatests(makefilefn,
                                                     maketransaction)
filelogmutationtests = storagetesting.makeifilemutationtests(makefilefn,
                                                             maketransaction)

if __name__ == '__main__':
    silenttestrunner.main(__name__)