view mercurial/thirdparty/cbor/cbor2/ @ 37126:4bd73a955ab0

thirdparty: vendor cbor2 python library CBOR stands for Concise Binary Object Representation, which is a data format which is very compact and extensible. This patch moves the python library which can serialize and deserialize python objects to/from cbor formats. The library is taken from commit 84181540f6eb650437e3f73cd104a65661fe8e67. Unrequired files from the cbor library - docs/, tests/, setup.cfg, and tox.ini - have not been vendored. There is another python library for cbor which is used in the evolve extension and was imported in the initial version of this series. That library, though it contains C code and is a bit faster, has known bugs around serializing nested structures, is unmaintained, and raises an Exception object instead of a more dedicated Error type. So, it's better to use a bug free and actively maintained library. This library is not yet used and will be used in later commits. # no-check-commit because we are importing a third party library module Differential Revision:
author Pulkit Goyal <>
date Mon, 26 Mar 2018 08:33:57 -0700
line wrap: on
line source

import re
import struct
from datetime import datetime, timedelta
from io import BytesIO

from .compat import timezone, xrange, byte_as_integer, unpack_float16
from .types import CBORTag, undefined, break_marker, CBORSimpleValue

# Matches the date/time strings carried by semantic tag 0. Groups, in order: year, month,
# day, hour, minute, second, optional fractional-second digits, and an optional numeric UTC
# offset split into signed hours and minutes (a trailing "Z" leaves both offset groups unset).
timestamp_re = re.compile(r'^(\d{4})-(\d\d)-(\d\d)T(\d\d):(\d\d):(\d\d)'
                          r'(?:\.(\d+))?(?:Z|([+-]\d\d):(\d\d))$')

class CBORDecodeError(Exception):
    """Signals that a CBOR datastream could not be deserialized."""

def decode_uint(decoder, subtype, shareable_index=None, allow_indefinite=False):
    """
    Decode an unsigned integer (also used for lengths and tag numbers).

    :param decoder: object providing ``read(amount)`` for fetching the following bytes
    :param int subtype: the low 5 bits of the initial byte
    :param shareable_index: unused; accepted so all major decoders share one signature
    :param bool allow_indefinite: if true, subtype 31 yields ``None`` instead of an error
    :return: the decoded integer, or ``None`` for the indefinite length marker
    :raises CBORDecodeError: if the subtype does not describe an unsigned integer

    """
    # Major tag 0
    if subtype < 24:
        # Small values are stored directly in the subtype bits
        return subtype
    elif subtype == 24:
        return struct.unpack('>B', decoder.read(1))[0]
    elif subtype == 25:
        return struct.unpack('>H', decoder.read(2))[0]
    elif subtype == 26:
        return struct.unpack('>L', decoder.read(4))[0]
    elif subtype == 27:
        return struct.unpack('>Q', decoder.read(8))[0]
    elif subtype == 31 and allow_indefinite:
        return None
    else:
        raise CBORDecodeError('unknown unsigned integer subtype 0x%x' % subtype)

def decode_negint(decoder, subtype, shareable_index=None):
    """
    Decode a negative integer, stored on the wire as ``-1 - n`` for an unsigned ``n``.

    :param decoder: passed through to :func:`decode_uint`
    :param int subtype: the low 5 bits of the initial byte
    :param shareable_index: unused
    :return: the decoded negative integer

    """
    # Major tag 1
    magnitude = decode_uint(decoder, subtype)
    return -(magnitude + 1)

def decode_bytestring(decoder, subtype, shareable_index=None):
    """
    Decode a byte string of definite or indefinite length.

    :param decoder: object providing ``read(amount)``
    :param int subtype: the low 5 bits of the initial byte (encodes the length)
    :param shareable_index: unused
    :return: the decoded bytes

    """
    # Major tag 2
    length = decode_uint(decoder, subtype, allow_indefinite=True)
    if length is None:
        # Indefinite length: keep reading chunks until the 0xff terminator byte
        buf = bytearray()
        while True:
            initial_byte = byte_as_integer(decoder.read(1))
            if initial_byte == 255:
                # Hand back bytes (not the bytearray) so both branches return the same
                # immutable, hashable type and the result can be used as a map key
                return bytes(buf)
            else:
                length = decode_uint(decoder, initial_byte & 31)
                value = decoder.read(length)
                buf.extend(value)
    else:
        return decoder.read(length)

