1
0
mirror of https://github.com/trezor/trezor-firmware.git synced 2025-03-12 22:26:08 +00:00

feat: use autoconnect=False credentials as autoconnect=True in case of channel replacement

[no changelog]
This commit is contained in:
M1nd3r 2025-02-24 18:17:33 +01:00
parent e7c7cf5443
commit 02c5676928
5 changed files with 65 additions and 1 deletions

View File

@ -159,6 +159,8 @@ async def handle_credential_phase(
if credential is not None:
autoconnect = is_credential_autoconnect(credential)
if not autoconnect:
autoconnect = ctx.channel_ctx.is_channel_to_replace()
if credential.cred_metadata is not None:
ctx.host_name = credential.cred_metadata.host_name
if ctx.host_name is None:

View File

@ -267,6 +267,20 @@ def conditionally_replace_channel(
return was_any_channel_replaced
def is_there_a_channel_to_replace(
new_channel: ChannelCache, required_state: int, required_key: int
) -> bool:
state = required_state.to_bytes(_CHANNEL_STATE_LENGTH, "big")
for channel in _CHANNELS:
if channel.channel_id == new_channel.channel_id:
continue
if channel.state == state and channel.get(required_key) == new_channel.get(
required_key
):
return True
return False
def _get_usage_counter_and_increment() -> int:
global _usage_counter
_usage_counter += 1

View File

@ -15,6 +15,7 @@ from storage.cache_thp import (
ChannelCache,
clear_sessions_with_channel_id,
conditionally_replace_channel,
is_there_a_channel_to_replace,
)
from trezor import log, loop, protobuf, utils, workflow
from trezor.wire.errors import WireBufferError
@ -90,6 +91,7 @@ class Channel:
def clear(self) -> None:
clear_sessions_with_channel_id(self.channel_id)
memory_manager.release_lock_if_owner(self.get_channel_id_int())
self.channel_cache.clear()
# ACCESS TO CHANNEL_DATA
@ -121,6 +123,13 @@ class Channel:
)
log.debug(__name__, "Was any channel replaced? %s", str(was_any_replaced))
def is_channel_to_replace(self) -> bool:
return is_there_a_channel_to_replace(
new_channel=self.channel_cache,
required_state=ChannelState.ENCRYPTED_TRANSPORT,
required_key=CHANNEL_HOST_STATIC_PUBKEY,
)
# READ and DECRYPT
def receive_packet(self, packet: utils.BufferType) -> Awaitable[None] | None:

View File

@ -30,6 +30,12 @@ _WRITE: int = const(1)
# Access to buffer slices
def release_lock_if_owner(channel_id: int) -> None:
global lock_owner_cid
if lock_owner_cid == channel_id:
lock_owner_cid = None
def get_new_read_buffer(channel_id: int, length: int) -> memoryview:
return _get_new_buffer(_READ, channel_id, length)

View File

@ -316,13 +316,22 @@ def test_credential_phase(client: Client) -> None:
client.debug.press_yes()
protocol._read_message(ThpEndResponse)
# Connect using credential with confirmation and ask for autoconnect credential
# Delete channel from the device by sending badly encrypted message
# This is done to prevent channel replacement and trigerring of autoconnect false -> true
protocol.nonce_request = 250
protocol._send_message(ButtonAck())
with pytest.raises(Exception) as e:
protocol.read(1)
assert e.value.args[0] == "Received ThpError: DECRYPTION FAILED"
# Connect using credential with confirmation and ask for autoconnect credential.
protocol = _prepare_protocol(client)
protocol._do_channel_allocation()
protocol._do_handshake(credential, host_static_privkey)
protocol._send_message(
ThpCredentialRequest(host_static_pubkey=host_static_pubkey, autoconnect=True)
)
# Confirmation dialog is shown. (Channel replacement is not triggered.)
button_req = protocol._read_message(ButtonRequest)
assert button_req.name == "connection_request"
protocol._send_message(ButtonAck())
@ -333,6 +342,15 @@ def test_credential_phase(client: Client) -> None:
protocol._send_message(ThpEndRequest())
protocol._read_message(ThpEndResponse)
# Connect using credential with confirmation
protocol = _prepare_protocol(client)
protocol._do_channel_allocation()
protocol._do_handshake(credential, host_static_privkey)
# Confirmation dialog is not shown as channel in ENCRYPTED TRANSPORT state with the same
# host static public key is still available in Trezor's cache. (Channel replacement is triggered.)
protocol._send_message(ThpEndRequest())
protocol._read_message(ThpEndResponse)
# Connect using autoconnect credential
protocol = _prepare_protocol(client)
protocol._do_channel_allocation()
@ -340,6 +358,21 @@ def test_credential_phase(client: Client) -> None:
protocol._send_message(ThpEndRequest())
protocol._read_message(ThpEndResponse)
# Delete channel from the device by sending badly encrypted message
# This is done to prevent channel replacement and trigerring of autoconnect false -> true
protocol.nonce_request = 250
protocol._send_message(ButtonAck())
with pytest.raises(Exception) as e:
protocol.read(1)
assert e.value.args[0] == "Received ThpError: DECRYPTION FAILED"
# Connect using autoconnect credential - should work the same as above
protocol = _prepare_protocol(client)
protocol._do_channel_allocation()
protocol._do_handshake(credential_auto, host_static_privkey)
protocol._send_message(ThpEndRequest())
protocol._read_message(ThpEndResponse)
@pytest.mark.setup_client(passphrase=True)
def test_channel_replacement(client: Client) -> None: