Fix thp import error, part 1

M1nd3r 1 month ago
parent a889b73d8e
commit d3f7a5b65a

@ -13,9 +13,11 @@ from . import thp_session as THP
from .checksum import CHECKSUM_LENGTH
from .thp_messages import (
ACK_MESSAGE,
CONT_DATA_OFFSET,
CONTINUATION_PACKET,
ENCRYPTED_TRANSPORT,
HANDSHAKE_INIT,
INIT_DATA_OFFSET,
)
from .thp_session import ThpError
@ -23,9 +25,6 @@ if TYPE_CHECKING:
from trezorio import WireInterface # type:ignore
_INIT_DATA_OFFSET = const(5)
_CONT_DATA_OFFSET = const(3)
_WIRE_INTERFACE_USB = b"\x01"
_MOCK_INTERFACE_HID = b"\x00"
@ -108,11 +107,11 @@ class ChannelContext(Context):
async def _handle_cont_packet(self, packet):
if not self.is_cont_packet_expected:
return # Continuation packet is not expected, ignoring
await self._buffer_packet_data(self.buffer, packet, _CONT_DATA_OFFSET)
await self._buffer_packet_data(self.buffer, packet, CONT_DATA_OFFSET)
async def _handle_completed_message(self):
ctrl_byte, _, payload_length = ustruct.unpack(">BHH", self.buffer)
msg_len = payload_length + _INIT_DATA_OFFSET
msg_len = payload_length + INIT_DATA_OFFSET
if not checksum.is_valid(
checksum=self.buffer[msg_len - CHECKSUM_LENGTH : msg_len],
data=self.buffer[: msg_len - CHECKSUM_LENGTH],
@ -137,7 +136,7 @@ class ChannelContext(Context):
"Message received is not a valid handshake init request!"
)
host_ephemeral_key = bytearray(
self.buffer[_INIT_DATA_OFFSET : msg_len - CHECKSUM_LENGTH]
self.buffer[INIT_DATA_OFFSET : msg_len - CHECKSUM_LENGTH]
)
cache_thp.set_channel_host_ephemeral_key(
self.channel_cache, host_ephemeral_key
@ -155,7 +154,7 @@ class ChannelContext(Context):
if state is ChannelState.ENCRYPTED_TRANSPORT:
self._decrypt_buffer()
session_id, message_type = ustruct.unpack(
">BH", self.buffer[_INIT_DATA_OFFSET:]
">BH", self.buffer[INIT_DATA_OFFSET:]
)
if session_id not in self.sessions:
raise Exception("Unalloacted session")
@ -166,15 +165,15 @@ class ChannelContext(Context):
await self.sessions[session_id].receive_message(
message_type,
self.buffer[_INIT_DATA_OFFSET + 3 : msg_len - CHECKSUM_LENGTH],
self.buffer[INIT_DATA_OFFSET + 3 : msg_len - CHECKSUM_LENGTH],
)
if state is ChannelState.TH2:
host_encrypted_static_pubkey = self.buffer[
_INIT_DATA_OFFSET : _INIT_DATA_OFFSET + KEY_LENGTH + TAG_LENGTH
INIT_DATA_OFFSET : INIT_DATA_OFFSET + KEY_LENGTH + TAG_LENGTH
]
handshake_completion_request_noise_payload = self.buffer[
_INIT_DATA_OFFSET + KEY_LENGTH + TAG_LENGTH : msg_len - CHECKSUM_LENGTH
INIT_DATA_OFFSET + KEY_LENGTH + TAG_LENGTH : msg_len - CHECKSUM_LENGTH
]
print(
host_encrypted_static_pubkey,

@ -1,4 +1,5 @@
import ustruct # pyright:ignore[reportMissingModuleSource]
from micropython import const
from storage.cache_thp import BROADCAST_CHANNEL_ID
@ -11,6 +12,8 @@ HANDSHAKE_INIT = 0x00
ACK_MESSAGE = 0x20
_ERROR = 0x41
_CHANNEL_ALLOCATION_RES = 0x40
INIT_DATA_OFFSET = const(5)
CONT_DATA_OFFSET = const(3)
class InitHeader:

@ -9,8 +9,6 @@ from .protocol_common import MessageWithId
from .thp import ChannelState, ack_handler, checksum, thp_messages
from .thp import thp_session as THP
from .thp.channel_context import (
_CONT_DATA_OFFSET,
_INIT_DATA_OFFSET,
_MAX_PAYLOAD_LEN,
_REPORT_LENGTH,
ChannelContext,
@ -21,6 +19,8 @@ from .thp.thp_messages import (
CODEC_V1,
CONTINUATION_PACKET,
ENCRYPTED_TRANSPORT,
CONT_DATA_OFFSET,
INIT_DATA_OFFSET,
InitHeader,
InterruptingInitPacket,
)
@ -213,7 +213,7 @@ async def _buffer_received_data(
payload: utils.BufferType, header: InitHeader, iface, report
) -> None | InterruptingInitPacket:
# buffer the initial data
nread = utils.memcpy(payload, 0, report, _INIT_DATA_OFFSET)
nread = utils.memcpy(payload, 0, report, INIT_DATA_OFFSET)
while nread < header.length:
# wait for continuation report
report = await _get_loop_wait_read(iface)
@ -237,7 +237,7 @@ async def _buffer_received_data(
continue
# buffer the continuation data
nread += utils.memcpy(payload, nread, report, _CONT_DATA_OFFSET)
nread += utils.memcpy(payload, nread, report, CONT_DATA_OFFSET)
async def write_message_with_sync_control(
@ -306,7 +306,7 @@ async def write_to_wire(
header.pack_to_buffer(report)
# write initial report
nwritten = utils.memcpy(report, _INIT_DATA_OFFSET, payload, 0)
nwritten = utils.memcpy(report, INIT_DATA_OFFSET, payload, 0)
await _write_report(loop_write, iface, report)
# if we have more data to write, use continuation reports for it
@ -314,7 +314,7 @@ async def write_to_wire(
header.pack_to_cont_buffer(report)
while nwritten < payload_length:
nwritten += utils.memcpy(report, _CONT_DATA_OFFSET, payload, nwritten)
nwritten += utils.memcpy(report, CONT_DATA_OFFSET, payload, nwritten)
await _write_report(loop_write, iface, report)

Loading…
Cancel
Save