mirror of
https://github.com/trezor/trezor-firmware.git
synced 2025-07-25 16:08:32 +00:00
wip trezorlib
This commit is contained in:
parent
d59684988c
commit
8b3bfe648c
@ -291,6 +291,7 @@ def list_devices(no_resolve: bool) -> Optional[Iterable["NewTransport"]]:
|
|||||||
try:
|
try:
|
||||||
client = NewTrezorClient(transport)
|
client = NewTrezorClient(transport)
|
||||||
session = client.get_management_session()
|
session = client.get_management_session()
|
||||||
|
|
||||||
description = format_device_name(session.features)
|
description = format_device_name(session.features)
|
||||||
# client.end_session()
|
# client.end_session()
|
||||||
except DeviceIsBusy:
|
except DeviceIsBusy:
|
||||||
|
102
python/src/trezorlib/transport/new/alternating_bit_protocol.py
Normal file
102
python/src/trezorlib/transport/new/alternating_bit_protocol.py
Normal file
@ -0,0 +1,102 @@
|
|||||||
|
# from storage.cache_thp import ChannelCache
|
||||||
|
# from trezor import log
|
||||||
|
# from trezor.wire.thp import ThpError
|
||||||
|
|
||||||
|
|
||||||
|
# def is_ack_valid(cache: ChannelCache, ack_bit: int) -> bool:
|
||||||
|
# """
|
||||||
|
# Checks if:
|
||||||
|
# - an ACK message is expected
|
||||||
|
# - the received ACK message acknowledges correct sequence number (bit)
|
||||||
|
# """
|
||||||
|
# if not _is_ack_expected(cache):
|
||||||
|
# return False
|
||||||
|
|
||||||
|
# if not _has_ack_correct_sync_bit(cache, ack_bit):
|
||||||
|
# return False
|
||||||
|
|
||||||
|
# return True
|
||||||
|
|
||||||
|
|
||||||
|
# def _is_ack_expected(cache: ChannelCache) -> bool:
|
||||||
|
# is_expected: bool = not is_sending_allowed(cache)
|
||||||
|
# if __debug__ and not is_expected:
|
||||||
|
# log.debug(__name__, "Received unexpected ACK message")
|
||||||
|
# return is_expected
|
||||||
|
|
||||||
|
|
||||||
|
# def _has_ack_correct_sync_bit(cache: ChannelCache, sync_bit: int) -> bool:
|
||||||
|
# is_correct: bool = get_send_seq_bit(cache) == sync_bit
|
||||||
|
# if __debug__ and not is_correct:
|
||||||
|
# log.debug(__name__, "Received ACK message with wrong ack bit")
|
||||||
|
# return is_correct
|
||||||
|
|
||||||
|
|
||||||
|
# def is_sending_allowed(cache: ChannelCache) -> bool:
|
||||||
|
# """
|
||||||
|
# Checks whether sending a message in the provided channel is allowed.
|
||||||
|
|
||||||
|
# Note: Sending a message in a channel before receipt of ACK message for the previously
|
||||||
|
# sent message (in the channel) is prohibited, as it can lead to desynchronization.
|
||||||
|
# """
|
||||||
|
# return bool(cache.sync >> 7)
|
||||||
|
|
||||||
|
|
||||||
|
# def get_send_seq_bit(cache: ChannelCache) -> int:
|
||||||
|
# """
|
||||||
|
# Returns the sequential number (bit) of the next message to be sent
|
||||||
|
# in the provided channel.
|
||||||
|
# """
|
||||||
|
# return (cache.sync & 0x20) >> 5
|
||||||
|
|
||||||
|
|
||||||
|
# def get_expected_receive_seq_bit(cache: ChannelCache) -> int:
|
||||||
|
# """
|
||||||
|
# Returns the (expected) sequential number (bit) of the next message
|
||||||
|
# to be received in the provided channel.
|
||||||
|
# """
|
||||||
|
# return (cache.sync & 0x40) >> 6
|
||||||
|
|
||||||
|
|
||||||
|
# def set_sending_allowed(cache: ChannelCache, sending_allowed: bool) -> None:
|
||||||
|
# """
|
||||||
|
# Set the flag whether sending a message in this channel is allowed or not.
|
||||||
|
# """
|
||||||
|
# cache.sync &= 0x7F
|
||||||
|
# if sending_allowed:
|
||||||
|
# cache.sync |= 0x80
|
||||||
|
|
||||||
|
|
||||||
|
# def set_expected_receive_seq_bit(cache: ChannelCache, seq_bit: int) -> None:
|
||||||
|
# """
|
||||||
|
# Set the expected sequential number (bit) of the next message to be received
|
||||||
|
# in the provided channel
|
||||||
|
# """
|
||||||
|
# if __debug__:
|
||||||
|
# log.debug(__name__, "Set sync receive expected seq bit to %d", seq_bit)
|
||||||
|
# if seq_bit not in (0, 1):
|
||||||
|
# raise ThpError("Unexpected receive sync bit")
|
||||||
|
|
||||||
|
# # set second bit to "seq_bit" value
|
||||||
|
# cache.sync &= 0xBF
|
||||||
|
# if seq_bit:
|
||||||
|
# cache.sync |= 0x40
|
||||||
|
|
||||||
|
|
||||||
|
# def _set_send_seq_bit(cache: ChannelCache, seq_bit: int) -> None:
|
||||||
|
# if seq_bit not in (0, 1):
|
||||||
|
# raise ThpError("Unexpected send seq bit")
|
||||||
|
# if __debug__:
|
||||||
|
# log.debug(__name__, "setting sync send seq bit to %d", seq_bit)
|
||||||
|
# # set third bit to "seq_bit" value
|
||||||
|
# cache.sync &= 0xDF
|
||||||
|
# if seq_bit:
|
||||||
|
# cache.sync |= 0x20
|
||||||
|
|
||||||
|
|
||||||
|
# def set_send_seq_bit_to_opposite(cache: ChannelCache) -> None:
|
||||||
|
# """
|
||||||
|
# Set the sequential bit of the "next message to be send" to the opposite value,
|
||||||
|
# i.e. 1 -> 0 and 0 -> 1
|
||||||
|
# """
|
||||||
|
# _set_send_seq_bit(cache=cache, seq_bit=1 - get_send_seq_bit(cache))
|
59
python/src/trezorlib/transport/new/control_byte.py
Normal file
59
python/src/trezorlib/transport/new/control_byte.py
Normal file
@ -0,0 +1,59 @@
|
|||||||
|
CODEC_V1 = 0x3F
|
||||||
|
CONTINUATION_PACKET = 0x80
|
||||||
|
HANDSHAKE_INIT_REQ = 0x00
|
||||||
|
HANDSHAKE_INIT_RES = 0x01
|
||||||
|
HANDSHAKE_COMP_REQ = 0x02
|
||||||
|
HANDSHAKE_COMP_RES = 0x03
|
||||||
|
ENCRYPTED_TRANSPORT = 0x04
|
||||||
|
|
||||||
|
CONTINUATION_PACKET_MASK = 0x80
|
||||||
|
ACK_MASK = 0xF7
|
||||||
|
DATA_MASK = 0xE7
|
||||||
|
|
||||||
|
ACK_MESSAGE = 0x20
|
||||||
|
_ERROR = 0x42
|
||||||
|
CHANNEL_ALLOCATION_REQ = 0x40
|
||||||
|
_CHANNEL_ALLOCATION_RES = 0x41
|
||||||
|
|
||||||
|
TREZOR_STATE_UNPAIRED = b"\x00"
|
||||||
|
TREZOR_STATE_PAIRED = b"\x01"
|
||||||
|
|
||||||
|
|
||||||
|
def add_seq_bit_to_ctrl_byte(ctrl_byte: int, seq_bit: int) -> int:
|
||||||
|
if seq_bit == 0:
|
||||||
|
return ctrl_byte & 0xEF
|
||||||
|
if seq_bit == 1:
|
||||||
|
return ctrl_byte | 0x10
|
||||||
|
raise Exception("Unexpected sequence bit")
|
||||||
|
|
||||||
|
|
||||||
|
def add_ack_bit_to_ctrl_byte(ctrl_byte: int, ack_bit: int) -> int:
|
||||||
|
if ack_bit == 0:
|
||||||
|
return ctrl_byte & 0xF7
|
||||||
|
if ack_bit == 1:
|
||||||
|
return ctrl_byte | 0x08
|
||||||
|
raise Exception("Unexpected acknowledgement bit")
|
||||||
|
|
||||||
|
|
||||||
|
def get_seq_bit(ctrl_byte: int) -> int:
|
||||||
|
return (ctrl_byte & 0x10) >> 4
|
||||||
|
|
||||||
|
|
||||||
|
def is_ack(ctrl_byte: int) -> bool:
|
||||||
|
return ctrl_byte & ACK_MASK == ACK_MESSAGE
|
||||||
|
|
||||||
|
|
||||||
|
def is_continuation(ctrl_byte: int) -> bool:
|
||||||
|
return ctrl_byte & CONTINUATION_PACKET_MASK == CONTINUATION_PACKET
|
||||||
|
|
||||||
|
|
||||||
|
def is_encrypted_transport(ctrl_byte: int) -> bool:
|
||||||
|
return ctrl_byte & DATA_MASK == ENCRYPTED_TRANSPORT
|
||||||
|
|
||||||
|
|
||||||
|
def is_handshake_init_req(ctrl_byte: int) -> bool:
|
||||||
|
return ctrl_byte & DATA_MASK == HANDSHAKE_INIT_REQ
|
||||||
|
|
||||||
|
|
||||||
|
def is_handshake_comp_req(ctrl_byte: int) -> bool:
|
||||||
|
return ctrl_byte & DATA_MASK == HANDSHAKE_COMP_REQ
|
@ -15,6 +15,7 @@ from ...mapping import ProtobufMapping
|
|||||||
from ..thp import checksum, curve25519, thp_io
|
from ..thp import checksum, curve25519, thp_io
|
||||||
from ..thp.checksum import CHECKSUM_LENGTH
|
from ..thp.checksum import CHECKSUM_LENGTH
|
||||||
from ..thp.packet_header import PacketHeader
|
from ..thp.packet_header import PacketHeader
|
||||||
|
from . import control_byte
|
||||||
from .channel_data import ChannelData
|
from .channel_data import ChannelData
|
||||||
from .protocol_and_channel import Channel, ProtocolAndChannel
|
from .protocol_and_channel import Channel, ProtocolAndChannel
|
||||||
from .transport import NewTransport
|
from .transport import NewTransport
|
||||||
@ -77,25 +78,19 @@ class ProtocolV2(ProtocolAndChannel):
|
|||||||
|
|
||||||
def write(self, session_id: int, msg: t.Any) -> None:
|
def write(self, session_id: int, msg: t.Any) -> None:
|
||||||
msg_type, msg_data = self.mapping.encode(msg)
|
msg_type, msg_data = self.mapping.encode(msg)
|
||||||
self._encrypt_and_write(session_id, msg_type, msg_data, 7) # TODO add ctrl_byte
|
self._encrypt_and_write(session_id, msg_type, msg_data)
|
||||||
|
|
||||||
def update_features(self) -> None:
|
def update_features(self) -> None:
|
||||||
message = messages.GetFeatures()
|
message = messages.GetFeatures()
|
||||||
message_type, message_data = self.mapping.encode(message)
|
message_type, message_data = self.mapping.encode(message)
|
||||||
|
|
||||||
self.session_id: int = 0
|
self.session_id: int = 0
|
||||||
self._encrypt_and_write(
|
self._encrypt_and_write(MANAGEMENT_SESSION_ID, message_type, message_data)
|
||||||
MANAGEMENT_SESSION_ID,
|
|
||||||
message_type,
|
|
||||||
message_data,
|
|
||||||
0x14, # TODO update control byte
|
|
||||||
)
|
|
||||||
_ = self._read_until_valid_crc_check() # TODO check ACK
|
_ = self._read_until_valid_crc_check() # TODO check ACK
|
||||||
session_id, msg_type, msg_data = self.read_and_decrypt()
|
session_id, msg_type, msg_data = self.read_and_decrypt()
|
||||||
features = self.mapping.decode(msg_type, msg_data)
|
features = self.mapping.decode(msg_type, msg_data)
|
||||||
assert isinstance(features, messages.Features)
|
assert isinstance(features, messages.Features)
|
||||||
self.features = features
|
self.features = features
|
||||||
self._send_ack_2()
|
|
||||||
|
|
||||||
def _establish_new_channel(self) -> None:
|
def _establish_new_channel(self) -> None:
|
||||||
self.sync_bit_send = 0
|
self.sync_bit_send = 0
|
||||||
@ -134,7 +129,7 @@ class ProtocolV2(ProtocolAndChannel):
|
|||||||
|
|
||||||
# Read handshake init response
|
# Read handshake init response
|
||||||
header, payload = self._read_until_valid_crc_check()
|
header, payload = self._read_until_valid_crc_check()
|
||||||
self._send_ack_1()
|
self._send_ack_0()
|
||||||
|
|
||||||
if not header.is_handshake_init_response():
|
if not header.is_handshake_init_response():
|
||||||
print("Received message is not a valid handshake init response message")
|
print("Received message is not a valid handshake init response message")
|
||||||
@ -218,7 +213,7 @@ class ProtocolV2(ProtocolAndChannel):
|
|||||||
header, _ = self._read_until_valid_crc_check()
|
header, _ = self._read_until_valid_crc_check()
|
||||||
if not header.is_handshake_comp_response():
|
if not header.is_handshake_comp_response():
|
||||||
print("Received message is not a valid handshake completion response")
|
print("Received message is not a valid handshake completion response")
|
||||||
self._send_ack_2()
|
self._send_ack_1()
|
||||||
|
|
||||||
self.key_request, self.key_response = _hkdf(ck, b"")
|
self.key_request, self.key_response = _hkdf(ck, b"")
|
||||||
self.nonce_request = 0
|
self.nonce_request = 0
|
||||||
@ -238,19 +233,17 @@ class ProtocolV2(ProtocolAndChannel):
|
|||||||
# Read
|
# Read
|
||||||
_, msg_type, msg_data = self.read_and_decrypt()
|
_, msg_type, msg_data = self.read_and_decrypt()
|
||||||
maaa = self.mapping.decode(msg_type, msg_data)
|
maaa = self.mapping.decode(msg_type, msg_data)
|
||||||
self._send_ack_1()
|
|
||||||
|
|
||||||
assert isinstance(maaa, messages.ThpEndResponse)
|
assert isinstance(maaa, messages.ThpEndResponse)
|
||||||
self.has_valid_channel = True
|
self.has_valid_channel = True
|
||||||
|
|
||||||
def _get_control_byte(self) -> bytes:
|
def _send_ack_0(self):
|
||||||
return b"\x42"
|
LOG.debug("sending ack 0")
|
||||||
|
|
||||||
def _send_ack_1(self):
|
|
||||||
header = PacketHeader(0x20, self.channel_id, 4)
|
header = PacketHeader(0x20, self.channel_id, 4)
|
||||||
thp_io.write_payload_to_wire_and_add_checksum(self.transport, header, b"")
|
thp_io.write_payload_to_wire_and_add_checksum(self.transport, header, b"")
|
||||||
|
|
||||||
def _send_ack_2(self):
|
def _send_ack_1(self):
|
||||||
|
LOG.debug("sending ack 1")
|
||||||
header = PacketHeader(0x28, self.channel_id, 4)
|
header = PacketHeader(0x28, self.channel_id, 4)
|
||||||
thp_io.write_payload_to_wire_and_add_checksum(self.transport, header, b"")
|
thp_io.write_payload_to_wire_and_add_checksum(self.transport, header, b"")
|
||||||
|
|
||||||
@ -259,11 +252,15 @@ class ProtocolV2(ProtocolAndChannel):
|
|||||||
session_id: int,
|
session_id: int,
|
||||||
message_type: int,
|
message_type: int,
|
||||||
message_data: bytes,
|
message_data: bytes,
|
||||||
ctrl_byte: int = 0x04,
|
ctrl_byte: int | None = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
assert self.key_request is not None
|
assert self.key_request is not None
|
||||||
aes_ctx = AESGCM(self.key_request)
|
aes_ctx = AESGCM(self.key_request)
|
||||||
|
|
||||||
|
if ctrl_byte is None:
|
||||||
|
ctrl_byte = control_byte.add_seq_bit_to_ctrl_byte(0x04, self.sync_bit_send)
|
||||||
|
self.sync_bit_send = 1 - self.sync_bit_send
|
||||||
|
|
||||||
sid = session_id.to_bytes(1, "big")
|
sid = session_id.to_bytes(1, "big")
|
||||||
msg_type = message_type.to_bytes(2, "big")
|
msg_type = message_type.to_bytes(2, "big")
|
||||||
data = sid + msg_type + message_data
|
data = sid + msg_type + message_data
|
||||||
@ -282,6 +279,18 @@ class ProtocolV2(ProtocolAndChannel):
|
|||||||
header, raw_payload = self._read_until_valid_crc_check()
|
header, raw_payload = self._read_until_valid_crc_check()
|
||||||
if not header.is_encrypted_transport():
|
if not header.is_encrypted_transport():
|
||||||
print("Trying to decrypt not encrypted message!")
|
print("Trying to decrypt not encrypted message!")
|
||||||
|
|
||||||
|
if not control_byte.is_ack(header.ctrl_byte):
|
||||||
|
LOG.debug(
|
||||||
|
"--> Get sequence bit %d %s %s",
|
||||||
|
control_byte.get_seq_bit(header.ctrl_byte),
|
||||||
|
"from control byte",
|
||||||
|
hexlify(header.ctrl_byte.to_bytes(1, "big")).decode(),
|
||||||
|
)
|
||||||
|
if control_byte.get_seq_bit(header.ctrl_byte):
|
||||||
|
self._send_ack_1()
|
||||||
|
else:
|
||||||
|
self._send_ack_0()
|
||||||
aes_ctx = AESGCM(self.key_response)
|
aes_ctx = AESGCM(self.key_response)
|
||||||
nonce = _get_iv_from_nonce(self.nonce_response)
|
nonce = _get_iv_from_nonce(self.nonce_response)
|
||||||
self.nonce_response += 1
|
self.nonce_response += 1
|
||||||
|
@ -56,7 +56,8 @@ class SessionV2(Session):
|
|||||||
def new(
|
def new(
|
||||||
cls, client: NewTrezorClient, passphrase: str, derive_cardano: bool
|
cls, client: NewTrezorClient, passphrase: str, derive_cardano: bool
|
||||||
) -> SessionV2:
|
) -> SessionV2:
|
||||||
assert isinstance(client.protocol, ProtocolV1)
|
|
||||||
|
assert isinstance(client.protocol, ProtocolV2)
|
||||||
session = SessionV2(client, b"\x00")
|
session = SessionV2(client, b"\x00")
|
||||||
new_session: ThpNewSession = session.call(
|
new_session: ThpNewSession = session.call(
|
||||||
ThpCreateNewSession(passphrase=passphrase, derive_cardano=derive_cardano)
|
ThpCreateNewSession(passphrase=passphrase, derive_cardano=derive_cardano)
|
||||||
|
Loading…
Reference in New Issue
Block a user