Mercurial > hg
view mercurial/thirdparty/cbor/cbor2/decoder.py @ 50757:19108906abaf stable
extensions: imp module is removed in Python 3.12 - use importlib to load files
imp has been deprecated for a long time, and has finally been removed in Python
3.12 .
imp was only used for loading extensions that has been specified with direct
.py path or path to a package directory. The same use cases can be achieved
quite simple with importlib, , possiby with small changes in corner cases with
undefined behaviour, such as extensions without .py source.
There might also be corner cases and undefined behaviour around use of
sys.modules and reloading.
author | Mads Kiilerich <mads@kiilerich.com> |
---|---|
date | Wed, 28 Jun 2023 14:23:13 +0200 |
parents | 4bd73a955ab0 |
children |
line wrap: on
line source
import re import struct from datetime import datetime, timedelta from io import BytesIO from .compat import timezone, xrange, byte_as_integer, unpack_float16 from .types import CBORTag, undefined, break_marker, CBORSimpleValue timestamp_re = re.compile(r'^(\d{4})-(\d\d)-(\d\d)T(\d\d):(\d\d):(\d\d)' r'(?:\.(\d+))?(?:Z|([+-]\d\d):(\d\d))$') class CBORDecodeError(Exception): """Raised when an error occurs deserializing a CBOR datastream.""" def decode_uint(decoder, subtype, shareable_index=None, allow_indefinite=False): # Major tag 0 if subtype < 24: return subtype elif subtype == 24: return struct.unpack('>B', decoder.read(1))[0] elif subtype == 25: return struct.unpack('>H', decoder.read(2))[0] elif subtype == 26: return struct.unpack('>L', decoder.read(4))[0] elif subtype == 27: return struct.unpack('>Q', decoder.read(8))[0] elif subtype == 31 and allow_indefinite: return None else: raise CBORDecodeError('unknown unsigned integer subtype 0x%x' % subtype) def decode_negint(decoder, subtype, shareable_index=None): # Major tag 1 uint = decode_uint(decoder, subtype) return -uint - 1 def decode_bytestring(decoder, subtype, shareable_index=None): # Major tag 2 length = decode_uint(decoder, subtype, allow_indefinite=True) if length is None: # Indefinite length buf = bytearray() while True: initial_byte = byte_as_integer(decoder.read(1)) if initial_byte == 255: return buf else: length = decode_uint(decoder, initial_byte & 31) value = decoder.read(length) buf.extend(value) else: return decoder.read(length) def decode_string(decoder, subtype, shareable_index=None): # Major tag 3 return decode_bytestring(decoder, subtype).decode('utf-8') def decode_array(decoder, subtype, shareable_index=None): # Major tag 4 items = [] decoder.set_shareable(shareable_index, items) length = decode_uint(decoder, subtype, allow_indefinite=True) if length is None: # Indefinite length while True: value = decoder.decode() if value is break_marker: break else: items.append(value) else: for _ in xrange(length): item = decoder.decode() items.append(item) return items def decode_map(decoder, subtype, shareable_index=None): # Major tag 5 dictionary = {} decoder.set_shareable(shareable_index, dictionary) length = decode_uint(decoder, subtype, allow_indefinite=True) if length is None: # Indefinite length while True: key = decoder.decode() if key is break_marker: break else: value = decoder.decode() dictionary[key] = value else: for _ in xrange(length): key = decoder.decode() value = decoder.decode() dictionary[key] = value if decoder.object_hook: return decoder.object_hook(decoder, dictionary) else: return dictionary def decode_semantic(decoder, subtype, shareable_index=None): # Major tag 6 tagnum = decode_uint(decoder, subtype) # Special handling for the "shareable" tag if tagnum == 28: shareable_index = decoder._allocate_shareable() return decoder.decode(shareable_index) value = decoder.decode() semantic_decoder = semantic_decoders.get(tagnum) if semantic_decoder: return semantic_decoder(decoder, value, shareable_index) tag = CBORTag(tagnum, value) if decoder.tag_hook: return decoder.tag_hook(decoder, tag, shareable_index) else: return tag def decode_special(decoder, subtype, shareable_index=None): # Simple value if subtype < 20: return CBORSimpleValue(subtype) # Major tag 7 return special_decoders[subtype](decoder) # # Semantic decoders (major tag 6) # def decode_datetime_string(decoder, value, shareable_index=None): # Semantic tag 0 match = timestamp_re.match(value) if match: year, month, day, hour, minute, second, micro, offset_h, offset_m = match.groups() if offset_h: tz = timezone(timedelta(hours=int(offset_h), minutes=int(offset_m))) else: tz = timezone.utc return datetime(int(year), int(month), int(day), int(hour), int(minute), int(second), int(micro or 0), tz) else: raise CBORDecodeError('invalid datetime string: {}'.format(value)) def decode_epoch_datetime(decoder, value, shareable_index=None): # Semantic tag 1 return datetime.fromtimestamp(value, timezone.utc) def decode_positive_bignum(decoder, value, shareable_index=None): # Semantic tag 2 from binascii import hexlify return int(hexlify(value), 16) def decode_negative_bignum(decoder, value, shareable_index=None): # Semantic tag 3 return -decode_positive_bignum(decoder, value) - 1 def decode_fraction(decoder, value, shareable_index=None): # Semantic tag 4 from decimal import Decimal exp = Decimal(value[0]) mantissa = Decimal(value[1]) return mantissa * (10 ** exp) def decode_bigfloat(decoder, value, shareable_index=None): # Semantic tag 5 from decimal import Decimal exp = Decimal(value[0]) mantissa = Decimal(value[1]) return mantissa * (2 ** exp) def decode_sharedref(decoder, value, shareable_index=None): # Semantic tag 29 try: shared = decoder._shareables[value] except IndexError: raise CBORDecodeError('shared reference %d not found' % value) if shared is None: raise CBORDecodeError('shared value %d has not been initialized' % value) else: return shared def decode_rational(decoder, value, shareable_index=None): # Semantic tag 30 from fractions import Fraction return Fraction(*value) def decode_regexp(decoder, value, shareable_index=None): # Semantic tag 35 return re.compile(value) def decode_mime(decoder, value, shareable_index=None): # Semantic tag 36 from email.parser import Parser return Parser().parsestr(value) def decode_uuid(decoder, value, shareable_index=None): # Semantic tag 37 from uuid import UUID return UUID(bytes=value) def decode_set(decoder, value, shareable_index=None): # Semantic tag 258 return set(value) # # Special decoders (major tag 7) # def decode_simple_value(decoder, shareable_index=None): return CBORSimpleValue(struct.unpack('>B', decoder.read(1))[0]) def decode_float16(decoder, shareable_index=None): payload = decoder.read(2) return unpack_float16(payload) def decode_float32(decoder, shareable_index=None): return struct.unpack('>f', decoder.read(4))[0] def decode_float64(decoder, shareable_index=None): return struct.unpack('>d', decoder.read(8))[0] major_decoders = { 0: decode_uint, 1: decode_negint, 2: decode_bytestring, 3: decode_string, 4: decode_array, 5: decode_map, 6: decode_semantic, 7: decode_special } special_decoders = { 20: lambda self: False, 21: lambda self: True, 22: lambda self: None, 23: lambda self: undefined, 24: decode_simple_value, 25: decode_float16, 26: decode_float32, 27: decode_float64, 31: lambda self: break_marker } semantic_decoders = { 0: decode_datetime_string, 1: decode_epoch_datetime, 2: decode_positive_bignum, 3: decode_negative_bignum, 4: decode_fraction, 5: decode_bigfloat, 29: decode_sharedref, 30: decode_rational, 35: decode_regexp, 36: decode_mime, 37: decode_uuid, 258: decode_set } class CBORDecoder(object): """ Deserializes a CBOR encoded byte stream. :param tag_hook: Callable that takes 3 arguments: the decoder instance, the :class:`~cbor2.types.CBORTag` and the shareable index for the resulting object, if any. This callback is called for any tags for which there is no built-in decoder. The return value is substituted for the CBORTag object in the deserialized output. :param object_hook: Callable that takes 2 arguments: the decoder instance and the dictionary. This callback is called for each deserialized :class:`dict` object. The return value is substituted for the dict in the deserialized output. """ __slots__ = ('fp', 'tag_hook', 'object_hook', '_shareables') def __init__(self, fp, tag_hook=None, object_hook=None): self.fp = fp self.tag_hook = tag_hook self.object_hook = object_hook self._shareables = [] def _allocate_shareable(self): self._shareables.append(None) return len(self._shareables) - 1 def set_shareable(self, index, value): """ Set the shareable value for the last encountered shared value marker, if any. If the given index is ``None``, nothing is done. :param index: the value of the ``shared_index`` argument to the decoder :param value: the shared value """ if index is not None: self._shareables[index] = value def read(self, amount): """ Read bytes from the data stream. :param int amount: the number of bytes to read """ data = self.fp.read(amount) if len(data) < amount: raise CBORDecodeError('premature end of stream (expected to read {} bytes, got {} ' 'instead)'.format(amount, len(data))) return data def decode(self, shareable_index=None): """ Decode the next value from the stream. :raises CBORDecodeError: if there is any problem decoding the stream """ try: initial_byte = byte_as_integer(self.fp.read(1)) major_type = initial_byte >> 5 subtype = initial_byte & 31 except Exception as e: raise CBORDecodeError('error reading major type at index {}: {}' .format(self.fp.tell(), e)) decoder = major_decoders[major_type] try: return decoder(self, subtype, shareable_index) except CBORDecodeError: raise except Exception as e: raise CBORDecodeError('error decoding value at index {}: {}'.format(self.fp.tell(), e)) def decode_from_bytes(self, buf): """ Wrap the given bytestring as a file and call :meth:`decode` with it as the argument. This method was intended to be used from the ``tag_hook`` hook when an object needs to be decoded separately from the rest but while still taking advantage of the shared value registry. """ old_fp = self.fp self.fp = BytesIO(buf) retval = self.decode() self.fp = old_fp return retval def loads(payload, **kwargs): """ Deserialize an object from a bytestring. :param bytes payload: the bytestring to serialize :param kwargs: keyword arguments passed to :class:`~.CBORDecoder` :return: the deserialized object """ fp = BytesIO(payload) return CBORDecoder(fp, **kwargs).decode() def load(fp, **kwargs): """ Deserialize an object from an open file. :param fp: the input file (any file-like object) :param kwargs: keyword arguments passed to :class:`~.CBORDecoder` :return: the deserialized object """ return CBORDecoder(fp, **kwargs).decode()