1
0
mirror of https://github.com/trezor/trezor-firmware.git synced 2025-02-07 05:02:38 +00:00
trezor-firmware/tests/device_tests/thp/test_thp.py

323 lines
9.3 KiB
Python
Raw Normal View History

2025-01-27 13:04:51 +00:00
import hashlib
import os
import typing as t
import pytest
import typing_extensions as tx
from trezorlib import protobuf
from trezorlib.client import ProtocolV2
from trezorlib.debuglink import TrezorClientDebugLink as Client
2025-01-27 13:04:51 +00:00
from trezorlib.messages import (
ButtonAck,
ButtonRequest,
ThpCodeEntryChallenge,
ThpCodeEntryCommitment,
ThpCodeEntryCpaceHostTag,
2025-01-27 13:04:51 +00:00
ThpCodeEntryCpaceTrezor,
ThpCodeEntrySecret,
2025-01-27 13:04:51 +00:00
ThpEndRequest,
ThpEndResponse,
ThpNfcTagHost,
ThpNfcTagTrezor,
2025-01-27 13:04:51 +00:00
ThpPairingMethod,
ThpPairingPreparationsFinished,
ThpPairingRequest,
ThpPairingRequestApproved,
ThpQrCodeSecret,
ThpQrCodeTag,
ThpSelectMethod,
)
from trezorlib.transport.thp import curve25519
2025-01-27 13:04:51 +00:00
from trezorlib.transport.thp.protocol_v2 import MANAGEMENT_SESSION_ID, _hkdf
if t.TYPE_CHECKING:
    # Only static type checkers need the ParamSpec.
    P = tx.ParamSpec("P")

# Kept outside the TYPE_CHECKING guard: the annotations of `_send_message`
# and `_read_message` reference it and are evaluated when the module loads.
MT = t.TypeVar("MT", bound=protobuf.MessageType)

# Apply the `protocol("protocol_v2")` marker to every test in this module.
pytestmark = [pytest.mark.protocol("protocol_v2")]

# Module-wide protocol handle; it is only declared here and gets its value
# from `_prepare_protocol` at the start of each test.
protocol: ProtocolV2
def _prepare_protocol(client: Client) -> None:
    """Bind the module-level `protocol` to `client.protocol` and zero its sync bits.

    Args:
        client: debuglink client whose `protocol` attribute the tests will drive.
    """
    global protocol
    protocol = client.protocol
    # Both synchronization bits start from 0 for every test.
    protocol.sync_bit_send = protocol.sync_bit_receive = 0
def test_allocate_channel(client: Client) -> None:
    """Exercise channel allocation with a matching and with a mismatched nonce."""
    _prepare_protocol(client)

    sent_nonce = b"\x1A\x2B\x3B\x4A\x5C\x6D\x7E\x8F"
    unexpected_nonce = b"\xDE\xAD\xBE\xEF\xDE\xAD\xBE\xEF"

    # The response is read back with the very nonce that was sent.
    protocol._send_channel_allocation_request(sent_nonce)
    protocol._read_channel_allocation_response(sent_nonce)

    # Expecting a nonce other than the one sent has to raise.
    protocol._send_channel_allocation_request(sent_nonce)
    with pytest.raises(Exception, match="Invalid channel allocation response."):
        protocol._read_channel_allocation_response(expected_nonce=unexpected_nonce)

    client.invalidate()
def test_handshake(client: Client) -> None:
    """Drive the handshake message by message, then finish with pairing."""
    _prepare_protocol(client)

    host_privkey = curve25519.get_private_key(os.urandom(32))
    host_pubkey = curve25519.get_public_key(host_privkey)

    protocol._do_channel_allocation()

    # Handshake init: send the host ephemeral public key, wait for ACK + response.
    protocol._send_handshake_init_request(host_pubkey)
    protocol._read_ack()
    init_response = protocol._read_handshake_init_response()

    # The response is sliced into three consecutive fields (32 + 48 + 16 bytes).
    trezor_ephemeral_pubkey, encrypted_trezor_static_pubkey, noise_tag = (
        init_response[:32],
        init_response[32:80],
        init_response[80:96],
    )
    # TODO check noise_tag is valid

    # Handshake completion: the call hands back the value used for key derivation.
    ck = protocol._send_handshake_completion_request(
        host_pubkey,
        host_privkey,
        trezor_ephemeral_pubkey,
        encrypted_trezor_static_pubkey,
    )
    protocol._read_ack()
    protocol._read_handshake_completion_response()

    # Install the derived keys and the initial nonces on the protocol object.
    protocol.key_request, protocol.key_response = _hkdf(ck, b"")
    protocol.nonce_request = 0
    protocol.nonce_response = 1

    # TODO - without pairing, the client is damaged and results in fail of the following test
    # so far no luck in solving it - it should be also tackled in FW, as it causes unexpected FW error
    protocol._do_pairing(client.debug)

    # TODO the following is just to make style checker happy
    assert noise_tag is not None