def decode_string(decoder, subtype, shareable_index=None):
    """
    Decode a text string by reading it as a byte string and decoding that as UTF-8.

    :param decoder: passed through to :func:`decode_bytestring`
    :param int subtype: the low 5 bits of the initial byte
    :param shareable_index: unused
    :return: the decoded text

    """
    # Major tag 3
    raw = decode_bytestring(decoder, subtype)
    return raw.decode('utf-8')

def decode_array(decoder, subtype, shareable_index=None):
    """
    Decode an array of definite or indefinite length into a list.

    :param decoder: the decoder instance used to decode the contained items
    :param int subtype: the low 5 bits of the initial byte (encodes the length)
    :param shareable_index: index under which the list is registered as a shared value
    :return: the list of decoded items

    """
    # Major tag 4
    items = []
    # Register the (still empty) list before decoding its contents so that nested
    # items can already refer back to it
    decoder.set_shareable(shareable_index, items)
    length = decode_uint(decoder, subtype, allow_indefinite=True)
    if length is None:
        # Indefinite length
        while True:
            value = decoder.decode()
            if value is break_marker:
                break
            else:
                items.append(value)
    else:
        for _ in xrange(length):
            item = decoder.decode()
            items.append(item)

    return items

def decode_map(decoder, subtype, shareable_index=None):
    """
    Decode a map of definite or indefinite length into a dictionary.

    :param decoder: the decoder instance used to decode the keys and values
    :param int subtype: the low 5 bits of the initial byte (encodes the pair count)
    :param shareable_index: index under which the dict is registered as a shared value
    :return: the dictionary, or whatever ``decoder.object_hook`` returns for it

    """
    # Major tag 5
    dictionary = {}
    # Register the (still empty) dict before decoding its contents so that nested
    # values can already refer back to it
    decoder.set_shareable(shareable_index, dictionary)
    length = decode_uint(decoder, subtype, allow_indefinite=True)
    if length is None:
        # Indefinite length
        while True:
            key = decoder.decode()
            if key is break_marker:
                break
            else:
                value = decoder.decode()
                dictionary[key] = value
    else:
        for _ in xrange(length):
            key = decoder.decode()
            value = decoder.decode()
            dictionary[key] = value

    if decoder.object_hook:
        return decoder.object_hook(decoder, dictionary)
    else:
        return dictionary

def decode_semantic(decoder, subtype, shareable_index=None):
    """
    Decode a tagged value.

    Tag 28 marks the following value as shareable. Tags listed in ``semantic_decoders``
    are converted by their decoder. Any other tag is wrapped in a ``CBORTag`` and passed
    to ``decoder.tag_hook`` if one is set.

    :param decoder: the decoder instance used to decode the tagged value
    :param int subtype: the low 5 bits of the initial byte (encodes the tag number)
    :param shareable_index: shareable index forwarded to the semantic decoder or tag hook
    :return: the converted value, the tag hook's result, or the bare ``CBORTag``

    """
    # Major tag 6
    tagnum = decode_uint(decoder, subtype)

    # Special handling for the "shareable" tag
    if tagnum == 28:
        shareable_index = decoder._allocate_shareable()
        return decoder.decode(shareable_index)

    value = decoder.decode()
    semantic_decoder = semantic_decoders.get(tagnum)
    if semantic_decoder:
        return semantic_decoder(decoder, value, shareable_index)

    tag = CBORTag(tagnum, value)
    if decoder.tag_hook:
        return decoder.tag_hook(decoder, tag, shareable_index)
    else:
        # No hook installed: hand the unrecognized tag back to the caller as-is
        return tag

def decode_special(decoder, subtype, shareable_index=None):
    """
    Decode a simple value or dispatch to one of the ``special_decoders``.

    :param decoder: the decoder instance, handed to the selected special decoder
    :param int subtype: the low 5 bits of the initial byte
    :param shareable_index: unused
    :return: a ``CBORSimpleValue`` for subtypes below 20, else the special decoder's result

    """
    # Major tag 7
    if subtype >= 20:
        handler = special_decoders[subtype]
        return handler(decoder)

    # Simple value
    return CBORSimpleValue(subtype)

