view tests/test-wireproto-clientreactor.py @ 40326:fed697fa1734

sqlitestore: file storage backend using SQLite This commit provides an extension which uses SQLite to store file data (as opposed to revlogs). As the inline documentation describes, there are still several aspects to the extension that are incomplete. But it's a start. The extension does support basic clone, checkout, and commit workflows, which makes it suitable for simple use cases. One notable missing feature is support for "bundlerepos." This is probably responsible for the most test failures when the extension is activated as part of the test suite. All revision data is stored in SQLite. Data is stored as zstd compressed chunks (default if zstd is available), zlib compressed chunks (default if zstd is not available), or raw chunks (if configured or if a compressed delta is not smaller than the raw delta). This makes things very similar to revlogs. Unlike revlogs, the extension doesn't yet enforce a limit on delta chain length. This is an obvious limitation and should be addressed. This is somewhat mitigated by the use of zstd, which is much faster than zlib to decompress. There is a dedicated table for storing deltas. Deltas are stored by the SHA-1 hash of their uncompressed content. The "fileindex" table has columns that reference the delta for each revision and the base delta that delta should be applied against. A recursive SQL query is used to resolve the delta chain along with the delta data. By storing deltas by hash, we are able to de-duplicate delta storage! With revlogs, the same deltas in different revlogs would result in duplicate storage of that delta. In this scheme, inserting the duplicate delta is a no-op and delta chains simply reference the existing delta. When initially implementing this extension, I did not have content-indexed deltas and deltas could be duplicated across files (just like revlogs). When I implemented content-indexed deltas, the size of the SQLite database for a full clone of mozilla-unified dropped: before: 2,554,261,504 bytes after: 2,488,754,176 bytes Surprisingly, this is still larger than the bytes size of revlog files: revlog files: 2,104,861,230 bytes du -b: 2,254,381,614 I would have expected storage to be smaller since we're not limiting delta chain length and since we're using zstd instead of zlib. I suspect the SQLite indexes and per-column overhead account for the bulk of the differences. (Keep in mind that revlog uses a 64-byte packed struct for revision index data and deltas are stored without padding. Aside from the 12 unused bytes in the 32 byte node field, revlogs are pretty efficient.) Another source of overhead is file name storage. With revlogs, file names are stored in the filesystem. But with SQLite, we need to store file names in the database. This is roughly equivalent to the size of the fncache file, which for the mozilla-unified repository is ~34MB. Since the SQLite database isn't append-only and since delta chains can reference any delta, this opens some interesting possibilities. For example, we could store deltas in reverse, such that fulltexts are stored for newer revisions and deltas are applied to reconstruct older revisions. This is likely a more optimal storage strategy for version control, as new data tends to be more frequently accessed than old data. We would obviously need wire protocol support for transferring revision data from newest to oldest. And we would probably need some kind of mechanism for "re-encoding" stores. But it should be doable. This extension is very much experimental quality. There are a handful of features that don't work. It probably isn't suitable for day-to-day use. But it could be used in limited cases (e.g. read-only checkouts like in CI). And it is also a good proving ground for alternate storage backends. As we continue to define interfaces for all things storage, it will be useful to have a viable alternate storage backend to see how things shake out in practice. test-storage.py passes on Python 2 and introduces no new test failures on Python 3. Having the storage-level unit tests has proved to be insanely useful when developing this extension. Those tests caught numerous bugs during development and I'm convinced this style of testing is the way forward for ensuring alternate storage backends work as intended. Of course, test coverage isn't close to what it needs to be. But it is a start. And what coverage we have gives me confidence that basic store functionality is implemented properly. Differential Revision: https://phab.mercurial-scm.org/D4928
author Gregory Szorc <gregory.szorc@gmail.com>
date Tue, 09 Oct 2018 08:50:13 -0700
parents e67522413ca8
children 07b87ee2ea75
line wrap: on
line source

