"""Device tests for THP (protocol v2): channel allocation, handshake and pairing."""

import hashlib
import os
import typing as t

import pytest
import typing_extensions as tx

from trezorlib import protobuf
from trezorlib.client import ProtocolV2
from trezorlib.debuglink import TrezorClientDebugLink as Client
from trezorlib.messages import (
    ButtonAck,
    ButtonRequest,
    ThpCodeEntryChallenge,
    ThpCodeEntryCommitment,
    ThpCodeEntryCpaceHostTag,
    ThpCodeEntryCpaceTrezor,
    ThpCodeEntrySecret,
    ThpEndRequest,
    ThpEndResponse,
    ThpNfcTagHost,
    ThpNfcTagTrezor,
    ThpPairingMethod,
    ThpPairingPreparationsFinished,
    ThpPairingRequest,
    ThpPairingRequestApproved,
    ThpQrCodeSecret,
    ThpQrCodeTag,
    ThpSelectMethod,
)
from trezorlib.transport.thp import curve25519
from trezorlib.transport.thp.protocol_v2 import MANAGEMENT_SESSION_ID, _hkdf

if t.TYPE_CHECKING:
    P = tx.ParamSpec("P")

# MT appears in annotations that are evaluated when the helpers below are
# defined, so it has to exist at runtime (not only under TYPE_CHECKING).
MT = t.TypeVar("MT", bound=protobuf.MessageType)

pytestmark = [pytest.mark.protocol("protocol_v2")]

# Protocol object of the client under test; bound by `_prepare_protocol`.
protocol: ProtocolV2


def _prepare_protocol(client: Client) -> None:
    """Bind the module-level `protocol` to the client's protocol and zero its sync bits."""
    global protocol
    protocol = client.protocol
    protocol.sync_bit_send = 0
    protocol.sync_bit_receive = 0


def _send_message(
    message: MT,
    session_id: int = MANAGEMENT_SESSION_ID,
) -> None:
    """Encode `message`, write it encrypted on `session_id` and read the ACK."""
    message_type, message_data = protocol.mapping.encode(message)
    protocol._encrypt_and_write(session_id, message_type, message_data)
    protocol._read_ack()


def _read_message(message_type: type[MT]) -> MT:
    """Read and decrypt the next message, asserting it is a `message_type`."""
    _, msg_type, msg_data = protocol.read_and_decrypt()
    msg = protocol.mapping.decode(msg_type, msg_data)
    assert isinstance(msg, message_type)
    return msg


def _sha256(*chunks: bytes) -> bytes:
    """Return the SHA-256 digest of `chunks` fed to the hash in order."""
    sha_ctx = hashlib.sha256()
    for chunk in chunks:
        sha_ctx.update(chunk)
    return sha_ctx.digest()


def _ack_button_request() -> None:
    """Read a ButtonRequest from the device and answer it with a ButtonAck."""
    _read_message(ButtonRequest)
    _send_message(ButtonAck())


def _start_pairing(client: Client) -> None:
    """Allocate a channel, do the handshake and get the pairing request approved."""
    _prepare_protocol(client)

    # Generate ephemeral keys
    host_ephemeral_privkey = curve25519.get_private_key(os.urandom(32))
    host_ephemeral_pubkey = curve25519.get_public_key(host_ephemeral_privkey)

    protocol._do_channel_allocation()
    protocol._do_handshake(host_ephemeral_privkey, host_ephemeral_pubkey)

    _send_message(ThpPairingRequest())
    _ack_button_request()
    client.debug.press_yes()
    _read_message(ThpPairingRequestApproved)


def _finish_pairing() -> None:
    """Exchange ThpEndRequest/ThpEndResponse and flag the channel as valid."""
    _send_message(ThpEndRequest())
    _read_message(ThpEndResponse)

    protocol._has_valid_channel = True


def test_allocate_channel(client: Client) -> None:
    """Channel allocation succeeds with a matching nonce and fails with another one."""
    _prepare_protocol(client)
    nonce = b"\x1A\x2B\x3B\x4A\x5C\x6D\x7E\x8F"

    # Use valid nonce
    protocol._send_channel_allocation_request(nonce)
    protocol._read_channel_allocation_response(nonce)

    # Expect different nonce
    protocol._send_channel_allocation_request(nonce)
    with pytest.raises(Exception, match="Invalid channel allocation response."):
        protocol._read_channel_allocation_response(
            expected_nonce=b"\xDE\xAD\xBE\xEF\xDE\xAD\xBE\xEF"
        )
    client.invalidate()


