From 9b16b9b7a7ecce37fa6db1b0603c5d1d6d18f651 Mon Sep 17 00:00:00 2001 From: M1nd3r Date: Thu, 30 Jan 2025 18:52:25 +0100 Subject: [PATCH] feat(core, python): implement new code entry pairing [no changelog] --- core/src/apps/thp/pairing.py | 5 +-- python/src/trezorlib/transport/thp/cpace.py | 40 +++++++++++++++++++++ tests/device_tests/thp/test_thp.py | 40 ++++++++++++--------- 3 files changed, 66 insertions(+), 19 deletions(-) create mode 100644 python/src/trezorlib/transport/thp/cpace.py diff --git a/core/src/apps/thp/pairing.py b/core/src/apps/thp/pairing.py index 289070a542..b3eeb79956 100644 --- a/core/src/apps/thp/pairing.py +++ b/core/src/apps/thp/pairing.py @@ -198,10 +198,10 @@ async def _handle_code_entry_is_selected_first_time(ctx: PairingContext) -> None if challenge_message.challenge is None: raise Exception("Invalid message") - sha_ctx = sha256(ctx.channel_ctx.get_handshake_hash()) + sha_ctx = sha256(ThpPairingMethod.CodeEntry.to_bytes(1, "big")) + sha_ctx.update(ctx.channel_ctx.get_handshake_hash()) sha_ctx.update(ctx.code_entry_secret) sha_ctx.update(challenge_message.challenge) - sha_ctx.update(bytes("PairingMethod_CodeEntry", "utf-8")) code_code_entry_hash = sha_ctx.digest() ctx.display_data.code_code_entry = ( int.from_bytes(code_code_entry_hash, "big") % 1000000 @@ -327,6 +327,7 @@ async def _handle_nfc_tag( assert ctx.nfc_secret is not None assert ctx.handshake_hash_host is not None assert ctx.nfc_secret_host is not None + assert len(ctx.nfc_secret_host) == 16 sha_ctx = sha256(ThpPairingMethod.NFC.to_bytes(1, "big")) sha_ctx.update(ctx.channel_ctx.get_handshake_hash()) diff --git a/python/src/trezorlib/transport/thp/cpace.py b/python/src/trezorlib/transport/thp/cpace.py new file mode 100644 index 0000000000..d0b28e265c --- /dev/null +++ b/python/src/trezorlib/transport/thp/cpace.py @@ -0,0 +1,40 @@ +import typing as t +from hashlib import sha512 + +from . import curve25519 + +_PREFIX = b"\x08\x43\x50\x61\x63\x65\x32\x35\x35\x06" +_PADDING = b"\x6f\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x20" + + +class Cpace: + """ + CPace, a balanced composable PAKE: https://datatracker.ietf.org/doc/draft-irtf-cfrg-cpace/ + """ + + random_bytes: t.Callable[[int], bytes] + + def __init__(self, handshake_hash: bytes) -> None: + self.handshake_hash: bytes = handshake_hash + self.shared_secret: bytes + self.host_private_key: bytes + self.host_public_key: bytes + + def generate_keys_and_secret( + self, code_code_entry: bytes, trezor_public_key: bytes + ) -> None: + """ + Generate ephemeral key pair and a shared secret using Elligator2 with X25519. + """ + sha_ctx = sha512(_PREFIX) + sha_ctx.update(code_code_entry) + sha_ctx.update(_PADDING) + sha_ctx.update(self.handshake_hash) + sha_ctx.update(b"\x00") + pregenerator = sha_ctx.digest()[:32] + generator = curve25519.elligator2(pregenerator) + self.host_private_key = self.random_bytes(32) + self.host_public_key = curve25519.multiply(self.host_private_key, generator) + self.shared_secret = curve25519.multiply( + self.host_private_key, trezor_public_key + ) diff --git a/tests/device_tests/thp/test_thp.py b/tests/device_tests/thp/test_thp.py index 8c0b16433d..72bfe3a75c 100644 --- a/tests/device_tests/thp/test_thp.py +++ b/tests/device_tests/thp/test_thp.py @@ -1,6 +1,7 @@ -import hashlib import os +import random import typing as t +from hashlib import sha256 import pytest import typing_extensions as tx @@ -29,6 +30,7 @@ from trezorlib.messages import ( ThpSelectMethod, ) from trezorlib.transport.thp import curve25519 +from trezorlib.transport.thp.cpace import Cpace from trezorlib.transport.thp.protocol_v2 import MANAGEMENT_SESSION_ID, _hkdf if t.TYPE_CHECKING: @@ -164,7 +166,7 @@ def test_pairing_qr_code(client: Client) -> None: code = pairing_info.code_qr_code # Compute tag for response - sha_ctx = hashlib.sha256(protocol.handshake_hash) + sha_ctx = sha256(protocol.handshake_hash) sha_ctx.update(code) tag = sha_ctx.digest() @@ -173,7 +175,7 @@ def test_pairing_qr_code(client: Client) -> None: secret_msg = _read_message(ThpQrCodeSecret) # Check that the `code` was derived from the revealed secret - sha_ctx = hashlib.sha256(ThpPairingMethod.QrCode.to_bytes(1, "big")) + sha_ctx = sha256(ThpPairingMethod.QrCode.to_bytes(1, "big")) sha_ctx.update(protocol.handshake_hash) sha_ctx.update(secret_msg.secret) computed_code = sha_ctx.digest()[:16] @@ -185,7 +187,6 @@ def test_pairing_qr_code(client: Client) -> None: protocol._has_valid_channel = True -@pytest.mark.skip("Cpace is not implemented yet") def test_pairing_code_entry(client: Client) -> None: global protocol _prepare_protocol(client) @@ -213,7 +214,7 @@ def test_pairing_code_entry(client: Client) -> None: commitment_msg = _read_message(ThpCodeEntryCommitment) commitment = commitment_msg.commitment - challenge = b"\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99\xAA\xBB\xCC\xDD\xEE\xFF" + challenge = random.randbytes(16) _send_message(ThpCodeEntryChallenge(challenge=challenge)) cpace_trezor = _read_message(ThpCodeEntryCpaceTrezor) @@ -228,16 +229,15 @@ def test_pairing_code_entry(client: Client) -> None: ) code = pairing_info.code_entry_code - # TODO fix missing CPACE - cpace_shared_secret = b"\x01" - sha_ctx = hashlib.sha256(cpace_shared_secret) + cpace = Cpace(handshake_hash=protocol.handshake_hash) + cpace.random_bytes = random.randbytes + cpace.generate_keys_and_secret(code.to_bytes(6, "big"), cpace_trezor_public_key) + sha_ctx = sha256(cpace.shared_secret) tag = sha_ctx.digest() - cpace_host_public_key = cpace_trezor_public_key - _send_message( ThpCodeEntryCpaceHostTag( - cpace_host_public_key=cpace_host_public_key, + cpace_host_public_key=cpace.host_public_key, tag=tag, ) ) @@ -245,10 +245,17 @@ def test_pairing_code_entry(client: Client) -> None: secret_msg = _read_message(ThpCodeEntrySecret) # Check `commitment` and `code` - sha_ctx = hashlib.sha256(secret_msg.secret) + sha_ctx = sha256(secret_msg.secret) computed_commitment = sha_ctx.digest() assert commitment == computed_commitment - assert code == b"" # TODO implement + + sha_ctx = sha256(ThpPairingMethod.CodeEntry.to_bytes(1, "big")) + sha_ctx.update(protocol.handshake_hash) + sha_ctx.update(secret_msg.secret) + sha_ctx.update(challenge) + code_hash = sha_ctx.digest() + computed_code = int.from_bytes(code_hash, "big") % 1000000 + assert code == computed_code _send_message(ThpEndRequest()) _read_message(ThpEndResponse) @@ -286,8 +293,7 @@ def test_pairing_nfc(client: Client) -> None: _read_message(ButtonRequest) _send_message(ButtonAck()) - nfc_secret_host = b"\x02\x11\x22\x33\x44\x55\x66\x77\x88\x99\xAA\xBB\xCC\xDD\xEE\xFF" # TODO generate randomly - + nfc_secret_host = random.randbytes(16) # Read `nfc_secret` and `handshake_hash` from Trezor using debuglink pairing_info = client.debug.pairing_info( thp_channel_id=protocol.channel_id.to_bytes(2, "big"), @@ -300,7 +306,7 @@ def test_pairing_nfc(client: Client) -> None: assert handshake_hash_trezor[:16] == protocol.handshake_hash[:16] # Compute tag for response - sha_ctx = hashlib.sha256(ThpPairingMethod.NFC.to_bytes(1, "big")) + sha_ctx = sha256(ThpPairingMethod.NFC.to_bytes(1, "big")) sha_ctx.update(protocol.handshake_hash) sha_ctx.update(nfc_secret_trezor) tag_host = sha_ctx.digest() @@ -310,7 +316,7 @@ def test_pairing_nfc(client: Client) -> None: tag_trezor_msg = _read_message(ThpNfcTagTrezor) # Check that the `code` was derived from the revealed secret - sha_ctx = hashlib.sha256(ThpPairingMethod.NFC.to_bytes(1, "big")) + sha_ctx = sha256(ThpPairingMethod.NFC.to_bytes(1, "big")) sha_ctx.update(protocol.handshake_hash) sha_ctx.update(nfc_secret_host) computed_tag = sha_ctx.digest()