Mercurial > evolve
view hgext3rd/evolve/thirdparty/cbor.py @ 4714:c51fc0ae7a7e
py3: switch from iteritems() to items()
author | Martin von Zweigbergk <martinvonz@google.com> |
---|---|
date | Wed, 03 Jul 2019 11:13:47 -0700 |
parents | b3dbba6e34c9 |
children | 48b30ff742cb |
line wrap: on
line source
#!python
# -*- Python -*-
#
# Copyright 2014-2015 Brian Olson
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Pure-Python CBOR encoder/decoder.
#
# Public entry points: dumps()/dump() to serialize, loads()/load() to parse,
# and Tag to carry tagged values that have no native Python equivalent.

import datetime
import re
import struct
import sys

_IS_PY3 = sys.version_info[0] >= 3

if _IS_PY3:
    from io import BytesIO as StringIO
else:
    try:
        from cStringIO import StringIO
    except ImportError:
        from StringIO import StringIO


CBOR_TYPE_MASK = 0xE0  # top 3 bits
CBOR_INFO_BITS = 0x1F  # low 5 bits


CBOR_UINT = 0x00
CBOR_NEGINT = 0x20
CBOR_BYTES = 0x40
CBOR_TEXT = 0x60
CBOR_ARRAY = 0x80
CBOR_MAP = 0xA0
CBOR_TAG = 0xC0
CBOR_7 = 0xE0  # float and other types

CBOR_UINT8_FOLLOWS = 24  # 0x18
CBOR_UINT16_FOLLOWS = 25  # 0x19
CBOR_UINT32_FOLLOWS = 26  # 0x1a
CBOR_UINT64_FOLLOWS = 27  # 0x1b
CBOR_VAR_FOLLOWS = 31  # 0x1f

CBOR_BREAK = 0xFF

CBOR_FALSE = (CBOR_7 | 20)
CBOR_TRUE = (CBOR_7 | 21)
CBOR_NULL = (CBOR_7 | 22)
CBOR_UNDEFINED = (CBOR_7 | 23)  # js 'undefined' value

CBOR_FLOAT16 = (CBOR_7 | 25)
CBOR_FLOAT32 = (CBOR_7 | 26)
CBOR_FLOAT64 = (CBOR_7 | 27)

CBOR_TAG_DATE_STRING = 0  # RFC3339
CBOR_TAG_DATE_ARRAY = 1  # any number type follows, seconds since 1970-01-01T00:00:00 UTC
CBOR_TAG_BIGNUM = 2  # big endian byte string follows
CBOR_TAG_NEGBIGNUM = 3  # big endian byte string follows
CBOR_TAG_DECIMAL = 4  # [ 10^x exponent, number ]
CBOR_TAG_BIGFLOAT = 5  # [ 2^x exponent, number ]
# NOTE: the next two names are assigned again further down (33 and 34), so
# after import they hold 33 and 34.  Both assignments are kept so the values
# visible to importers do not change.
CBOR_TAG_BASE64URL = 21
CBOR_TAG_BASE64 = 22
CBOR_TAG_BASE16 = 23
CBOR_TAG_CBOR = 24  # following byte string is embedded CBOR data

CBOR_TAG_URI = 32
CBOR_TAG_BASE64URL = 33
CBOR_TAG_BASE64 = 34
CBOR_TAG_REGEX = 35
CBOR_TAG_MIME = 36  # following text is MIME message, headers, separators and all
CBOR_TAG_CBOR_FILEHEADER = 55799  # can open a file with 0xd9d9f7

_CBOR_TAG_BIGNUM_BYTES = struct.pack('B', CBOR_TAG | CBOR_TAG_BIGNUM)


def dumps_int(val):
    "return bytes representing int val in CBOR"
    if val >= 0:
        # CBOR_UINT is 0, so I'm lazy/efficient about not OR-ing it in.
        if val <= 23:
            return struct.pack('B', val)
        if val <= 0x0ff:
            return struct.pack('BB', CBOR_UINT8_FOLLOWS, val)
        if val <= 0x0ffff:
            return struct.pack('!BH', CBOR_UINT16_FOLLOWS, val)
        if val <= 0x0ffffffff:
            return struct.pack('!BI', CBOR_UINT32_FOLLOWS, val)
        if val <= 0x0ffffffffffffffff:
            return struct.pack('!BQ', CBOR_UINT64_FOLLOWS, val)
        # Too large for 64 bits: emit a bignum tag wrapping a byte string.
        outb = _dumps_bignum_to_bytearray(val)
        return _CBOR_TAG_BIGNUM_BYTES + _encode_type_num(CBOR_BYTES, len(outb)) + outb
    # CBOR stores a negative integer n as the unsigned value (-1 - n).
    val = -1 - val
    return _encode_type_num(CBOR_NEGINT, val)


