comparison mercurial/utils/cborutil.py @ 37711:65a23cc8e75b

cborutil: implement support for streaming encoding, bytestring decoding The vendored cbor2 package is... a bit disappointing. On the encoding side, it insists that you pass it something with a write() to send data to. That means if you want to emit data to a generator, you have to construct an e.g. io.BytesIO(), write() to it, then get the data back out. There can be non-trivial overhead involved. The encoder also doesn't support indefinite types - bytestrings, arrays, and maps that don't have a known length. Again, this is really unfortunate because it requires you to buffer the entire source and destination in memory to encode large things. On the decoding side, it supports reading indefinite length types. But it buffers them completely before returning. More sadness. This commit implements "streaming" encoders for various CBOR types. Encoding emits a generator of hunks. So you can efficiently stream encoded data elsewhere. It also implements support for emitting indefinite length bytestrings, arrays, and maps. On the decoding side, we only implement support for decoding an indefinite length bytestring from a file object. It will emit a generator of raw chunks from the source. I didn't want to reinvent so many wheels. But profiling the wire protocol revealed that the overhead of constructing io.BytesIO() instances to temporarily hold results has a non-trivial overhead. We're talking >15% of execution time for operations like "transfer the fulltexts of all files in a revision." So I can justify this effort. Fortunately, CBOR is a relatively straightforward format. And we have a reference implementation in the repo we can test against. Differential Revision: https://phab.mercurial-scm.org/D3303
author Gregory Szorc <gregory.szorc@gmail.com>
date Sat, 14 Apr 2018 16:36:15 -0700
parents
children 2ae6a3134362
comparison
equal deleted inserted replaced
37710:0a5fe2a08e82 37711:65a23cc8e75b
1 # cborutil.py - CBOR extensions
2 #
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 #
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
7
8 from __future__ import absolute_import
9
10 import struct
11
12 from ..thirdparty.cbor.cbor2 import (
13 decoder as decodermod,
14 )
15
16 # Very short very of RFC 7049...
17 #
18 # Each item begins with a byte. The 3 high bits of that byte denote the
19 # "major type." The lower 5 bits denote the "subtype." Each major type
20 # has its own encoding mechanism.
21 #
22 # Most types have lengths. However, bytestring, string, array, and map
23 # can be indefinite length. These are denotes by a subtype with value 31.
24 # Sub-components of those types then come afterwards and are terminated
25 # by a "break" byte.
26
27 MAJOR_TYPE_UINT = 0
28 MAJOR_TYPE_NEGINT = 1
29 MAJOR_TYPE_BYTESTRING = 2
30 MAJOR_TYPE_STRING = 3
31 MAJOR_TYPE_ARRAY = 4
32 MAJOR_TYPE_MAP = 5
33 MAJOR_TYPE_SEMANTIC = 6
34 MAJOR_TYPE_SPECIAL = 7
35
36 SUBTYPE_MASK = 0b00011111
37
38 SUBTYPE_HALF_FLOAT = 25
39 SUBTYPE_SINGLE_FLOAT = 26
40 SUBTYPE_DOUBLE_FLOAT = 27
41 SUBTYPE_INDEFINITE = 31
42
43 # Indefinite types begin with their major type ORd with information value 31.
44 BEGIN_INDEFINITE_BYTESTRING = struct.pack(
45 r'>B', MAJOR_TYPE_BYTESTRING << 5 | SUBTYPE_INDEFINITE)
46 BEGIN_INDEFINITE_ARRAY = struct.pack(
47 r'>B', MAJOR_TYPE_ARRAY << 5 | SUBTYPE_INDEFINITE)
48 BEGIN_INDEFINITE_MAP = struct.pack(
49 r'>B', MAJOR_TYPE_MAP << 5 | SUBTYPE_INDEFINITE)
50
51 ENCODED_LENGTH_1 = struct.Struct(r'>B')
52 ENCODED_LENGTH_2 = struct.Struct(r'>BB')
53 ENCODED_LENGTH_3 = struct.Struct(r'>BH')
54 ENCODED_LENGTH_4 = struct.Struct(r'>BL')
55 ENCODED_LENGTH_5 = struct.Struct(r'>BQ')
56
57 # The break ends an indefinite length item.
58 BREAK = b'\xff'
59 BREAK_INT = 255
60
61 def encodelength(majortype, length):
62 """Obtain a value encoding the major type and its length."""
63 if length < 24:
64 return ENCODED_LENGTH_1.pack(majortype << 5 | length)
65 elif length < 256:
66 return ENCODED_LENGTH_2.pack(majortype << 5 | 24, length)
67 elif length < 65536:
68 return ENCODED_LENGTH_3.pack(majortype << 5 | 25, length)
69 elif length < 4294967296:
70 return ENCODED_LENGTH_4.pack(majortype << 5 | 26, length)
71 else:
72 return ENCODED_LENGTH_5.pack(majortype << 5 | 27, length)
73
74 def streamencodebytestring(v):
75 yield encodelength(MAJOR_TYPE_BYTESTRING, len(v))
76 yield v
77
78 def streamencodebytestringfromiter(it):
79 """Convert an iterator of chunks to an indefinite bytestring.
80
81 Given an input that is iterable and each element in the iterator is
82 representable as bytes, emit an indefinite length bytestring.
83 """
84 yield BEGIN_INDEFINITE_BYTESTRING
85
86 for chunk in it:
87 yield encodelength(MAJOR_TYPE_BYTESTRING, len(chunk))
88 yield chunk
89
90 yield BREAK
91
92 def streamencodeindefinitebytestring(source, chunksize=65536):
93 """Given a large source buffer, emit as an indefinite length bytestring.
94
95 This is a generator of chunks constituting the encoded CBOR data.
96 """
97 yield BEGIN_INDEFINITE_BYTESTRING
98
99 i = 0
100 l = len(source)
101
102 while True:
103 chunk = source[i:i + chunksize]
104 i += len(chunk)
105
106 yield encodelength(MAJOR_TYPE_BYTESTRING, len(chunk))
107 yield chunk
108
109 if i >= l:
110 break
111
112 yield BREAK
113
114 def streamencodeint(v):
115 if v >= 18446744073709551616 or v < -18446744073709551616:
116 raise ValueError('big integers not supported')
117
118 if v >= 0:
119 yield encodelength(MAJOR_TYPE_UINT, v)
120 else:
121 yield encodelength(MAJOR_TYPE_NEGINT, abs(v) - 1)
122
123 def streamencodearray(l):
124 """Encode a known size iterable to an array."""
125
126 yield encodelength(MAJOR_TYPE_ARRAY, len(l))
127
128 for i in l:
129 for chunk in streamencode(i):
130 yield chunk
131
132 def streamencodearrayfromiter(it):
133 """Encode an iterator of items to an indefinite length array."""
134
135 yield BEGIN_INDEFINITE_ARRAY
136
137 for i in it:
138 for chunk in streamencode(i):
139 yield chunk
140
141 yield BREAK
142
143 def streamencodeset(s):
144 # https://www.iana.org/assignments/cbor-tags/cbor-tags.xhtml defines
145 # semantic tag 258 for finite sets.
146 yield encodelength(MAJOR_TYPE_SEMANTIC, 258)
147
148 for chunk in streamencodearray(sorted(s)):
149 yield chunk
150
151 def streamencodemap(d):
152 """Encode dictionary to a generator.
153
154 Does not supporting indefinite length dictionaries.
155 """
156 yield encodelength(MAJOR_TYPE_MAP, len(d))
157
158 for key, value in sorted(d.iteritems()):
159 for chunk in streamencode(key):
160 yield chunk
161 for chunk in streamencode(value):
162 yield chunk
163
164 def streamencodemapfromiter(it):
165 """Given an iterable of (key, value), encode to an indefinite length map."""
166 yield BEGIN_INDEFINITE_MAP
167
168 for key, value in it:
169 for chunk in streamencode(key):
170 yield chunk
171 for chunk in streamencode(value):
172 yield chunk
173
174 yield BREAK
175
176 def streamencodebool(b):
177 # major type 7, simple value 20 and 21.
178 yield b'\xf5' if b else b'\xf4'
179
180 def streamencodenone(v):
181 # major type 7, simple value 22.
182 yield b'\xf6'
183
184 STREAM_ENCODERS = {
185 bytes: streamencodebytestring,
186 int: streamencodeint,
187 list: streamencodearray,
188 tuple: streamencodearray,
189 dict: streamencodemap,
190 set: streamencodeset,
191 bool: streamencodebool,
192 type(None): streamencodenone,
193 }
194
195 def streamencode(v):
196 """Encode a value in a streaming manner.
197
198 Given an input object, encode it to CBOR recursively.
199
200 Returns a generator of CBOR encoded bytes. There is no guarantee
201 that each emitted chunk fully decodes to a value or sub-value.
202
203 Encoding is deterministic - unordered collections are sorted.
204 """
205 fn = STREAM_ENCODERS.get(v.__class__)
206
207 if not fn:
208 raise ValueError('do not know how to encode %s' % type(v))
209
210 return fn(v)
211
212 def readindefinitebytestringtoiter(fh, expectheader=True):
213 """Read an indefinite bytestring to a generator.
214
215 Receives an object with a ``read(X)`` method to read N bytes.
216
217 If ``expectheader`` is True, it is expected that the first byte read
218 will represent an indefinite length bytestring. Otherwise, we
219 expect the first byte to be part of the first bytestring chunk.
220 """
221 read = fh.read
222 decodeuint = decodermod.decode_uint
223 byteasinteger = decodermod.byte_as_integer
224
225 if expectheader:
226 initial = decodermod.byte_as_integer(read(1))
227
228 majortype = initial >> 5
229 subtype = initial & SUBTYPE_MASK
230
231 if majortype != MAJOR_TYPE_BYTESTRING:
232 raise decodermod.CBORDecodeError(
233 'expected major type %d; got %d' % (MAJOR_TYPE_BYTESTRING,
234 majortype))
235
236 if subtype != SUBTYPE_INDEFINITE:
237 raise decodermod.CBORDecodeError(
238 'expected indefinite subtype; got %d' % subtype)
239
240 # The indefinite bytestring is composed of chunks of normal bytestrings.
241 # Read chunks until we hit a BREAK byte.
242
243 while True:
244 # We need to sniff for the BREAK byte.
245 initial = byteasinteger(read(1))
246
247 if initial == BREAK_INT:
248 break
249
250 length = decodeuint(fh, initial & SUBTYPE_MASK)
251 chunk = read(length)
252
253 if len(chunk) != length:
254 raise decodermod.CBORDecodeError(
255 'failed to read bytestring chunk: got %d bytes; expected %d' % (
256 len(chunk), length))
257
258 yield chunk