def test_handshake(client: Client) -> None:
    """Run the handshake step by step, then pair via `protocol._do_pairing`."""
    _prepare_protocol(client)

    host_ephemeral_privkey = curve25519.get_private_key(os.urandom(32))
    host_ephemeral_pubkey = curve25519.get_public_key(host_ephemeral_privkey)

    protocol._do_channel_allocation()
    protocol._send_handshake_init_request(host_ephemeral_pubkey)
    protocol._read_ack()
    init_response = protocol._read_handshake_init_response()

    # The init response is sliced into three consecutive fields.
    trezor_ephemeral_pubkey = init_response[:32]
    encrypted_trezor_static_pubkey = init_response[32:80]
    noise_tag = init_response[80:96]

    # TODO check noise_tag is valid

    ck = protocol._send_handshake_completion_request(
        host_ephemeral_pubkey,
        host_ephemeral_privkey,
        trezor_ephemeral_pubkey,
        encrypted_trezor_static_pubkey,
    )
    protocol._read_ack()
    protocol._read_handshake_completion_response()
    protocol.key_request, protocol.key_response = _hkdf(ck, b"")
    protocol.nonce_request = 0
    protocol.nonce_response = 1

    # TODO - without pairing, the client is damaged and results in fail of the following test
    # so far no luck in solving it - it should be also tackled in FW, as it causes unexpected FW error
    protocol._do_pairing(client.debug)

    # TODO the following is just to make style checker happy
    assert noise_tag is not None


def test_pairing_qr_code(client: Client) -> None:
    """Pair using the QR code method and check the code against the revealed secret."""
    _start_pairing(client)

    _send_message(ThpSelectMethod(selected_pairing_method=ThpPairingMethod.QrCode))
    _read_message(ThpPairingPreparationsFinished)

    # QR Code shown
    _ack_button_request()

    # Read code from "Trezor's display" using debuglink
    pairing_info = client.debug.pairing_info(
        thp_channel_id=protocol.channel_id.to_bytes(2, "big")
    )
    code = pairing_info.code_qr_code

    # Compute tag for response
    tag = _sha256(protocol.handshake_hash, code)

    _send_message(ThpQrCodeTag(tag=tag))
    secret_msg = _read_message(ThpQrCodeSecret)

    # Check that the `code` was derived from the revealed secret
    computed_code = _sha256(
        ThpPairingMethod.QrCode.to_bytes(1, "big"),
        protocol.handshake_hash,
        secret_msg.secret,
    )[:16]
    assert code == computed_code

    _finish_pairing()


@pytest.mark.skip("Cpace is not implemented yet")
def test_pairing_code_entry(client: Client) -> None:
    """Pair using the code entry method (skipped; the CPace part is a placeholder)."""
    _start_pairing(client)

    _send_message(ThpSelectMethod(selected_pairing_method=ThpPairingMethod.CodeEntry))

    commitment_msg = _read_message(ThpCodeEntryCommitment)
    commitment = commitment_msg.commitment

    challenge = b"\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99\xAA\xBB\xCC\xDD\xEE\xFF"
    _send_message(ThpCodeEntryChallenge(challenge=challenge))

    cpace_trezor = _read_message(ThpCodeEntryCpaceTrezor)
    cpace_trezor_public_key = cpace_trezor.cpace_trezor_public_key

    # Code Entry code shown
    _ack_button_request()

    pairing_info = client.debug.pairing_info(
        thp_channel_id=protocol.channel_id.to_bytes(2, "big")
    )
    code = pairing_info.code_entry_code

    # TODO fix missing CPACE
    cpace_shared_secret = b"\x01"
    tag = _sha256(cpace_shared_secret)

    # Placeholder: Trezor's own public key is sent back as the host key.
    cpace_host_public_key = cpace_trezor_public_key

    _send_message(
        ThpCodeEntryCpaceHostTag(
            cpace_host_public_key=cpace_host_public_key,
            tag=tag,
        )
    )

    secret_msg = _read_message(ThpCodeEntrySecret)

    # Check `commitment` and `code`
    computed_commitment = _sha256(secret_msg.secret)
    assert commitment == computed_commitment
    assert code == b""  # TODO implement

    _finish_pairing()


def test_pairing_nfc(client: Client) -> None:
    """Pair using the NFC method and check the tag returned by Trezor."""
    _start_pairing(client)

    _send_message(ThpSelectMethod(selected_pairing_method=ThpPairingMethod.NFC))
    _read_message(ThpPairingPreparationsFinished)

    # NFC screen shown
    _ack_button_request()

    # TODO generate randomly
    nfc_secret_host = b"\x02\x11\x22\x33\x44\x55\x66\x77\x88\x99\xAA\xBB\xCC\xDD\xEE\xFF"

    # Read `nfc_secret` and `handshake_hash` from Trezor using debuglink
    pairing_info = client.debug.pairing_info(
        thp_channel_id=protocol.channel_id.to_bytes(2, "big"),
        handshake_hash=protocol.handshake_hash,
        nfc_secret_host=nfc_secret_host,
    )
    handshake_hash_trezor = pairing_info.handshake_hash
    nfc_secret_trezor = pairing_info.nfc_secret_trezor
    # Only the first 16 bytes of the two handshake hashes are compared.
    assert handshake_hash_trezor[:16] == protocol.handshake_hash[:16]

    # Compute tag for response
    tag_host = _sha256(
        ThpPairingMethod.NFC.to_bytes(1, "big"),
        protocol.handshake_hash,
        nfc_secret_trezor,
    )

    _send_message(ThpNfcTagHost(tag=tag_host))
    tag_trezor_msg = _read_message(ThpNfcTagTrezor)

    # Check that Trezor's tag was derived from the host's NFC secret
    computed_tag = _sha256(
        ThpPairingMethod.NFC.to_bytes(1, "big"),
        protocol.handshake_hash,
        nfc_secret_host,
    )
    assert tag_trezor_msg.tag == computed_tag

    _finish_pairing()