Mercurial > hg
view tests/test-cbor.py @ 39270:37e56607cbb9
lfs: add a progress bar when searching for blobs to upload
The search itself can take an extreme amount of time if there are a lot of
revisions involved. I've got a local repo that took 6 minutes to push 1850
commits, and 60% of that time was spent here (there are ~70K files):
\ 58.1% wrapper.py: extractpointers line 297: pointers = extractpointers(...
| 57.7% wrapper.py: pointersfromctx line 352: for p in pointersfromctx(ct...
| 57.4% wrapper.py: pointerfromctx line 397: p = pointerfromctx(ctx, f, ...
\ 38.7% context.py: __contains__ line 368: if f not in ctx:
| 38.7% util.py: __get__ line 82: return key in self._manifest
| 38.7% context.py: _manifest line 1416: result = self.func(obj)
| 38.7% manifest.py: read line 472: return self._manifestctx.re...
\ 25.6% revlog.py: revision line 1562: text = rl.revision(self._node)
\ 12.8% revlog.py: _chunks line 2217: bins = self._chunks(chain, ...
| 12.0% revlog.py: decompressline 2112: ladd(decomp(buffer(data, ch...
\ 7.8% revlog.py: checkhash line 2232: self.checkhash(text, node, ...
| 7.8% revlog.py: hash line 2315: if node != self.hash(text, ...
| 7.8% revlog.py: hash line 2242: return hash(text, p1, p2)
\ 12.0% manifest.py: __init__ line 1565: self._data = manifestdict(t...
\ 16.8% context.py: filenode line 378: if not _islfs(fctx.filelog(...
| 15.7% util.py: __get__ line 706: return self._filelog
| 14.8% context.py: _filelog line 1416: result = self.func(obj)
| 14.8% localrepo.py: file line 629: return self._repo.file(self...
| 14.8% filelog.py: __init__ line 1134: return filelog.filelog(self...
| 14.5% revlog.py: __init__ line 24: censorable=True)
author | Matt Harbison <matt_harbison@yahoo.com> |
---|---|
date | Fri, 24 Aug 2018 17:45:46 -0400 |
parents | 2b3b6187c316 |
children | aeb551a3bb8a |
line wrap: on
line source
from __future__ import absolute_import import io import unittest from mercurial.thirdparty import ( cbor, ) from mercurial.utils import ( cborutil, ) def loadit(it): return cbor.loads(b''.join(it)) class BytestringTests(unittest.TestCase): def testsimple(self): self.assertEqual( list(cborutil.streamencode(b'foobar')), [b'\x46', b'foobar']) self.assertEqual( loadit(cborutil.streamencode(b'foobar')), b'foobar') def testlong(self): source = b'x' * 1048576 self.assertEqual(loadit(cborutil.streamencode(source)), source) def testfromiter(self): # This is the example from RFC 7049 Section 2.2.2. source = [b'\xaa\xbb\xcc\xdd', b'\xee\xff\x99'] self.assertEqual( list(cborutil.streamencodebytestringfromiter(source)), [ b'\x5f', b'\x44', b'\xaa\xbb\xcc\xdd', b'\x43', b'\xee\xff\x99', b'\xff', ]) self.assertEqual( loadit(cborutil.streamencodebytestringfromiter(source)), b''.join(source)) def testfromiterlarge(self): source = [b'a' * 16, b'b' * 128, b'c' * 1024, b'd' * 1048576] self.assertEqual( loadit(cborutil.streamencodebytestringfromiter(source)), b''.join(source)) def testindefinite(self): source = b'\x00\x01\x02\x03' + b'\xff' * 16384 it = cborutil.streamencodeindefinitebytestring(source, chunksize=2) self.assertEqual(next(it), b'\x5f') self.assertEqual(next(it), b'\x42') self.assertEqual(next(it), b'\x00\x01') self.assertEqual(next(it), b'\x42') self.assertEqual(next(it), b'\x02\x03') self.assertEqual(next(it), b'\x42') self.assertEqual(next(it), b'\xff\xff') dest = b''.join(cborutil.streamencodeindefinitebytestring( source, chunksize=42)) self.assertEqual(cbor.loads(dest), source) def testreadtoiter(self): source = io.BytesIO(b'\x5f\x44\xaa\xbb\xcc\xdd\x43\xee\xff\x99\xff') it = cborutil.readindefinitebytestringtoiter(source) self.assertEqual(next(it), b'\xaa\xbb\xcc\xdd') self.assertEqual(next(it), b'\xee\xff\x99') with self.assertRaises(StopIteration): next(it) class IntTests(unittest.TestCase): def testsmall(self): self.assertEqual(list(cborutil.streamencode(0)), [b'\x00']) self.assertEqual(list(cborutil.streamencode(1)), [b'\x01']) self.assertEqual(list(cborutil.streamencode(2)), [b'\x02']) self.assertEqual(list(cborutil.streamencode(3)), [b'\x03']) self.assertEqual(list(cborutil.streamencode(4)), [b'\x04']) def testnegativesmall(self): self.assertEqual(list(cborutil.streamencode(-1)), [b'\x20']) self.assertEqual(list(cborutil.streamencode(-2)), [b'\x21']) self.assertEqual(list(cborutil.streamencode(-3)), [b'\x22']) self.assertEqual(list(cborutil.streamencode(-4)), [b'\x23']) self.assertEqual(list(cborutil.streamencode(-5)), [b'\x24']) def testrange(self): for i in range(-70000, 70000, 10): self.assertEqual( b''.join(cborutil.streamencode(i)), cbor.dumps(i)) class ArrayTests(unittest.TestCase): def testempty(self): self.assertEqual(list(cborutil.streamencode([])), [b'\x80']) self.assertEqual(loadit(cborutil.streamencode([])), []) def testbasic(self): source = [b'foo', b'bar', 1, -10] self.assertEqual(list(cborutil.streamencode(source)), [ b'\x84', b'\x43', b'foo', b'\x43', b'bar', b'\x01', b'\x29']) def testemptyfromiter(self): self.assertEqual(b''.join(cborutil.streamencodearrayfromiter([])), b'\x9f\xff') def testfromiter1(self): source = [b'foo'] self.assertEqual(list(cborutil.streamencodearrayfromiter(source)), [ b'\x9f', b'\x43', b'foo', b'\xff', ]) dest = b''.join(cborutil.streamencodearrayfromiter(source)) self.assertEqual(cbor.loads(dest), source) def testtuple(self): source = (b'foo', None, 42) self.assertEqual(cbor.loads(b''.join(cborutil.streamencode(source))), list(source)) class SetTests(unittest.TestCase): def testempty(self): self.assertEqual(list(cborutil.streamencode(set())), [ b'\xd9\x01\x02', b'\x80', ]) def testset(self): source = {b'foo', None, 42} self.assertEqual(cbor.loads(b''.join(cborutil.streamencode(source))), source) class BoolTests(unittest.TestCase): def testbasic(self): self.assertEqual(list(cborutil.streamencode(True)), [b'\xf5']) self.assertEqual(list(cborutil.streamencode(False)), [b'\xf4']) self.assertIs(loadit(cborutil.streamencode(True)), True) self.assertIs(loadit(cborutil.streamencode(False)), False) class NoneTests(unittest.TestCase): def testbasic(self): self.assertEqual(list(cborutil.streamencode(None)), [b'\xf6']) self.assertIs(loadit(cborutil.streamencode(None)), None) class MapTests(unittest.TestCase): def testempty(self): self.assertEqual(list(cborutil.streamencode({})), [b'\xa0']) self.assertEqual(loadit(cborutil.streamencode({})), {}) def testemptyindefinite(self): self.assertEqual(list(cborutil.streamencodemapfromiter([])), [ b'\xbf', b'\xff']) self.assertEqual(loadit(cborutil.streamencodemapfromiter([])), {}) def testone(self): source = {b'foo': b'bar'} self.assertEqual(list(cborutil.streamencode(source)), [ b'\xa1', b'\x43', b'foo', b'\x43', b'bar']) self.assertEqual(loadit(cborutil.streamencode(source)), source) def testmultiple(self): source = { b'foo': b'bar', b'baz': b'value1', } self.assertEqual(loadit(cborutil.streamencode(source)), source) self.assertEqual( loadit(cborutil.streamencodemapfromiter(source.items())), source) def testcomplex(self): source = { b'key': 1, 2: -10, } self.assertEqual(loadit(cborutil.streamencode(source)), source) self.assertEqual( loadit(cborutil.streamencodemapfromiter(source.items())), source) if __name__ == '__main__': import silenttestrunner silenttestrunner.main(__name__)