|
|
|
@ -1,11 +1,6 @@
|
|
|
|
|
import ustruct # pyright: ignore[reportMissingModuleSource]
|
|
|
|
|
from micropython import const # pyright: ignore[reportMissingModuleSource]
|
|
|
|
|
from typing import ( # pyright:ignore[reportShadowedImports]
|
|
|
|
|
TYPE_CHECKING,
|
|
|
|
|
Any,
|
|
|
|
|
Callable,
|
|
|
|
|
Coroutine,
|
|
|
|
|
)
|
|
|
|
|
from typing import TYPE_CHECKING # pyright:ignore[reportShadowedImports]
|
|
|
|
|
|
|
|
|
|
import usb
|
|
|
|
|
from storage import cache_thp
|
|
|
|
@ -16,8 +11,6 @@ from ..protocol_common import Context
|
|
|
|
|
from . import ChannelState, SessionState, checksum
|
|
|
|
|
from . import thp_session as THP
|
|
|
|
|
from .checksum import CHECKSUM_LENGTH
|
|
|
|
|
|
|
|
|
|
# from . import thp_session
|
|
|
|
|
from .thp_messages import (
|
|
|
|
|
ACK_MESSAGE,
|
|
|
|
|
CONTINUATION_PACKET,
|
|
|
|
@ -26,20 +19,12 @@ from .thp_messages import (
|
|
|
|
|
)
|
|
|
|
|
from .thp_session import ThpError
|
|
|
|
|
|
|
|
|
|
# from .thp_session import SessionState, ThpError
|
|
|
|
|
|
|
|
|
|
if TYPE_CHECKING:
|
|
|
|
|
from trezorio import WireInterface # type:ignore
|
|
|
|
|
|
|
|
|
|
Handler = Callable[
|
|
|
|
|
[bytes, Any, Any, Any], Coroutine
|
|
|
|
|
] # TODO Adjust parameters to be more restrictive
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_INIT_DATA_OFFSET = const(5)
|
|
|
|
|
_CONT_DATA_OFFSET = const(3)
|
|
|
|
|
_INIT_DATA_OFFSET = const(5)
|
|
|
|
|
_REPORT_CONT_DATA_OFFSET = const(3)
|
|
|
|
|
|
|
|
|
|
_WIRE_INTERFACE_USB = b"\x01"
|
|
|
|
|
_MOCK_INTERFACE_HID = b"\x00"
|
|
|
|
@ -216,45 +201,6 @@ class ChannelContext(Context):
|
|
|
|
|
self.expected_payload_length = 0
|
|
|
|
|
self.is_cont_packet_expected = False
|
|
|
|
|
|
|
|
|
|
def _get_handler(self) -> Handler:
|
|
|
|
|
state = self.get_channel_state()
|
|
|
|
|
if state is ChannelState.UNAUTHENTICATED:
|
|
|
|
|
return self._handler_unauthenticated
|
|
|
|
|
if state is ChannelState.ENCRYPTED_TRANSPORT:
|
|
|
|
|
return self._handler_encrypted_transport
|
|
|
|
|
raise Exception("Unimplemented situation")
|
|
|
|
|
|
|
|
|
|
# Handlers for init packets
|
|
|
|
|
# TODO adjust
|
|
|
|
|
async def _handler_encrypted_transport(
|
|
|
|
|
self, ctrl_byte: bytes, payload_length: int, packet_payload: bytes, packet
|
|
|
|
|
) -> None:
|
|
|
|
|
self.expected_payload_length = payload_length
|
|
|
|
|
self.bytes_read = 0
|
|
|
|
|
|
|
|
|
|
await self._buffer_packet_data(self.buffer, packet, _INIT_DATA_OFFSET)
|
|
|
|
|
# TODO Set/Provide different buffer for management session
|
|
|
|
|
|
|
|
|
|
if self.expected_payload_length == self.bytes_read:
|
|
|
|
|
self._finish_message()
|
|
|
|
|
else:
|
|
|
|
|
self.is_cont_packet_expected = True
|
|
|
|
|
|
|
|
|
|
# TODO adjust
|
|
|
|
|
async def _handler_unauthenticated(
|
|
|
|
|
self, ctrl_byte: bytes, payload_length: int, packet_payload: bytes, packet
|
|
|
|
|
) -> None:
|
|
|
|
|
self.expected_payload_length = payload_length
|
|
|
|
|
self.bytes_read = 0
|
|
|
|
|
|
|
|
|
|
await self._buffer_packet_data(self.buffer, packet, _INIT_DATA_OFFSET)
|
|
|
|
|
# TODO Set/Provide different buffer for management session
|
|
|
|
|
|
|
|
|
|
if self.expected_payload_length == self.bytes_read:
|
|
|
|
|
self._finish_message()
|
|
|
|
|
else:
|
|
|
|
|
self.is_cont_packet_expected = True
|
|
|
|
|
|
|
|
|
|
# CALLED BY WORKFLOW / SESSION CONTEXT
|
|
|
|
|
|
|
|
|
|
async def write(self, msg: protobuf.MessageType, session_id: int = 0) -> None:
|
|
|
|
|