2021-02-19 21:13:39 +00:00
|
|
|
import math
|
|
|
|
|
2018-06-14 14:28:50 +00:00
|
|
|
from common import *
|
|
|
|
|
2019-05-22 14:38:42 +00:00
|
|
|
from apps.common.cbor import (
|
2018-06-14 14:28:50 +00:00
|
|
|
IndefiniteLengthArray,
|
2021-06-15 21:11:27 +00:00
|
|
|
OrderedMap,
|
|
|
|
Tagged,
|
2021-06-30 12:13:43 +00:00
|
|
|
create_array_header,
|
|
|
|
create_map_header,
|
2022-06-06 12:52:23 +00:00
|
|
|
create_embedded_cbor_bytes_header,
|
2019-05-22 14:38:42 +00:00
|
|
|
decode,
|
|
|
|
encode,
|
2021-02-19 21:13:39 +00:00
|
|
|
encode_streamed,
|
2022-09-15 10:57:17 +00:00
|
|
|
utils
|
2018-06-14 14:28:50 +00:00
|
|
|
)
|
|
|
|
|
2022-09-15 10:57:17 +00:00
|
|
|
# NOTE: defined in the tests so that it does not occupy flash space
# in the firmware binary while it is not used in production
|
|
|
|
def encode_chunked(value: "Value", max_chunk_size: int) -> "Iterator[bytes]":
    """
    Lazily yield the encoded form of `value` in pieces of at most
    `max_chunk_size` bytes, so that the complete serialized representation
    never has to be held in one contiguous block of memory.

    The pieces produced by `encode_streamed` are re-packed: a piece is
    yielded as soon as `max_chunk_size` bytes have been collected, and
    whatever is left over when the stream ends is yielded last.

    The very same buffer object is yielded every time and it is emptied as
    soon as the consumer resumes the generator, so a piece has to be copied
    (e.g. with `bytes(piece)`) if it must outlive the next iteration.

    Raises ValueError when `max_chunk_size` is not positive.
    """
    if max_chunk_size <= 0:
        raise ValueError

    pieces = encode_streamed(value)

    # presumably a zero-length buffer with room for max_chunk_size bytes
    # reserved up front; see utils.empty_bytearray to confirm
    out = utils.empty_bytearray(max_chunk_size)

    try:
        reader = utils.BufferReader(next(pieces))
        while True:
            # take as much of the current piece as still fits into the buffer
            pending = reader.remaining_count()
            room = max_chunk_size - len(out)
            out.extend(reader.read(pending if pending <= room else room))

            if len(out) >= max_chunk_size:
                yield out
                # empty the buffer in place so the same object is reused
                out[:] = b''

            if not reader.remaining_count():
                # current piece is used up; StopIteration from next() ends the loop
                reader = utils.BufferReader(next(pieces))
    except StopIteration:
        # the stream is exhausted; flush a final, possibly shorter, piece
        if out:
            yield out
