From 88e2e0c7064d70e086fe87cd4d1730e09b92b2c9 Mon Sep 17 00:00:00 2001 From: M1nd3r Date: Thu, 20 Mar 2025 15:36:42 +0100 Subject: [PATCH] chore(python): test fallback in handshake --- .../trezorlib/transport/thp/protocol_v2.py | 11 +++++++- tests/device_tests/thp/test_multiple_apps.py | 25 ++++++++++++++++++- 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/python/src/trezorlib/transport/thp/protocol_v2.py b/python/src/trezorlib/transport/thp/protocol_v2.py index df4a77ced3..7bb789b0ce 100644 --- a/python/src/trezorlib/transport/thp/protocol_v2.py +++ b/python/src/trezorlib/transport/thp/protocol_v2.py @@ -171,9 +171,12 @@ class ProtocolV2Channel(Channel): header, payload = self._read_until_valid_crc_check() self._send_ack_0() - if header.ctrl_byte == 0x42: + if control_byte.is_error(header.ctrl_byte): if payload == b"\x05": raise exceptions.DeviceLockedException() + else: + err = _get_error_from_int(payload[0]) + raise Exception("Received ThpError: " + err) if not header.is_handshake_init_response(): LOG.error("Received message is not a valid handshake init response message") @@ -214,6 +217,9 @@ class ProtocolV2Channel(Channel): header, data = self._read_until_valid_crc_check() if not header.is_handshake_comp_response(): LOG.error("Received message is not a valid handshake completion response") + if control_byte.is_error(header.ctrl_byte): + err = _get_error_from_int(data[0]) + raise Exception("Received ThpError: " + err) trezor_state = self._noise.decrypt(bytes(data)) # TODO handle trezor_state print("trezor state:", trezor_state) @@ -243,6 +249,9 @@ class ProtocolV2Channel(Channel): header, payload = self._read_until_valid_crc_check() if not header.is_ack() or len(payload) > 0: LOG.error("Received message is not a valid ACK") + if control_byte.is_error(header.ctrl_byte): + err = _get_error_from_int(payload[0]) + raise Exception("Received ThpError: " + err) def _send_ack_0(self): LOG.debug("sending ack 0") diff --git a/tests/device_tests/thp/test_multiple_apps.py b/tests/device_tests/thp/test_multiple_apps.py index adea18a2b9..7ac1c029b8 100644 --- a/tests/device_tests/thp/test_multiple_apps.py +++ b/tests/device_tests/thp/test_multiple_apps.py @@ -12,6 +12,7 @@ def test_multiple_hosts(client: Client) -> None: assert isinstance(client.protocol, ProtocolV2Channel) protocol_1 = client.protocol protocol_2 = ProtocolV2Channel(protocol_1.transport, protocol_1.mapping) + protocol_2._reset_sync_bits() nonce_1 = os.urandom(8) nonce_2 = os.urandom(8) @@ -33,7 +34,29 @@ def test_multiple_hosts(client: Client) -> None: protocol_1._read_ack() protocol_1._read_handshake_init_response() + protocol_2._send_handshake_init_request() + + with pytest.raises(Exception) as e: + protocol_2._read_ack() + assert e.value.args[0] == "Received ThpError: TRANSPORT BUSY" + time.sleep(0.2) # To pass LOCK_TIME + protocol_2._init_noise() protocol_2._send_handshake_init_request() protocol_2._read_ack() - # protocol_2._read_handshake_init_response() + protocol_2._read_handshake_init_response() + + protocol_2._send_handshake_completion_request() + protocol_2._read_ack() + protocol_2._read_handshake_completion_response() + + protocol_2._do_pairing(helper_debug=client.debug) + + protocol_1._send_handshake_completion_request() + protocol_1._read_ack() + + with pytest.raises(Exception) as e: + protocol_1._read_handshake_completion_response() + assert e.value.args[0] == "Received ThpError: UNALLOCATED CHANNEL" + + # TODO - test ACK fallback, test standard encrypted message fallback