def _dumps_bignum_to_bytearray(val):
    """Return the big-endian bytes of non-negative int val (b'' for 0)."""
    out = bytearray()
    # Collect least-significant byte first, then reverse once; this avoids
    # the quadratic cost of repeatedly inserting at the front.
    while val > 0:
        out.append(val & 0x0ff)
        val = val >> 8
    out.reverse()
    return bytes(out)


def dumps_float(val):
    "return bytes representing float val as a CBOR 64 bit float"
    return struct.pack("!Bd", CBOR_FLOAT64, val)


_CBOR_TAG_NEGBIGNUM_BYTES = struct.pack('B', CBOR_TAG | CBOR_TAG_NEGBIGNUM)


def _encode_type_num(cbor_type, val):
    """For some CBOR primary type [0..7] and an auxiliary unsigned number, return CBOR encoded bytes

    Raises Exception when val does not fit in 64 bits and cbor_type is not
    CBOR_NEGINT; oversized CBOR_NEGINT values are emitted as a negative
    bignum tag instead.
    """
    assert val >= 0
    if val <= 23:
        return struct.pack('B', cbor_type | val)
    if val <= 0x0ff:
        return struct.pack('BB', cbor_type | CBOR_UINT8_FOLLOWS, val)
    if val <= 0x0ffff:
        return struct.pack('!BH', cbor_type | CBOR_UINT16_FOLLOWS, val)
    if val <= 0x0ffffffff:
        return struct.pack('!BI', cbor_type | CBOR_UINT32_FOLLOWS, val)
    if (((cbor_type == CBOR_NEGINT) and (val <= 0x07fffffffffffffff)) or
        ((cbor_type != CBOR_NEGINT) and (val <= 0x0ffffffffffffffff))):
        return struct.pack('!BQ', cbor_type | CBOR_UINT64_FOLLOWS, val)
    if cbor_type != CBOR_NEGINT:
        raise Exception("value too big for CBOR unsigned number: {0!r}".format(val))
    outb = _dumps_bignum_to_bytearray(val)
    return _CBOR_TAG_NEGBIGNUM_BYTES + _encode_type_num(CBOR_BYTES, len(outb)) + outb


if _IS_PY3:
    def _is_unicode(val):
        return isinstance(val, str)
else:
    def _is_unicode(val):
        return isinstance(val, unicode)


def dumps_string(val, is_text=None, is_bytes=None):
    """Return CBOR bytes for a string.

    Unicode input is UTF-8 encoded and always emitted as CBOR text.  Any
    other input is emitted as a CBOR byte string unless is_text compares
    equal to True and is_bytes is falsy.
    """
    if _is_unicode(val):
        val = val.encode('utf8')
        is_text = True
        is_bytes = False
    if (is_bytes) or not (is_text == True):
        return _encode_type_num(CBOR_BYTES, len(val)) + val
    return _encode_type_num(CBOR_TEXT, len(val)) + val


def dumps_array(arr, sort_keys=False):
    "return CBOR bytes for a definite-length array of arr's items"
    head = _encode_type_num(CBOR_ARRAY, len(arr))
    parts = [dumps(x, sort_keys=sort_keys) for x in arr]
    return head + b''.join(parts)


def dumps_dict(d, sort_keys=False):
    """Return CBOR bytes for a definite-length map.

    With sort_keys the entries are emitted in sorted key order, otherwise in
    the order d.items() yields them.
    """
    parts = [_encode_type_num(CBOR_MAP, len(d))]
    if sort_keys:
        items = ((k, d[k]) for k in sorted(d.keys()))
    else:
        items = d.items()
    for k, v in items:
        parts.append(dumps(k, sort_keys=sort_keys))
        parts.append(dumps(v, sort_keys=sort_keys))
    return b''.join(parts)


def dumps_bool(b):
    "return the single CBOR byte for true or false"
    if b:
        return struct.pack('B', CBOR_TRUE)
    return struct.pack('B', CBOR_FALSE)