from __future__ import absolute_import

import unittest
import zlib

from mercurial import (
    error,
    ui as uimod,
    wireprotoframing as framing,
)
from mercurial.utils import (
    cborutil,
)

try:
    from mercurial import zstd
    zstd.__version__
except ImportError:
    zstd = None

ffs = framing.makeframefromhumanstring

globalui = uimod.ui()

def sendframe(reactor, frame):
    """Send a frame bytearray to a reactor."""
    header = framing.parseheader(frame)
    payload = frame[framing.FRAME_HEADER_SIZE:]
    assert len(payload) == header.length

    return reactor.onframerecv(framing.frame(header.requestid,
                                             header.streamid,
                                             header.streamflags,
                                             header.typeid,
                                             header.flags,
                                             payload))

class SingleSendTests(unittest.TestCase):
    """A reactor that can only send once rejects subsequent sends."""

    if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
        # Python 3.7 deprecates the regex*p* version, but 2.7 lacks
        # the regex version.
        assertRaisesRegex = (# camelcase-required
            unittest.TestCase.assertRaisesRegexp)

    def testbasic(self):
        reactor = framing.clientreactor(globalui,
                                        hasmultiplesend=False,
                                        buffersends=True)

        request, action, meta = reactor.callcommand(b'foo', {})
        self.assertEqual(request.state, b'pending')
        self.assertEqual(action, b'noop')

        action, meta = reactor.flushcommands()
        self.assertEqual(action, b'sendframes')

        for frame in meta[b'framegen']:
            self.assertEqual(request.state, b'sending')

        self.assertEqual(request.state, b'sent')

        with self.assertRaisesRegex(error.ProgrammingError,
                                     'cannot issue new commands'):
            reactor.callcommand(b'foo', {})

        with self.assertRaisesRegex(error.ProgrammingError,
                                     'cannot issue new commands'):
            reactor.callcommand(b'foo', {})

class NoBufferTests(unittest.TestCase):
    """A reactor without send buffering sends requests immediately."""
    def testbasic(self):
        reactor = framing.clientreactor(globalui,
                                        hasmultiplesend=True,
                                        buffersends=False)

        request, action, meta = reactor.callcommand(b'command1', {})
        self.assertEqual(request.requestid, 1)
        self.assertEqual(action, b'sendframes')

        self.assertEqual(request.state, b'pending')

        for frame in meta[b'framegen']:
            self.assertEqual(request.state, b'sending')

        self.assertEqual(request.state, b'sent')

        action, meta = reactor.flushcommands()
        self.assertEqual(action, b'noop')

        # And we can send another command.
        request, action, meta = reactor.callcommand(b'command2', {})
        self.assertEqual(request.requestid, 3)
        self.assertEqual(action, b'sendframes')

        for frame in meta[b'framegen']:
            self.assertEqual(request.state, b'sending')

        self.assertEqual(request.state, b'sent')

class BadFrameRecvTests(unittest.TestCase):
    if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
        # Python 3.7 deprecates the regex*p* version, but 2.7 lacks
        # the regex version.
        assertRaisesRegex = (# camelcase-required
            unittest.TestCase.assertRaisesRegexp)

    def testoddstream(self):
        reactor = framing.clientreactor(globalui)

        action, meta = sendframe(reactor, ffs(b'1 1 0 1 0 foo'))
        self.assertEqual(action, b'error')
        self.assertEqual(meta[b'message'],
                         b'received frame with odd numbered stream ID: 1')

    def testunknownstream(self):
        reactor = framing.clientreactor(globalui)

        action, meta = sendframe(reactor, ffs(b'1 0 0 1 0 foo'))
        self.assertEqual(action, b'error')
        self.assertEqual(meta[b'message'],
                         b'received frame on unknown stream without beginning '
                         b'of stream flag set')

    def testunhandledframetype(self):
        reactor = framing.clientreactor(globalui, buffersends=False)

        request, action, meta = reactor.callcommand(b'foo', {})
        for frame in meta[b'framegen']:
            pass

        with self.assertRaisesRegex(error.ProgrammingError,
                                     'unhandled frame type'):
            sendframe(reactor, ffs(b'1 0 stream-begin text-output 0 foo'))

