You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
129 lines
4.0 KiB
129 lines
4.0 KiB
from storage.cache_thp import SESSION_ID_LENGTH, TAG_LENGTH
|
|
from trezor import log, protobuf, utils
|
|
from trezor.enums import FailureType, MessageType
|
|
from trezor.messages import Failure
|
|
|
|
from . import ChannelState
|
|
from .checksum import CHECKSUM_LENGTH
|
|
from .thp_session import ThpError
|
|
from .writer import (
|
|
INIT_DATA_OFFSET,
|
|
MAX_PAYLOAD_LEN,
|
|
MESSAGE_TYPE_LENGTH,
|
|
REPORT_LENGTH,
|
|
)
|
|
|
|
|
|
def select_buffer(
|
|
channel_state: int,
|
|
channel_buffer: utils.BufferType,
|
|
packet_payload: utils.BufferType,
|
|
payload_length: int,
|
|
) -> utils.BufferType:
|
|
|
|
if channel_state is ChannelState.ENCRYPTED_TRANSPORT:
|
|
session_id = packet_payload[0]
|
|
if session_id == 0:
|
|
pass
|
|
# TODO use small buffer
|
|
else:
|
|
pass
|
|
# TODO use big buffer but only if the channel owns the buffer lock.
|
|
# Otherwise send BUSY message and return
|
|
else:
|
|
pass
|
|
# TODO use small buffer
|
|
try:
|
|
# TODO for now, we create a new big buffer every time. It should be changed
|
|
buffer: utils.BufferType = _get_buffer_for_message(
|
|
payload_length, channel_buffer
|
|
)
|
|
return buffer
|
|
except Exception as e:
|
|
if __debug__:
|
|
log.exception(__name__, e)
|
|
raise Exception("Failed to create a buffer for channel") # TODO handle better
|
|
|
|
|
|
def encode_into_buffer(
|
|
buffer: memoryview, msg: protobuf.MessageType, session_id: int
|
|
) -> int:
|
|
|
|
# cannot write message without wire type
|
|
assert msg.MESSAGE_WIRE_TYPE is not None
|
|
|
|
msg_size = protobuf.encoded_length(msg)
|
|
payload_size = SESSION_ID_LENGTH + MESSAGE_TYPE_LENGTH + msg_size
|
|
required_min_size = payload_size + CHECKSUM_LENGTH + TAG_LENGTH
|
|
|
|
if required_min_size > len(buffer):
|
|
# message is too big, we need to allocate a new buffer
|
|
buffer = memoryview(bytearray(required_min_size))
|
|
|
|
_encode_session_into_buffer(memoryview(buffer), session_id)
|
|
_encode_message_type_into_buffer(
|
|
memoryview(buffer), msg.MESSAGE_WIRE_TYPE, SESSION_ID_LENGTH
|
|
)
|
|
_encode_message_into_buffer(
|
|
memoryview(buffer), msg, SESSION_ID_LENGTH + MESSAGE_TYPE_LENGTH
|
|
)
|
|
|
|
return payload_size
|
|
|
|
|
|
def encode_error_into_buffer(
|
|
buffer: memoryview, err_code: FailureType, message: str
|
|
) -> int:
|
|
error_message: protobuf.MessageType = Failure(code=err_code, message=message)
|
|
_encode_message_type_into_buffer(buffer, MessageType.Failure)
|
|
_encode_message_into_buffer(buffer, error_message, MESSAGE_TYPE_LENGTH)
|
|
return protobuf.encoded_length(error_message)
|
|
|
|
|
|
def _encode_session_into_buffer(
|
|
buffer: memoryview, session_id: int, buffer_offset: int = 0
|
|
) -> None:
|
|
session_id_bytes = int.to_bytes(session_id, SESSION_ID_LENGTH, "big")
|
|
utils.memcpy(buffer, buffer_offset, session_id_bytes, 0)
|
|
|
|
|
|
def _encode_message_type_into_buffer(
|
|
buffer: memoryview, message_type: int, offset: int = 0
|
|
) -> None:
|
|
msg_type_bytes = int.to_bytes(message_type, MESSAGE_TYPE_LENGTH, "big")
|
|
utils.memcpy(buffer, offset, msg_type_bytes, 0)
|
|
|
|
|
|
def _encode_message_into_buffer(
|
|
buffer: memoryview, message: protobuf.MessageType, buffer_offset: int = 0
|
|
) -> None:
|
|
protobuf.encode(memoryview(buffer[buffer_offset:]), message)
|
|
|
|
|
|
def _get_buffer_for_message(
|
|
payload_length: int, existing_buffer: utils.BufferType, max_length=MAX_PAYLOAD_LEN
|
|
) -> utils.BufferType:
|
|
length = payload_length + INIT_DATA_OFFSET
|
|
if __debug__:
|
|
log.debug(
|
|
__name__,
|
|
"get_buffer_for_message - length: %d, %s %s",
|
|
length,
|
|
"existing buffer type:",
|
|
type(existing_buffer),
|
|
)
|
|
if length > max_length:
|
|
raise ThpError("Message too large")
|
|
|
|
if length > len(existing_buffer):
|
|
# allocate a new buffer to fit the message
|
|
try:
|
|
payload: utils.BufferType = bytearray(length)
|
|
except MemoryError:
|
|
payload = bytearray(REPORT_LENGTH)
|
|
raise ThpError("Message too large")
|
|
return payload
|
|
|
|
# reuse a part of the supplied buffer
|
|
return memoryview(existing_buffer)[:length]
|