2025-01-27 13:04:51 +00:00
def _send_message(
    message: MT,
    session_id: int = MANAGEMENT_SESSION_ID,
) -> None:
    """Encode `message`, write it encrypted on `session_id`, then read the ACK.

    Args:
        message: protobuf message to send.
        session_id: session to send on; defaults to `MANAGEMENT_SESSION_ID`.
    """
    encoded_type, encoded_payload = protocol.mapping.encode(message)
    protocol._encrypt_and_write(session_id, encoded_type, encoded_payload)
    protocol._read_ack()
def _read_message(message_type: type[MT]) -> MT:
    """Read and decrypt one message, decode it, and assert it is a `message_type`.

    Args:
        message_type: message class the decoded message must be an instance of.

    Returns:
        The decoded message.
    """
    # The first element of the decrypted triple is not needed here.
    _ignored, wire_type, wire_payload = protocol.read_and_decrypt()
    decoded = protocol.mapping.decode(wire_type, wire_payload)
    assert isinstance(decoded, message_type)
    return decoded
2025-01-27 13:04:51 +00:00
def test_pairing_qr_code(client: Client) -> None:
    """Pair using the QR-code method and check the code against the revealed secret."""
    _prepare_protocol(client)

    # Generate ephemeral keys
    host_privkey = curve25519.get_private_key(os.urandom(32))
    host_pubkey = curve25519.get_public_key(host_privkey)

    protocol._do_channel_allocation()
    protocol._do_handshake(host_privkey, host_pubkey)

    # Pairing request, confirmed through debuglink
    _send_message(ThpPairingRequest())
    _read_message(ButtonRequest)
    _send_message(ButtonAck())
    client.debug.press_yes()
    _read_message(ThpPairingRequestApproved)

    _send_message(ThpSelectMethod(selected_pairing_method=ThpPairingMethod.QrCode))
    _read_message(ThpPairingPreparationsFinished)

    # QR Code shown
    _read_message(ButtonRequest)
    _send_message(ButtonAck())

    # Read code from "Trezor's display" using debuglink
    channel_id_bytes = protocol.channel_id.to_bytes(2, "big")
    qr_code = client.debug.pairing_info(thp_channel_id=channel_id_bytes).code_qr_code

    # Tag for the response: SHA-256(handshake_hash || code)
    tag_hasher = hashlib.sha256(protocol.handshake_hash)
    tag_hasher.update(qr_code)
    _send_message(ThpQrCodeTag(tag=tag_hasher.digest()))

    revealed = _read_message(ThpQrCodeSecret)

    # The code has to be the first 16 bytes of
    # SHA-256(pairing method || handshake_hash || revealed secret)
    code_hasher = hashlib.sha256(ThpPairingMethod.QrCode.to_bytes(1, "big"))
    code_hasher.update(protocol.handshake_hash)
    code_hasher.update(revealed.secret)
    assert qr_code == code_hasher.digest()[:16]

    _send_message(ThpEndRequest())
    _read_message(ThpEndResponse)

    protocol._has_valid_channel = True