class StreamTests(unittest.TestCase):
    def testmultipleresponseframes(self):
        reactor = framing.clientreactor(globalui, buffersends=False)

        request, action, meta = reactor.callcommand(b'foo', {})

        self.assertEqual(action, b'sendframes')
        for f in meta[b'framegen']:
            pass

        action, meta = sendframe(
            reactor,
            ffs(b'%d 0 stream-begin command-response 0 foo' %
                request.requestid))
        self.assertEqual(action, b'responsedata')

        action, meta = sendframe(
            reactor,
            ffs(b'%d 0 0 command-response eos bar' % request.requestid))
        self.assertEqual(action, b'responsedata')

class RedirectTests(unittest.TestCase):
    def testredirect(self):
        reactor = framing.clientreactor(globalui, buffersends=False)

        redirect = {
            b'targets': [b'a', b'b'],
            b'hashes': [b'sha256'],
        }

        request, action, meta = reactor.callcommand(
            b'foo', {}, redirect=redirect)

        self.assertEqual(action, b'sendframes')

        frames = list(meta[b'framegen'])
        self.assertEqual(len(frames), 1)

        self.assertEqual(frames[0],
                         ffs(b'1 1 stream-begin command-request new '
                             b"cbor:{b'name': b'foo', "
                             b"b'redirect': {b'targets': [b'a', b'b'], "
                             b"b'hashes': [b'sha256']}}"))