def dumps_tag(t, sort_keys=False):
    "return CBOR bytes for Tag t: the tag number followed by the encoded value"
    return _encode_type_num(CBOR_TAG, t.tag) + dumps(t.value, sort_keys=sort_keys)


if _IS_PY3:
    def _is_stringish(x):
        return isinstance(x, (str, bytes))

    def _is_intish(x):
        return isinstance(x, int)
else:
    def _is_stringish(x):
        return isinstance(x, (str, basestring, bytes, unicode))

    def _is_intish(x):
        return isinstance(x, (int, long))


def dumps(ob, sort_keys=False):
    """Serialize ob to CBOR bytes.

    Handles None, bool, str/bytes, list/tuple, dict, float, int and Tag.
    Raises Exception for any other type.
    """
    if ob is None:
        return struct.pack('B', CBOR_NULL)
    # bool must be tested before int: bool is a subclass of int.
    if isinstance(ob, bool):
        return dumps_bool(ob)
    if _is_stringish(ob):
        return dumps_string(ob)
    if isinstance(ob, (list, tuple)):
        return dumps_array(ob, sort_keys=sort_keys)
    # TODO: accept other enumerables and emit a variable length array
    if isinstance(ob, dict):
        return dumps_dict(ob, sort_keys=sort_keys)
    if isinstance(ob, float):
        return dumps_float(ob)
    if _is_intish(ob):
        return dumps_int(ob)
    if isinstance(ob, Tag):
        return dumps_tag(ob, sort_keys=sort_keys)
    raise Exception("don't know how to cbor serialize object of type %s" % (type(ob),))


# same basic signature as json.dump, but with no options (yet)
def dump(obj, fp, sort_keys=False):
    """
    obj: Python object to serialize
    fp: file-like object capable of .write(bytes)
    """
    # this is kinda lame, but probably not inefficient for non-huge objects
    # TODO: .write() to fp as we go as each inner object is serialized
    blob = dumps(obj, sort_keys=sort_keys)
    fp.write(blob)


class Tag(object):
    """A CBOR tagged value: a tag number plus the object it annotates."""

    def __init__(self, tag=None, value=None):
        self.tag = tag
        self.value = value

    def __repr__(self):
        return "Tag({0!r}, {1!r})".format(self.tag, self.value)

    def __eq__(self, other):
        if not isinstance(other, Tag):
            return False
        return (self.tag == other.tag) and (self.value == other.value)

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__; keep the two consistent.
        return not self.__eq__(other)


def loads(data):
    """
    Parse CBOR bytes and return Python objects.

    Raises ValueError when data is None.
    """
    if data is None:
        raise ValueError("got None for buffer to decode in loads")
    fp = StringIO(data)
    return _loads(fp)[0]


def load(fp):
    """
    Parse and return object from fp, a file-like object supporting .read(n)
    """
    return _loads(fp)[0]


# Maximum nesting of arrays, maps and tags accepted while decoding.
_MAX_DEPTH = 100

if _IS_PY3:
    _range = range
else:
    _range = xrange  # lazy on Python 2; range() would build a full list


def _check_depth(depth):
    "raise Exception once decoding has nested deeper than _MAX_DEPTH"
    if depth > _MAX_DEPTH:
        raise Exception("hit CBOR loads recursion depth limit")


def _tag_aux(fp, tb):
    """Split initial byte tb and read the argument that follows it.

    Returns (major type bits, low 5 info bits, argument value, bytes read
    including tb).  The argument value is None for the indefinite-length
    marker.
    """
    bytes_read = 1
    tag = tb & CBOR_TYPE_MASK
    tag_aux = tb & CBOR_INFO_BITS
    if tag_aux <= 23:
        aux = tag_aux
    elif tag_aux == CBOR_UINT8_FOLLOWS:
        data = fp.read(1)
        aux = struct.unpack_from("!B", data, 0)[0]
        bytes_read += 1
    elif tag_aux == CBOR_UINT16_FOLLOWS:
        data = fp.read(2)
        aux = struct.unpack_from("!H", data, 0)[0]
        bytes_read += 2
    elif tag_aux == CBOR_UINT32_FOLLOWS:
        data = fp.read(4)
        aux = struct.unpack_from("!I", data, 0)[0]
        bytes_read += 4
    elif tag_aux == CBOR_UINT64_FOLLOWS:
        data = fp.read(8)
        aux = struct.unpack_from("!Q", data, 0)[0]
        bytes_read += 8
    else:
        # NOTE(review): this assert is input validation and is stripped
        # under -O; kept so the exception type seen by callers is unchanged.
        assert tag_aux == CBOR_VAR_FOLLOWS, "bogus tag {0:02x}".format(tb)
        aux = None

    return tag, tag_aux, aux, bytes_read