|
|
|
|
|
|
|
|
|
2021-06-30 12:13:43 +00:00
|
|
|
|
2018-06-14 14:28:50 +00:00
|
|
|
class TestCardanoCbor(unittest.TestCase):
    """
    Tests for the CBOR header builders, `encode` / `decode` and the
    streamed / chunked encoding helpers.
    """

    def _check_header_vectors(self, create_header, test_vectors):
        """
        Assert that `create_header(length)` returns the expected bytes for
        every `(length, header_hex)` pair in `test_vectors`, and that a
        length of 2 ** 64 is rejected with NotImplementedError.
        """
        for length, header_hex in test_vectors:
            self.assertEqual(create_header(length), unhexlify(header_hex))

        # 2 ** 64 no longer fits into the 8-byte length used by the largest vectors
        with self.assertRaises(NotImplementedError):
            create_header(2 ** 64)

    def test_create_array_header(self):
        self._check_header_vectors(
            create_array_header,
            [
                (0, '80'),
                (23, '97'),
                ((2 ** 8) - 1, '98ff'),
                ((2 ** 16) - 1, '99ffff'),
                ((2 ** 32) - 1, '9affffffff'),
                ((2 ** 64) - 1, '9bffffffffffffffff'),
            ],
        )

    def test_create_map_header(self):
        self._check_header_vectors(
            create_map_header,
            [
                (0, 'a0'),
                (23, 'b7'),
                ((2 ** 8) - 1, 'b8ff'),
                ((2 ** 16) - 1, 'b9ffff'),
                ((2 ** 32) - 1, 'baffffffff'),
                ((2 ** 64) - 1, 'bbffffffffffffffff'),
            ],
        )

    def test_create_embedded_cbor_bytes_header(self):
        # every expected header is the d818 prefix followed by a byte-string header
        self._check_header_vectors(
            create_embedded_cbor_bytes_header,
            [
                (0, 'd81840'),
                (23, 'd81857'),
                ((2 ** 8) - 1, 'd81858ff'),
                ((2 ** 16) - 1, 'd81859ffff'),
                ((2 ** 32) - 1, 'd8185affffffff'),
                ((2 ** 64) - 1, 'd8185bffffffffffffffff'),
            ],
        )

    def test_cbor_encoding(self):
        """
        Each value must encode to the given hex and decode back to an equal value.
        """
        test_vectors = [
            # unsigned integers
            (0, '00'),
            (1, '01'),
            (10, '0a'),
            (23, '17'),
            (24, '1818'),
            (25, '1819'),
            (100, '1864'),
            (1000, '1903e8'),
            (1000000, '1a000f4240'),
            (1000000000000, '1b000000e8d4a51000'),

            # negative integers
            (-1, '20'),
            (-10, '29'),
            (-24, '37'),
            (-25, '3818'),
            (-26, '3819'),
            (-100, '3863'),
            (-1000, '3903E7'),
            (-1000000, '3A000F423F'),
            (-1000000000000, '3B000000E8D4A50FFF'),

            # binary strings
            (b'', '40'),
            (unhexlify('01020304'), '4401020304'),

            # text strings
            ('', '60'),
            ('Fun', '6346756e'),
            (u'P\u0159\xed\u0161ern\u011b \u017elu\u0165ou\u010dk\xfd k\u016f\u0148 \xfap\u011bl \u010f\xe1belsk\xe9 \xf3dy z\xe1ke\u0159n\xfd u\u010de\u0148 b\u011b\u017e\xed pod\xe9l z\xf3ny \xfal\u016f', '786550c599c3adc5a165726ec49b20c5be6c75c5a56f75c48d6bc3bd206bc5afc58820c3ba70c49b6c20c48fc3a162656c736bc3a920c3b36479207ac3a16b65c5996ec3bd2075c48d65c5882062c49bc5bec3ad20706f64c3a96c207ac3b36e7920c3ba6cc5af'),

            # tags
            (Tagged(1, 1363896240), 'c11a514b67b0'),
            (Tagged(23, unhexlify('01020304')), 'd74401020304'),

            # arrays
            ([], '80'),
            ([1, 2, 3], '83010203'),
            ([1, [2, 3], [4, 5]], '8301820203820405'),
            (list(range(1, 26)), '98190102030405060708090a0b0c0d0e0f101112131415161718181819'),

            # maps
            ({}, 'a0'),
            ({1: 2, 3: 4}, 'a201020304'),
            # same bytes as above: the insertion order of a plain dict does not matter
            ({3: 4, 1: 2}, 'a201020304'),

            # indefinite
            (IndefiniteLengthArray([]), '9fff'),
            (IndefiniteLengthArray([1, [2, 3], [4, 5]]), '9f01820203820405ff'),
            (IndefiniteLengthArray([1, [2, 3], IndefiniteLengthArray([4, 5])]),
             '9f018202039f0405ffff'),

            # boolean
            (True, 'f5'),
            (False, 'f4'),

            # null
            (None, 'f6'),
        ]
        for val, encoded_hex in test_vectors:
            encoded = unhexlify(encoded_hex)
            self.assertEqual(encode(val), encoded)
            self.assertEqual(decode(encoded), val)

    def test_cbor_tuples(self):
        """
        Tuples should be encoded as arrays and decoded back as lists.
        """
        test_vectors = [
            ([], '80'),
            ([1, 2, 3], '83010203'),
            ([1, [2, 3], [4, 5]], '8301820203820405'),
            (list(range(1, 26)), '98190102030405060708090a0b0c0d0e0f101112131415161718181819'),
        ]
        for val, encoded_hex in test_vectors:
            # only the outermost container is turned into a tuple
            value_tuple = tuple(val)
            encoded = unhexlify(encoded_hex)
            self.assertEqual(encode(value_tuple), encoded)
            self.assertEqual(decode(encoded), val)

    def test_cbor_ordered_map(self):
        """
        OrderedMaps should be encoded as maps that keep the insertion order
        of their entries (no reordering of keys) and decoded back as dicts.
        """
        # each vector is a list of [key, value] pairs in insertion order
        test_vectors = [
            ([], 'a0'),
            ([[1, 2], [3, 4]], 'a201020304'),
            ([[3, 4], [1, 2]], 'a203040102'),
        ]

        for val, encoded_hex in test_vectors:
            ordered_map = OrderedMap()
            for key, value in val:
                ordered_map[key] = value

            encoded = unhexlify(encoded_hex)
            self.assertEqual(encode(ordered_map), encoded)
            self.assertEqual(decode(encoded), {k: v for k, v in val})

    def test_encode_streamed(self):
        """
        The concatenated pieces of encode_streamed must equal the output of encode.
        """
        large_dict = {i: i for i in range(100)}
        encoded = encode(large_dict)

        encoded_streamed = [
            bytes(item) for item in encode_streamed(large_dict)
        ]

        self.assertEqual(b''.join(encoded_streamed), encoded)

    def test_encode_chunked(self):
        """
        encode_chunked must split the encoding into full-sized chunks followed
        by one chunk holding the rest, for a range of chunk sizes.
        """
        large_dict = {i: i for i in range(100)}
        encoded = encode(large_dict)

        encoded_len = len(encoded)
        # the chunk sizes chosen below rely on this exact length (354 == 6 * 59)
        self.assertEqual(encoded_len, 354)

        arbitrary_encoded_len_factor = 59
        arbitrary_power_of_two = 64
        larger_than_encoded_len = encoded_len + 1

        for max_chunk_size in [
            1,
            10,
            arbitrary_encoded_len_factor,
            arbitrary_power_of_two,
            encoded_len,
            larger_than_encoded_len,
        ]:
            # copy every chunk: the chunks are turned into bytes right away
            # so that each one is kept independently of the generator
            encoded_chunks = [
                bytes(chunk) for chunk in encode_chunked(large_dict, max_chunk_size)
            ]

            expected_number_of_chunks = math.ceil(encoded_len / max_chunk_size)
            self.assertEqual(len(encoded_chunks), expected_number_of_chunks)

            # all chunks except the last should be of chunk_size
            for chunk in encoded_chunks[:-1]:
                self.assertEqual(len(chunk), max_chunk_size)

            # last chunk should contain the remaining bytes or the whole chunk
            remaining_bytes = encoded_len % max_chunk_size
            expected_last_chunk_size = remaining_bytes if remaining_bytes > 0 else max_chunk_size
            self.assertEqual(len(encoded_chunks[-1]), expected_last_chunk_size)

            self.assertEqual(b''.join(encoded_chunks), encoded)
|
|
|
|
|
|
|
|
|
2018-06-14 14:28:50 +00:00
|
|
|
# Run the whole test module when it is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
|