# Semantic decoders (major tag 6)

def decode_datetime_string(decoder, value, shareable_index=None):
    """
    Convert a date/time string into a timezone aware ``datetime``.

    :param decoder: unused
    :param value: the timestamp text; must match ``timestamp_re``
    :param shareable_index: unused
    :return: the corresponding ``datetime`` (UTC when no numeric offset is given)
    :raises CBORDecodeError: if the string does not match ``timestamp_re``

    """
    # Semantic tag 0
    match = timestamp_re.match(value)
    if match:
        year, month, day, hour, minute, second, micro, offset_h, offset_m = match.groups()
        if offset_h:
            # offset_h carries the sign of the whole offset, so "-05:30" has to become
            # -(5h 30m) rather than -5h +30m
            offset_minutes = int(offset_m)
            if offset_h.startswith('-'):
                offset_minutes = -offset_minutes

            tz = timezone(timedelta(hours=int(offset_h), minutes=offset_minutes))
        else:
            tz = timezone.utc

        # The digits are a decimal fraction of a second: scale them to exactly six
        # digits so that ".5" means 500000 microseconds, not 5
        microsecond = int(micro[:6].ljust(6, '0')) if micro else 0
        return datetime(int(year), int(month), int(day), int(hour), int(minute), int(second),
                        microsecond, tz)
    else:
        raise CBORDecodeError('invalid datetime string: {}'.format(value))

def decode_epoch_datetime(decoder, value, shareable_index=None):
    """
    Convert a POSIX timestamp into a UTC ``datetime``.

    :param decoder: unused
    :param value: seconds since the epoch
    :param shareable_index: unused
    :return: the timezone aware ``datetime``

    """
    # Semantic tag 1
    utc = timezone.utc
    return datetime.fromtimestamp(value, tz=utc)

def decode_positive_bignum(decoder, value, shareable_index=None):
    """
    Interpret a byte string as a big-endian unsigned integer.

    :param decoder: unused
    :param value: the bytes holding the number
    :param shareable_index: unused
    :return: the integer

    """
    # Semantic tag 2
    import binascii
    hex_digits = binascii.hexlify(value)
    return int(hex_digits, 16)

def decode_negative_bignum(decoder, value, shareable_index=None):
    """
    Interpret a byte string as the negative integer ``-1 - n``.

    :param decoder: passed through to :func:`decode_positive_bignum`
    :param value: the bytes holding ``n``
    :param shareable_index: unused
    :return: the negative integer

    """
    # Semantic tag 3
    magnitude = decode_positive_bignum(decoder, value)
    return -magnitude - 1

def decode_fraction(decoder, value, shareable_index=None):
    """
    Build a ``Decimal`` from an (exponent, mantissa) pair with base 10.

    :param decoder: unused
    :param value: sequence whose first item is the exponent and second the mantissa
    :param shareable_index: unused
    :return: ``mantissa * 10 ** exponent`` as a ``Decimal``

    """
    # Semantic tag 4
    import decimal
    exponent = decimal.Decimal(value[0])
    significand = decimal.Decimal(value[1])
    scale = 10 ** exponent
    return significand * scale

def decode_bigfloat(decoder, value, shareable_index=None):
    """
    Build a ``Decimal`` from an (exponent, mantissa) pair with base 2.

    :param decoder: unused
    :param value: sequence whose first item is the exponent and second the mantissa
    :param shareable_index: unused
    :return: ``mantissa * 2 ** exponent`` as a ``Decimal``

    """
    # Semantic tag 5
    import decimal
    exponent = decimal.Decimal(value[0])
    significand = decimal.Decimal(value[1])
    scale = 2 ** exponent
    return significand * scale

def decode_sharedref(decoder, value, shareable_index=None):
    """
    Resolve a reference to a previously registered shared value.

    :param decoder: object whose ``_shareables`` list holds the shared values
    :param int value: index of the shared value being referenced
    :param shareable_index: unused
    :return: the shared value
    :raises CBORDecodeError: if the index is out of range or its slot is still ``None``

    """
    # Semantic tag 29
    try:
        shared = decoder._shareables[value]
    except IndexError:
        raise CBORDecodeError('shared reference %d not found' % value)

    if shared is None:
        raise CBORDecodeError('shared value %d has not been initialized' % value)
    else:
        return shared