def _read_byte(fp):
    "read one byte from fp and return it as an int; raise EOFError at end of input"
    tb = fp.read(1)
    if len(tb) == 0:
        # I guess not all file-like objects do this
        raise EOFError()
    return ord(tb)


def _loads_var_array(fp, limit, depth, returntags, bytes_read):
    "decode items of an indefinite-length array until the break byte"
    ob = []
    tb = _read_byte(fp)
    while tb != CBOR_BREAK:
        (subob, sub_len) = _loads_tb(fp, tb, limit, depth + 1, returntags)
        bytes_read += 1 + sub_len
        ob.append(subob)
        tb = _read_byte(fp)
    return (ob, bytes_read + 1)


def _loads_var_map(fp, limit, depth, returntags, bytes_read):
    "decode key/value pairs of an indefinite-length map until the break byte"
    ob = {}
    tb = _read_byte(fp)
    while tb != CBOR_BREAK:
        (subk, sub_len) = _loads_tb(fp, tb, limit, depth + 1, returntags)
        bytes_read += 1 + sub_len
        (subv, sub_len) = _loads(fp, limit, depth + 1, returntags)
        bytes_read += sub_len
        ob[subk] = subv
        tb = _read_byte(fp)
    return (ob, bytes_read + 1)


def _loads_array(fp, limit, depth, returntags, aux, bytes_read):
    "decode a definite-length array of aux items"
    ob = []
    for _ in _range(aux):
        subob, subpos = _loads(fp, limit, depth + 1, returntags)
        bytes_read += subpos
        ob.append(subob)
    return ob, bytes_read


def _loads_map(fp, limit, depth, returntags, aux, bytes_read):
    "decode a definite-length map of aux key/value pairs"
    ob = {}
    for _ in _range(aux):
        subk, subpos = _loads(fp, limit, depth + 1, returntags)
        bytes_read += subpos
        subv, subpos = _loads(fp, limit, depth + 1, returntags)
        bytes_read += subpos
        ob[subk] = subv
    return ob, bytes_read


def _loads(fp, limit=None, depth=0, returntags=False):
    "return (object, bytes read)"
    # Checked here as well as in _loads_tb so that an over-deep item is
    # rejected before its first byte is consumed.
    _check_depth(depth)

    tb = _read_byte(fp)

    return _loads_tb(fp, tb, limit, depth, returntags)