@pytest.mark.skip("Cpace is not implemented yet")
def test_pairing_code_entry(client: Client) -> None:
    """Pair using the code-entry method (skipped; the CPace values are placeholders)."""
    _prepare_protocol(client)

    # Generate ephemeral keys
    host_privkey = curve25519.get_private_key(os.urandom(32))
    host_pubkey = curve25519.get_public_key(host_privkey)

    protocol._do_channel_allocation()
    protocol._do_handshake(host_privkey, host_pubkey)

    # Pairing request, confirmed through debuglink
    _send_message(ThpPairingRequest())
    _read_message(ButtonRequest)
    _send_message(ButtonAck())
    client.debug.press_yes()
    _read_message(ThpPairingRequestApproved)

    _send_message(ThpSelectMethod(selected_pairing_method=ThpPairingMethod.CodeEntry))

    # Keep the commitment so it can be compared with the secret revealed later.
    commitment = _read_message(ThpCodeEntryCommitment).commitment

    challenge = b"\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99\xAA\xBB\xCC\xDD\xEE\xFF"
    _send_message(ThpCodeEntryChallenge(challenge=challenge))

    cpace_trezor_public_key = _read_message(
        ThpCodeEntryCpaceTrezor
    ).cpace_trezor_public_key

    # Code Entry code shown
    _read_message(ButtonRequest)
    _send_message(ButtonAck())

    channel_id_bytes = protocol.channel_id.to_bytes(2, "big")
    entry_code = client.debug.pairing_info(
        thp_channel_id=channel_id_bytes
    ).code_entry_code

    # TODO fix missing CPACE
    cpace_shared_secret = b"\x01"
    tag = hashlib.sha256(cpace_shared_secret).digest()
    # Placeholder: Trezor's own public key is sent back as the host key.
    cpace_host_public_key = cpace_trezor_public_key

    _send_message(
        ThpCodeEntryCpaceHostTag(
            cpace_host_public_key=cpace_host_public_key,
            tag=tag,
        )
    )

    revealed = _read_message(ThpCodeEntrySecret)

    # Check `commitment` and `code`
    assert commitment == hashlib.sha256(revealed.secret).digest()
    assert entry_code == b""  # TODO implement

    _send_message(ThpEndRequest())
    _read_message(ThpEndResponse)

    protocol._has_valid_channel = True
def test_pairing_nfc(client: Client) -> None:
    """Pair using the NFC method and verify the tags exchanged in both directions.

    The host secret is generated randomly for every run. The Trezor-side
    secret and handshake hash are read through debuglink.
    """
    global protocol
    _prepare_protocol(client)

    # Generate ephemeral keys
    host_ephemeral_privkey = curve25519.get_private_key(os.urandom(32))
    host_ephemeral_pubkey = curve25519.get_public_key(host_ephemeral_privkey)

    protocol._do_channel_allocation()
    protocol._do_handshake(host_ephemeral_privkey, host_ephemeral_pubkey)

    # Pairing request, confirmed through debuglink
    _send_message(ThpPairingRequest())
    _read_message(ButtonRequest)
    _send_message(ButtonAck())
    client.debug.press_yes()
    _read_message(ThpPairingRequestApproved)

    _send_message(ThpSelectMethod(selected_pairing_method=ThpPairingMethod.NFC))
    _read_message(ThpPairingPreparationsFinished)

    # NFC screen shown
    _read_message(ButtonRequest)
    _send_message(ButtonAck())

    # Fresh 16-byte host secret for every run (same length as the constant it replaces).
    nfc_secret_host = os.urandom(16)

    # Read `nfc_secret` and `handshake_hash` from Trezor using debuglink
    pairing_info = client.debug.pairing_info(
        thp_channel_id=protocol.channel_id.to_bytes(2, "big"),
        handshake_hash=protocol.handshake_hash,
        nfc_secret_host=nfc_secret_host,
    )
    handshake_hash_trezor = pairing_info.handshake_hash
    nfc_secret_trezor = pairing_info.nfc_secret_trezor
    # Only the first 16 bytes of the two handshake hashes are compared.
    assert handshake_hash_trezor[:16] == protocol.handshake_hash[:16]

    # Host tag: SHA-256(pairing method || handshake_hash || Trezor's NFC secret)
    sha_ctx = hashlib.sha256(ThpPairingMethod.NFC.to_bytes(1, "big"))
    sha_ctx.update(protocol.handshake_hash)
    sha_ctx.update(nfc_secret_trezor)
    tag_host = sha_ctx.digest()

    _send_message(ThpNfcTagHost(tag=tag_host))
    tag_trezor_msg = _read_message(ThpNfcTagTrezor)

    # Check that Trezor's tag is
    # SHA-256(pairing method || handshake_hash || host's NFC secret)
    sha_ctx = hashlib.sha256(ThpPairingMethod.NFC.to_bytes(1, "big"))
    sha_ctx.update(protocol.handshake_hash)
    sha_ctx.update(nfc_secret_host)
    computed_tag = sha_ctx.digest()
    assert tag_trezor_msg.tag == computed_tag

    _send_message(ThpEndRequest())
    _read_message(ThpEndResponse)

    protocol._has_valid_channel = True