def decode_rational(decoder, value, shareable_index=None):
    """
    Build a ``Fraction`` by passing the items of ``value`` as its constructor arguments.

    :param decoder: unused
    :param value: sequence of constructor arguments (numerator, denominator)
    :param shareable_index: unused
    :return: the ``Fraction``

    """
    # Semantic tag 30
    import fractions
    return fractions.Fraction(*value)

def decode_regexp(decoder, value, shareable_index=None):
    """
    Compile the given pattern into a regular expression object.

    :param decoder: unused
    :param value: the regular expression source
    :param shareable_index: unused
    :return: the compiled pattern

    """
    # Semantic tag 35
    compiled = re.compile(value)
    return compiled

def decode_mime(decoder, value, shareable_index=None):
    """
    Parse the given text with ``email.parser.Parser`` into a message object.

    :param decoder: unused
    :param value: the message text
    :param shareable_index: unused
    :return: the parsed message

    """
    # Semantic tag 36
    import email.parser
    parser = email.parser.Parser()
    return parser.parsestr(value)

def decode_uuid(decoder, value, shareable_index=None):
    """
    Build a ``UUID`` from its raw byte representation.

    :param decoder: unused
    :param value: the bytes of the UUID
    :param shareable_index: unused
    :return: the ``UUID``

    """
    # Semantic tag 37
    import uuid
    return uuid.UUID(bytes=value)

def decode_set(decoder, value, shareable_index=None):
    """
    Collect the items of ``value`` into a set.

    :param decoder: unused
    :param value: iterable of hashable items
    :param shareable_index: unused
    :return: the set

    """
    # Semantic tag 258
    members = set()
    members.update(value)
    return members

# Special decoders (major tag 7)

def decode_simple_value(decoder, shareable_index=None):
    """
    Decode a simple value whose number is stored in the byte following the initial byte.

    :param decoder: object providing ``read(amount)``
    :param shareable_index: unused
    :return: the ``CBORSimpleValue`` wrapping the number read

    """
    return CBORSimpleValue(struct.unpack('>B', decoder.read(1))[0])

def decode_float16(decoder, shareable_index=None):
    """
    Decode a half precision float from the next two bytes of the stream.

    :param decoder: object providing ``read(amount)``
    :param shareable_index: unused
    :return: the result of ``unpack_float16`` for the two bytes read

    """
    payload = decoder.read(2)
    return unpack_float16(payload)

def decode_float32(decoder, shareable_index=None):
    """
    Decode a big-endian single precision float from the next four bytes of the stream.

    :param decoder: object providing ``read(amount)``
    :param shareable_index: unused
    :return: the decoded float

    """
    return struct.unpack('>f', decoder.read(4))[0]

def decode_float64(decoder, shareable_index=None):
    """
    Decode a big-endian double precision float from the next eight bytes of the stream.

    :param decoder: object providing ``read(amount)``
    :param shareable_index: unused
    :return: the decoded float

    """
    return struct.unpack('>d', decoder.read(8))[0]

# Maps the major type (top 3 bits of the initial byte) to its decoder function
major_decoders = {
    0: decode_uint,
    1: decode_negint,
    2: decode_bytestring,
    3: decode_string,
    4: decode_array,
    5: decode_map,
    6: decode_semantic,
    7: decode_special
}

# Maps the subtype of a major type 7 item to a callable taking the decoder instance
special_decoders = {
    20: lambda self: False,
    21: lambda self: True,
    22: lambda self: None,
    23: lambda self: undefined,
    24: decode_simple_value,
    25: decode_float16,
    26: decode_float32,
    27: decode_float64,
    31: lambda self: break_marker
}

# Maps a semantic tag number (major type 6) to the function converting the tagged value
semantic_decoders = {
    0: decode_datetime_string,
    1: decode_epoch_datetime,
    2: decode_positive_bignum,
    3: decode_negative_bignum,
    4: decode_fraction,
    5: decode_bigfloat,
    29: decode_sharedref,
    30: decode_rational,
    35: decode_regexp,
    36: decode_mime,
    37: decode_uuid,
    258: decode_set
}

