From 02c5676928287147afa253c0ecb086d7025031f7 Mon Sep 17 00:00:00 2001 From: M1nd3r Date: Mon, 24 Feb 2025 18:17:33 +0100 Subject: [PATCH] feat: use autoconnect=False credentials as autoconnect=True in case of channel replacement [no changelog] --- core/src/apps/thp/pairing.py | 2 ++ core/src/storage/cache_thp.py | 14 +++++++++ core/src/trezor/wire/thp/channel.py | 9 ++++++ core/src/trezor/wire/thp/memory_manager.py | 6 ++++ tests/device_tests/thp/test_thp.py | 35 +++++++++++++++++++++- 5 files changed, 65 insertions(+), 1 deletion(-) diff --git a/core/src/apps/thp/pairing.py b/core/src/apps/thp/pairing.py index aa08041f4b..2e438f841a 100644 --- a/core/src/apps/thp/pairing.py +++ b/core/src/apps/thp/pairing.py @@ -159,6 +159,8 @@ async def handle_credential_phase( if credential is not None: autoconnect = is_credential_autoconnect(credential) + if not autoconnect: + autoconnect = ctx.channel_ctx.is_channel_to_replace() if credential.cred_metadata is not None: ctx.host_name = credential.cred_metadata.host_name if ctx.host_name is None: diff --git a/core/src/storage/cache_thp.py b/core/src/storage/cache_thp.py index 6f5019bc80..55b3809528 100644 --- a/core/src/storage/cache_thp.py +++ b/core/src/storage/cache_thp.py @@ -267,6 +267,20 @@ def conditionally_replace_channel( return was_any_channel_replaced +def is_there_a_channel_to_replace( + new_channel: ChannelCache, required_state: int, required_key: int +) -> bool: + state = required_state.to_bytes(_CHANNEL_STATE_LENGTH, "big") + for channel in _CHANNELS: + if channel.channel_id == new_channel.channel_id: + continue + if channel.state == state and channel.get(required_key) == new_channel.get( + required_key + ): + return True + return False + + def _get_usage_counter_and_increment() -> int: global _usage_counter _usage_counter += 1 diff --git a/core/src/trezor/wire/thp/channel.py b/core/src/trezor/wire/thp/channel.py index 953ddf3879..90e357802c 100644 --- a/core/src/trezor/wire/thp/channel.py +++ b/core/src/trezor/wire/thp/channel.py @@ -15,6 +15,7 @@ from storage.cache_thp import ( ChannelCache, clear_sessions_with_channel_id, conditionally_replace_channel, + is_there_a_channel_to_replace, ) from trezor import log, loop, protobuf, utils, workflow from trezor.wire.errors import WireBufferError @@ -90,6 +91,7 @@ class Channel: def clear(self) -> None: clear_sessions_with_channel_id(self.channel_id) + memory_manager.release_lock_if_owner(self.get_channel_id_int()) self.channel_cache.clear() # ACCESS TO CHANNEL_DATA @@ -121,6 +123,13 @@ class Channel: ) log.debug(__name__, "Was any channel replaced? %s", str(was_any_replaced)) + def is_channel_to_replace(self) -> bool: + return is_there_a_channel_to_replace( + new_channel=self.channel_cache, + required_state=ChannelState.ENCRYPTED_TRANSPORT, + required_key=CHANNEL_HOST_STATIC_PUBKEY, + ) + # READ and DECRYPT def receive_packet(self, packet: utils.BufferType) -> Awaitable[None] | None: diff --git a/core/src/trezor/wire/thp/memory_manager.py b/core/src/trezor/wire/thp/memory_manager.py index 681642d8fc..ecfac1e6ff 100644 --- a/core/src/trezor/wire/thp/memory_manager.py +++ b/core/src/trezor/wire/thp/memory_manager.py @@ -30,6 +30,12 @@ _WRITE: int = const(1) # Access to buffer slices +def release_lock_if_owner(channel_id: int) -> None: + global lock_owner_cid + if lock_owner_cid == channel_id: + lock_owner_cid = None + + def get_new_read_buffer(channel_id: int, length: int) -> memoryview: return _get_new_buffer(_READ, channel_id, length) diff --git a/tests/device_tests/thp/test_thp.py b/tests/device_tests/thp/test_thp.py index 8519ed21d9..6347b4e12a 100644 --- a/tests/device_tests/thp/test_thp.py +++ b/tests/device_tests/thp/test_thp.py @@ -316,13 +316,22 @@ def test_credential_phase(client: Client) -> None: client.debug.press_yes() protocol._read_message(ThpEndResponse) - # Connect using credential with confirmation and ask for autoconnect credential + # Delete channel from the device by sending badly encrypted message + # This is done to prevent channel replacement and trigerring of autoconnect false -> true + protocol.nonce_request = 250 + protocol._send_message(ButtonAck()) + with pytest.raises(Exception) as e: + protocol.read(1) + assert e.value.args[0] == "Received ThpError: DECRYPTION FAILED" + + # Connect using credential with confirmation and ask for autoconnect credential. protocol = _prepare_protocol(client) protocol._do_channel_allocation() protocol._do_handshake(credential, host_static_privkey) protocol._send_message( ThpCredentialRequest(host_static_pubkey=host_static_pubkey, autoconnect=True) ) + # Confirmation dialog is shown. (Channel replacement is not triggered.) button_req = protocol._read_message(ButtonRequest) assert button_req.name == "connection_request" protocol._send_message(ButtonAck()) @@ -333,6 +342,15 @@ def test_credential_phase(client: Client) -> None: protocol._send_message(ThpEndRequest()) protocol._read_message(ThpEndResponse) + # Connect using credential with confirmation + protocol = _prepare_protocol(client) + protocol._do_channel_allocation() + protocol._do_handshake(credential, host_static_privkey) + # Confirmation dialog is not shown as channel in ENCRYPTED TRANSPORT state with the same + # host static public key is still available in Trezor's cache. (Channel replacement is triggered.) + protocol._send_message(ThpEndRequest()) + protocol._read_message(ThpEndResponse) + # Connect using autoconnect credential protocol = _prepare_protocol(client) protocol._do_channel_allocation() @@ -340,6 +358,21 @@ def test_credential_phase(client: Client) -> None: protocol._send_message(ThpEndRequest()) protocol._read_message(ThpEndResponse) + # Delete channel from the device by sending badly encrypted message + # This is done to prevent channel replacement and trigerring of autoconnect false -> true + protocol.nonce_request = 250 + protocol._send_message(ButtonAck()) + with pytest.raises(Exception) as e: + protocol.read(1) + assert e.value.args[0] == "Received ThpError: DECRYPTION FAILED" + + # Connect using autoconnect credential - should work the same as above + protocol = _prepare_protocol(client) + protocol._do_channel_allocation() + protocol._do_handshake(credential_auto, host_static_privkey) + protocol._send_message(ThpEndRequest()) + protocol._read_message(ThpEndResponse) + @pytest.mark.setup_client(passphrase=True) def test_channel_replacement(client: Client) -> None: