parent
9a9b0eb052
commit
cf15dce326
@ -0,0 +1 @@
|
|||||||
|
Refactor RLP codec for better clarity and some small memory savings.
|
@ -1,53 +1,97 @@
|
|||||||
def int_to_bytes(x: int) -> bytes:
|
from micropython import const
|
||||||
if x == 0:
|
|
||||||
return b""
|
if False:
|
||||||
r = bytearray()
|
from typing import Union
|
||||||
while x:
|
from trezor.utils import Writer
|
||||||
r.append(x % 256)
|
|
||||||
x //= 256
|
# what we want:
|
||||||
return bytes(reversed(r))
|
# RLPItem = Union[list["RLPItem"], bytes, int]
|
||||||
|
# what mypy can process:
|
||||||
|
RLPItem = Union[list, bytes, int]
|
||||||
def encode_length(l: int, is_list: bool) -> bytes:
|
|
||||||
offset = 0xC0 if is_list else 0x80
|
|
||||||
if l < 56:
|
STRING_HEADER_BYTE = const(0x80)
|
||||||
return bytes([l + offset])
|
LIST_HEADER_BYTE = const(0xC0)
|
||||||
elif l < 256 ** 8:
|
|
||||||
bl = int_to_bytes(l)
|
|
||||||
return bytes([len(bl) + offset + 55]) + bl
|
def _byte_size(x: int) -> int:
|
||||||
|
if x < 0:
|
||||||
|
raise ValueError # only unsigned ints are supported
|
||||||
|
for exp in range(64):
|
||||||
|
if x < 0x100 ** exp:
|
||||||
|
return exp
|
||||||
else:
|
else:
|
||||||
raise ValueError("Input too long")
|
raise ValueError # int is too large
|
||||||
|
|
||||||
|
|
||||||
def encode(data, include_length=True) -> bytes:
|
def int_to_bytes(x: int) -> bytes:
|
||||||
if isinstance(data, int):
|
return x.to_bytes(_byte_size(x), "big")
|
||||||
data = int_to_bytes(data)
|
|
||||||
if isinstance(data, bytearray):
|
|
||||||
data = bytes(data)
|
def write_header(
|
||||||
if isinstance(data, bytes):
|
w: Writer,
|
||||||
if (len(data) == 1 and ord(data) < 128) or not include_length:
|
length: int,
|
||||||
return data
|
header_byte: int,
|
||||||
else:
|
data_start: bytes | None = None,
|
||||||
return encode_length(len(data), is_list=False) + data
|
) -> None:
|
||||||
elif isinstance(data, list):
|
if length == 1 and data_start is not None and data_start[0] <= 0x7F:
|
||||||
output = b""
|
# no header when encoding one byte below 0x80
|
||||||
for item in data:
|
pass
|
||||||
output += encode(item)
|
|
||||||
if include_length:
|
elif length <= 55:
|
||||||
return encode_length(len(output), is_list=True) + output
|
w.append(header_byte + length)
|
||||||
else:
|
|
||||||
return output
|
|
||||||
else:
|
else:
|
||||||
raise TypeError("Invalid input of type " + str(type(data)))
|
encoded_length = int_to_bytes(length)
|
||||||
|
w.append(header_byte + 55 + len(encoded_length))
|
||||||
|
w.extend(encoded_length)
|
||||||
|
|
||||||
|
|
||||||
def field_length(length: int, first_byte: bytearray) -> int:
|
def header_length(length: int, data_start: bytes | None = None) -> int:
|
||||||
if length == 1 and first_byte[0] <= 0x7F:
|
if length == 1 and data_start is not None and data_start[0] <= 0x7F:
|
||||||
|
# no header when encoding one byte below 0x80
|
||||||
|
return 0
|
||||||
|
|
||||||
|
if length <= 55:
|
||||||
return 1
|
return 1
|
||||||
elif length <= 55:
|
|
||||||
return 1 + length
|
return 1 + _byte_size(length)
|
||||||
elif length <= 0xFF:
|
|
||||||
return 2 + length
|
|
||||||
elif length <= 0xFFFF:
|
def length(item: RLPItem) -> int:
|
||||||
return 3 + length
|
data: bytes | None = None
|
||||||
return 4 + length
|
if isinstance(item, int):
|
||||||
|
data = int_to_bytes(item)
|
||||||
|
item_length = len(data)
|
||||||
|
elif isinstance(item, (bytes, bytearray)):
|
||||||
|
data = item
|
||||||
|
item_length = len(item)
|
||||||
|
elif isinstance(item, list):
|
||||||
|
item_length = sum(length(i) for i in item)
|
||||||
|
else:
|
||||||
|
raise TypeError
|
||||||
|
|
||||||
|
return header_length(item_length, data) + item_length
|
||||||
|
|
||||||
|
|
||||||
|
def write_string(w: Writer, string: bytes) -> None:
|
||||||
|
write_header(w, len(string), STRING_HEADER_BYTE, string)
|
||||||
|
w.extend(string)
|
||||||
|
|
||||||
|
|
||||||
|
def write_list(w: Writer, lst: list[RLPItem]) -> None:
|
||||||
|
payload_length = sum(length(item) for item in lst)
|
||||||
|
write_header(w, payload_length, LIST_HEADER_BYTE)
|
||||||
|
for item in lst:
|
||||||
|
write(w, item)
|
||||||
|
|
||||||
|
|
||||||
|
def write(w: Writer, item: RLPItem) -> None:
|
||||||
|
if isinstance(item, int):
|
||||||
|
write_string(w, int_to_bytes(item))
|
||||||
|
elif isinstance(item, (bytes, bytearray)):
|
||||||
|
write_string(w, item)
|
||||||
|
elif isinstance(item, list):
|
||||||
|
write_list(w, item)
|
||||||
|
else:
|
||||||
|
raise TypeError
|
||||||
|
Loading…
Reference in new issue