class CBORDecoder(object):
    """
    Deserializes a CBOR encoded byte stream.

    :param fp: the stream to read from; must provide ``read()`` and ``tell()``
    :param tag_hook: Callable that takes 3 arguments: the decoder instance, the
        :class:`~cbor2.types.CBORTag` and the shareable index for the resulting object, if any.
        This callback is called for any tags for which there is no built-in decoder.
        The return value is substituted for the CBORTag object in the deserialized output.
    :param object_hook: Callable that takes 2 arguments: the decoder instance and the dictionary.
        This callback is called for each deserialized :class:`dict` object.
        The return value is substituted for the dict in the deserialized output.

    """

    __slots__ = ('fp', 'tag_hook', 'object_hook', '_shareables')

    def __init__(self, fp, tag_hook=None, object_hook=None):
        self.fp = fp
        self.tag_hook = tag_hook
        self.object_hook = object_hook
        self._shareables = []

    def _allocate_shareable(self):
        """
        Reserve a slot for a shared value and return its index.

        The slot holds ``None`` until :meth:`set_shareable` fills it in.

        """
        self._shareables.append(None)
        return len(self._shareables) - 1

    def set_shareable(self, index, value):
        """
        Set the shareable value for the last encountered shared value marker, if any.

        If the given index is ``None``, nothing is done.

        :param index: the value of the ``shared_index`` argument to the decoder
        :param value: the shared value

        """
        if index is not None:
            self._shareables[index] = value

    def read(self, amount):
        """
        Read bytes from the data stream.

        :param int amount: the number of bytes to read
        :return: the bytes read
        :raises CBORDecodeError: if fewer than ``amount`` bytes could be read

        """
        data = self.fp.read(amount)
        if len(data) < amount:
            raise CBORDecodeError('premature end of stream (expected to read {} bytes, got {} '
                                  'instead)'.format(amount, len(data)))

        return data

    def decode(self, shareable_index=None):
        """
        Decode the next value from the stream.

        :param shareable_index: index under which the decoded value should be registered
            as a shared value, if any
        :return: the decoded value
        :raises CBORDecodeError: if there is any problem decoding the stream

        """
        try:
            initial_byte = byte_as_integer(self.read(1))
            # The top 3 bits select the major type, the low 5 bits are the subtype
            major_type = initial_byte >> 5
            subtype = initial_byte & 31
        except Exception as e:
            raise CBORDecodeError('error reading major type at index {}: {}'
                                  .format(self.fp.tell(), e))

        decoder = major_decoders[major_type]
        try:
            return decoder(self, subtype, shareable_index)
        except CBORDecodeError:
            # Already in its final form; do not wrap it a second time
            raise
        except Exception as e:
            raise CBORDecodeError('error decoding value at index {}: {}'.format(self.fp.tell(), e))

    def decode_from_bytes(self, buf):
        """
        Wrap the given bytestring as a file and call :meth:`decode` with it as the argument.

        This method was intended to be used from the ``tag_hook`` hook when an object needs to be
        decoded separately from the rest but while still taking advantage of the shared values
        already registered on this decoder.

        :param bytes buf: the bytestring to decode
        :return: the decoded value

        """
        old_fp = self.fp
        self.fp = BytesIO(buf)
        try:
            return self.decode()
        finally:
            # Always switch back to the original stream, even if decoding failed
            self.fp = old_fp

def loads(payload, **kwargs):
    """
    Deserialize an object from a bytestring.

    :param bytes payload: the bytestring to deserialize
    :param kwargs: keyword arguments passed to :class:`~.CBORDecoder`
    :return: the deserialized object

    """
    fp = BytesIO(payload)
    return CBORDecoder(fp, **kwargs).decode()

def load(fp, **kwargs):
    """
    Deserialize an object from an open file.

    :param fp: the input file (any file-like object)
    :param kwargs: keyword arguments passed to :class:`~.CBORDecoder`
    :return: the deserialized object

    """
    return CBORDecoder(fp, **kwargs).decode()