class StreamSettingsTests(unittest.TestCase):
    def testnoflags(self):
        reactor = framing.clientreactor(globalui, buffersends=False)

        request, action, meta = reactor.callcommand(b'foo', {})
        for f in meta[b'framegen']:
            pass

        action, meta = sendframe(reactor,
            ffs(b'1 2 stream-begin stream-settings 0 '))

        self.assertEqual(action, b'error')
        self.assertEqual(meta, {
            b'message': b'stream encoding settings frame must have '
                        b'continuation or end of stream flag set',
        })

    def testconflictflags(self):
        reactor = framing.clientreactor(globalui, buffersends=False)

        request, action, meta = reactor.callcommand(b'foo', {})
        for f in meta[b'framegen']:
            pass

        action, meta = sendframe(reactor,
            ffs(b'1 2 stream-begin stream-settings continuation|eos '))

        self.assertEqual(action, b'error')
        self.assertEqual(meta, {
            b'message': b'stream encoding settings frame cannot have both '
                        b'continuation and end of stream flags set',
        })

    def testemptypayload(self):
        reactor = framing.clientreactor(globalui, buffersends=False)

        request, action, meta = reactor.callcommand(b'foo', {})
        for f in meta[b'framegen']:
            pass

        action, meta = sendframe(reactor,
            ffs(b'1 2 stream-begin stream-settings eos '))

        self.assertEqual(action, b'error')
        self.assertEqual(meta, {
            b'message': b'stream encoding settings frame did not contain '
                        b'CBOR data'
        })

    def testbadcbor(self):
        reactor = framing.clientreactor(globalui, buffersends=False)

        request, action, meta = reactor.callcommand(b'foo', {})
        for f in meta[b'framegen']:
            pass

        action, meta = sendframe(reactor,
            ffs(b'1 2 stream-begin stream-settings eos badvalue'))

        self.assertEqual(action, b'error')

    def testsingleobject(self):
        reactor = framing.clientreactor(globalui, buffersends=False)

        request, action, meta = reactor.callcommand(b'foo', {})
        for f in meta[b'framegen']:
            pass

        action, meta = sendframe(reactor,
            ffs(b'1 2 stream-begin stream-settings eos cbor:b"identity"'))

        self.assertEqual(action, b'noop')
        self.assertEqual(meta, {})

    def testmultipleobjects(self):
        reactor = framing.clientreactor(globalui, buffersends=False)

        request, action, meta = reactor.callcommand(b'foo', {})
        for f in meta[b'framegen']:
            pass

        data = b''.join([
            b''.join(cborutil.streamencode(b'identity')),
            b''.join(cborutil.streamencode({b'foo', b'bar'})),
        ])

        action, meta = sendframe(reactor,
            ffs(b'1 2 stream-begin stream-settings eos %s' % data))

        self.assertEqual(action, b'error')
        self.assertEqual(meta, {
            b'message': b'error setting stream decoder: identity decoder '
                        b'received unexpected additional values',
        })

    def testmultipleframes(self):
        reactor = framing.clientreactor(globalui, buffersends=False)

        request, action, meta = reactor.callcommand(b'foo', {})
        for f in meta[b'framegen']:
            pass

        data = b''.join(cborutil.streamencode(b'identity'))

        action, meta = sendframe(reactor,
            ffs(b'1 2 stream-begin stream-settings continuation %s' %
                data[0:3]))

        self.assertEqual(action, b'noop')
        self.assertEqual(meta, {})

        action, meta = sendframe(reactor,
            ffs(b'1 2 0 stream-settings eos %s' % data[3:]))

        self.assertEqual(action, b'noop')
        self.assertEqual(meta, {})

    def testinvalidencoder(self):
        reactor = framing.clientreactor(globalui, buffersends=False)

        request, action, meta = reactor.callcommand(b'foo', {})
        for f in meta[b'framegen']:
            pass

        action, meta = sendframe(reactor,
            ffs(b'1 2 stream-begin stream-settings eos cbor:b"badvalue"'))

        self.assertEqual(action, b'error')
        self.assertEqual(meta, {
            b'message': b'error setting stream decoder: unknown stream '
                        b'decoder: badvalue',
        })

    def testzlibencoding(self):
        reactor = framing.clientreactor(globalui, buffersends=False)

        request, action, meta = reactor.callcommand(b'foo', {})
        for f in meta[b'framegen']:
            pass

        action, meta = sendframe(reactor,
            ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' %
                request.requestid))

        self.assertEqual(action, b'noop')
        self.assertEqual(meta, {})

        result = {
            b'status': b'ok',
        }
        encoded = b''.join(cborutil.streamencode(result))

        compressed = zlib.compress(encoded)
        self.assertEqual(zlib.decompress(compressed), encoded)

        action, meta = sendframe(reactor,
            ffs(b'%d 2 encoded command-response eos %s' %
                (request.requestid, compressed)))

        self.assertEqual(action, b'responsedata')
        self.assertEqual(meta[b'data'], encoded)

    def testzlibencodingsinglebyteframes(self):
        reactor = framing.clientreactor(globalui, buffersends=False)

        request, action, meta = reactor.callcommand(b'foo', {})
        for f in meta[b'framegen']:
            pass

        action, meta = sendframe(reactor,
            ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' %
                request.requestid))

        self.assertEqual(action, b'noop')
        self.assertEqual(meta, {})

        result = {
            b'status': b'ok',
        }
        encoded = b''.join(cborutil.streamencode(result))

        compressed = zlib.compress(encoded)
        self.assertEqual(zlib.decompress(compressed), encoded)

        chunks = []

        for i in range(len(compressed)):
            char = compressed[i:i + 1]
            if char == b'\\':
                char = b'\\\\'
            action, meta = sendframe(reactor,
                ffs(b'%d 2 encoded command-response continuation %s' %
                    (request.requestid, char)))

            self.assertEqual(action, b'responsedata')
            chunks.append(meta[b'data'])
            self.assertTrue(meta[b'expectmore'])
            self.assertFalse(meta[b'eos'])

        # zlib will have the full data decoded at this point, even though
        # we haven't flushed.
        self.assertEqual(b''.join(chunks), encoded)

        # End the stream for good measure.
        action, meta = sendframe(reactor,
            ffs(b'%d 2 stream-end command-response eos ' % request.requestid))

        self.assertEqual(action, b'responsedata')
        self.assertEqual(meta[b'data'], b'')
        self.assertFalse(meta[b'expectmore'])
        self.assertTrue(meta[b'eos'])

    def testzlibmultipleresponses(self):
        # We feed in zlib compressed data on the same stream but belonging to
        # 2 different requests. This tests our flushing behavior.
        reactor = framing.clientreactor(globalui, buffersends=False,
                                        hasmultiplesend=True)

        request1, action, meta = reactor.callcommand(b'foo', {})
        for f in meta[b'framegen']:
            pass

        request2, action, meta = reactor.callcommand(b'foo', {})
        for f in meta[b'framegen']:
            pass

        outstream = framing.outputstream(2)
        outstream.setencoder(globalui, b'zlib')

        response1 = b''.join(cborutil.streamencode({
            b'status': b'ok',
            b'extra': b'response1' * 10,
        }))

        response2 = b''.join(cborutil.streamencode({
            b'status': b'error',
            b'extra': b'response2' * 10,
        }))

        action, meta = sendframe(reactor,
            ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' %
                request1.requestid))

        self.assertEqual(action, b'noop')
        self.assertEqual(meta, {})

        # Feeding partial data in won't get anything useful out.
        action, meta = sendframe(reactor,
            ffs(b'%d 2 encoded command-response continuation %s' % (
                request1.requestid, outstream.encode(response1))))
        self.assertEqual(action, b'responsedata')
        self.assertEqual(meta[b'data'], b'')

        # But flushing data at both ends will get our original data.
        action, meta = sendframe(reactor,
            ffs(b'%d 2 encoded command-response eos %s' % (
                request1.requestid, outstream.flush())))
        self.assertEqual(action, b'responsedata')
        self.assertEqual(meta[b'data'], response1)

        # We should be able to reuse the compressor/decompressor for the
        # 2nd response.
        action, meta = sendframe(reactor,
            ffs(b'%d 2 encoded command-response continuation %s' % (
                request2.requestid, outstream.encode(response2))))
        self.assertEqual(action, b'responsedata')
        self.assertEqual(meta[b'data'], b'')

        action, meta = sendframe(reactor,
            ffs(b'%d 2 encoded command-response eos %s' % (
                request2.requestid, outstream.flush())))
        self.assertEqual(action, b'responsedata')
        self.assertEqual(meta[b'data'], response2)

    @unittest.skipUnless(zstd, 'zstd not available')
    def testzstd8mbencoding(self):
        reactor = framing.clientreactor(globalui, buffersends=False)

        request, action, meta = reactor.callcommand(b'foo', {})
        for f in meta[b'framegen']:
            pass

        action, meta = sendframe(reactor,
            ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' %
                request.requestid))

        self.assertEqual(action, b'noop')
        self.assertEqual(meta, {})

        result = {
            b'status': b'ok',
        }
        encoded = b''.join(cborutil.streamencode(result))

        encoder = framing.zstd8mbencoder(globalui)
        compressed = encoder.encode(encoded) + encoder.finish()
        self.assertEqual(zstd.ZstdDecompressor().decompress(
            compressed, max_output_size=len(encoded)), encoded)

        action, meta = sendframe(reactor,
            ffs(b'%d 2 encoded command-response eos %s' %
                (request.requestid, compressed)))

        self.assertEqual(action, b'responsedata')
        self.assertEqual(meta[b'data'], encoded)

    @unittest.skipUnless(zstd, 'zstd not available')
    def testzstd8mbencodingsinglebyteframes(self):
        reactor = framing.clientreactor(globalui, buffersends=False)

        request, action, meta = reactor.callcommand(b'foo', {})
        for f in meta[b'framegen']:
            pass

        action, meta = sendframe(reactor,
            ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' %
                request.requestid))

        self.assertEqual(action, b'noop')
        self.assertEqual(meta, {})

        result = {
            b'status': b'ok',
        }
        encoded = b''.join(cborutil.streamencode(result))

        compressed = zstd.ZstdCompressor().compress(encoded)
        self.assertEqual(zstd.ZstdDecompressor().decompress(compressed),
                         encoded)

        chunks = []

        for i in range(len(compressed)):
            char = compressed[i:i + 1]
            if char == b'\\':
                char = b'\\\\'
            action, meta = sendframe(reactor,
                ffs(b'%d 2 encoded command-response continuation %s' %
                    (request.requestid, char)))

            self.assertEqual(action, b'responsedata')
            chunks.append(meta[b'data'])
            self.assertTrue(meta[b'expectmore'])
            self.assertFalse(meta[b'eos'])

        # zstd decompressor will flush at frame boundaries.
        self.assertEqual(b''.join(chunks), encoded)

        # End the stream for good measure.
        action, meta = sendframe(reactor,
            ffs(b'%d 2 stream-end command-response eos ' % request.requestid))

        self.assertEqual(action, b'responsedata')
        self.assertEqual(meta[b'data'], b'')
        self.assertFalse(meta[b'expectmore'])
        self.assertTrue(meta[b'eos'])

    @unittest.skipUnless(zstd, 'zstd not available')
    def testzstd8mbmultipleresponses(self):
        # We feed in zstd compressed data on the same stream but belonging to
        # 2 different requests. This tests our flushing behavior.
        reactor = framing.clientreactor(globalui, buffersends=False,
                                        hasmultiplesend=True)

        request1, action, meta = reactor.callcommand(b'foo', {})
        for f in meta[b'framegen']:
            pass

        request2, action, meta = reactor.callcommand(b'foo', {})
        for f in meta[b'framegen']:
            pass

        outstream = framing.outputstream(2)
        outstream.setencoder(globalui, b'zstd-8mb')

        response1 = b''.join(cborutil.streamencode({
            b'status': b'ok',
            b'extra': b'response1' * 10,
        }))

        response2 = b''.join(cborutil.streamencode({
            b'status': b'error',
            b'extra': b'response2' * 10,
        }))

        action, meta = sendframe(reactor,
            ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' %
                request1.requestid))

        self.assertEqual(action, b'noop')
        self.assertEqual(meta, {})

        # Feeding partial data in won't get anything useful out.
        action, meta = sendframe(reactor,
            ffs(b'%d 2 encoded command-response continuation %s' % (
                request1.requestid, outstream.encode(response1))))
        self.assertEqual(action, b'responsedata')
        self.assertEqual(meta[b'data'], b'')

        # But flushing data at both ends will get our original data.
        action, meta = sendframe(reactor,
            ffs(b'%d 2 encoded command-response eos %s' % (
                request1.requestid, outstream.flush())))
        self.assertEqual(action, b'responsedata')
        self.assertEqual(meta[b'data'], response1)

        # We should be able to reuse the compressor/decompressor for the
        # 2nd response.
        action, meta = sendframe(reactor,
            ffs(b'%d 2 encoded command-response continuation %s' % (
                request2.requestid, outstream.encode(response2))))
        self.assertEqual(action, b'responsedata')
        self.assertEqual(meta[b'data'], b'')

        action, meta = sendframe(reactor,
            ffs(b'%d 2 encoded command-response eos %s' % (
                request2.requestid, outstream.flush())))
        self.assertEqual(action, b'responsedata')
        self.assertEqual(meta[b'data'], response2)

if __name__ == '__main__':
    import silenttestrunner
    silenttestrunner.main(__name__)