def _loads_tb(fp, tb, limit=None, depth=0, returntags=False):
    """Decode one item whose initial byte tb has already been read.

    Returns (object, bytes read including tb).  limit is accepted and passed
    down to nested items but is not otherwise used.  With returntags, tagged
    items are returned as Tag objects instead of being interpreted.
    """
    # Indefinite-length containers reach this function without going through
    # _loads, so the depth limit has to be enforced here too.
    _check_depth(depth)

    # Some special cases of CBOR_7 best handled by special struct.unpack logic here
    if tb == CBOR_FLOAT16:
        data = fp.read(2)
        hibyte, lowbyte = struct.unpack_from("BB", data, 0)
        exp = (hibyte >> 2) & 0x1F
        mant = ((hibyte & 0x03) << 8) | lowbyte
        if exp == 0:
            val = mant * (2.0 ** -24)
        elif exp == 31:
            if mant == 0:
                val = float('Inf')
            else:
                val = float('NaN')
        else:
            val = (mant + 1024.0) * (2 ** (exp - 25))
        if hibyte & 0x80:
            val = -1.0 * val
        return (val, 3)
    elif tb == CBOR_FLOAT32:
        data = fp.read(4)
        pf = struct.unpack_from("!f", data, 0)
        return (pf[0], 5)
    elif tb == CBOR_FLOAT64:
        data = fp.read(8)
        pf = struct.unpack_from("!d", data, 0)
        return (pf[0], 9)

    tag, tag_aux, aux, bytes_read = _tag_aux(fp, tb)

    if tag == CBOR_UINT:
        return (aux, bytes_read)
    elif tag == CBOR_NEGINT:
        return (-1 - aux, bytes_read)
    elif tag == CBOR_BYTES:
        ob, subpos = loads_bytes(fp, aux)
        return (ob, bytes_read + subpos)
    elif tag == CBOR_TEXT:
        raw, subpos = loads_bytes(fp, aux, btag=CBOR_TEXT)
        ob = raw.decode('utf8')
        return (ob, bytes_read + subpos)
    elif tag == CBOR_ARRAY:
        if aux is None:
            return _loads_var_array(fp, limit, depth, returntags, bytes_read)
        return _loads_array(fp, limit, depth, returntags, aux, bytes_read)
    elif tag == CBOR_MAP:
        if aux is None:
            return _loads_var_map(fp, limit, depth, returntags, bytes_read)
        return _loads_map(fp, limit, depth, returntags, aux, bytes_read)
    elif tag == CBOR_TAG:
        ob, subpos = _loads(fp, limit, depth + 1, returntags)
        bytes_read += subpos
        if returntags:
            # Don't interpret the tag, return it and the tagged object.
            ob = Tag(aux, ob)
        else:
            # attempt to interpet the tag and the value into a Python object.
            ob = tagify(ob, aux)
        return ob, bytes_read
    elif tag == CBOR_7:
        if tb == CBOR_TRUE:
            return (True, bytes_read)
        if tb == CBOR_FALSE:
            return (False, bytes_read)
        if tb == CBOR_NULL:
            return (None, bytes_read)
        if tb == CBOR_UNDEFINED:
            return (None, bytes_read)
        raise ValueError("unknown cbor tag 7 byte: {:02x}".format(tb))


def loads_bytes(fp, aux, btag=CBOR_BYTES):
    """Read the payload of a byte or text string from fp.

    aux is the declared length, or None for an indefinite-length string made
    of chunks terminated by a break byte; every chunk must have major type
    btag.  Returns (bytes, number of bytes consumed).  Raises EOFError when
    input ends before the break byte of an indefinite-length string.
    """
    # TODO: limit to some maximum number of chunks and some maximum total bytes
    if aux is not None:
        # simple case
        ob = fp.read(aux)
        return (ob, aux)
    # read chunks of bytes
    chunklist = []
    total_bytes_read = 0
    while True:
        # _read_byte gives an int on both Python 2 and 3 and raises EOFError
        # on truncated input, matching the rest of the decoder.
        tb = _read_byte(fp)
        if tb == CBOR_BREAK:
            total_bytes_read += 1
            break
        tag, tag_aux, aux, bytes_read = _tag_aux(fp, tb)
        assert tag == btag, 'variable length value contains unexpected component'
        ob = fp.read(aux)
        chunklist.append(ob)
        total_bytes_read += bytes_read + aux
    return (b''.join(chunklist), total_bytes_read)


if _IS_PY3:
    def _bytes_to_biguint(bs):
        "interpret bs as a big-endian unsigned integer"
        out = 0
        for ch in bs:
            out = out << 8
            out = out | ch
        return out
else:
    def _bytes_to_biguint(bs):
        "interpret bs as a big-endian unsigned integer"
        out = 0
        for ch in bs:
            out = out << 8
            out = out | ord(ch)
        return out


def tagify(ob, aux):
    """Turn tagged value ob with tag number aux into a Python object.

    Epoch dates become naive datetime objects, bignums become ints and regex
    tags become compiled patterns; anything else is returned as Tag(aux, ob).
    """
    # TODO: make this extensible?
    # cbor.register_tag_handler(tagnumber, tag_handler)
    # where tag_handler takes (tagnumber, tagged_object)
    if aux == CBOR_TAG_DATE_STRING:
        # TODO: parse RFC3339 date string
        pass
    if aux == CBOR_TAG_DATE_ARRAY:
        return datetime.datetime.utcfromtimestamp(ob)
    if aux == CBOR_TAG_BIGNUM:
        return _bytes_to_biguint(ob)
    if aux == CBOR_TAG_NEGBIGNUM:
        return -1 - _bytes_to_biguint(ob)
    if aux == CBOR_TAG_REGEX:
        # Is this actually a good idea? Should we just return the tag and the raw value to the user somehow?
        return re.compile(ob)
    return Tag(aux, ob)