1
0
mirror of https://github.com/trezor/trezor-firmware.git synced 2025-02-07 05:02:38 +00:00

temp: update pairing process, part 1

This commit is contained in:
M1nd3r 2025-01-27 14:04:51 +01:00
parent d478f7fa27
commit 8b64fe364c
24 changed files with 1470 additions and 1035 deletions

View File

@ -134,7 +134,7 @@ message DebugLinkState {
repeated string tokens = 13; // current layout represented as a list of string tokens repeated string tokens = 13; // current layout represented as a list of string tokens
optional uint32 thp_pairing_code_entry_code = 14; optional uint32 thp_pairing_code_entry_code = 14;
optional bytes thp_pairing_code_qr_code = 15; optional bytes thp_pairing_code_qr_code = 15;
optional bytes thp_pairing_code_nfc_unidirectional = 16; optional bytes thp_pairing_code_nfc = 16;
} }
/** /**

View File

@ -13,28 +13,28 @@ option (include_in_bitcoin_only) = true;
* Mapping between Trezor wire identifier (uint) and a Thp protobuf message * Mapping between Trezor wire identifier (uint) and a Thp protobuf message
*/ */
enum ThpMessageType { enum ThpMessageType {
reserved 0 to 999; // Values reserved by other messages, see messages.proto reserved 0 to 999; // Values reserved by other messages, see messages.proto
ThpMessageType_ThpCreateNewSession = 1000[(bitcoin_only)=true]; ThpMessageType_ThpCreateNewSession = 1000 [(bitcoin_only)=true];
ThpMessageType_ThpNewSession = 1001[(bitcoin_only)=true]; ThpMessageType_ThpPairingRequest = 1006 [(bitcoin_only) = true];
ThpMessageType_ThpStartPairingRequest = 1008 [(bitcoin_only) = true]; ThpMessageType_ThpPairingRequestApproved = 1007 [(bitcoin_only) = true];
ThpMessageType_ThpSelectMethod = 1008 [(bitcoin_only) = true];
ThpMessageType_ThpPairingPreparationsFinished = 1009 [(bitcoin_only) = true]; ThpMessageType_ThpPairingPreparationsFinished = 1009 [(bitcoin_only) = true];
ThpMessageType_ThpCredentialRequest = 1010 [(bitcoin_only) = true]; ThpMessageType_ThpCredentialRequest = 1010 [(bitcoin_only) = true];
ThpMessageType_ThpCredentialResponse = 1011 [(bitcoin_only) = true]; ThpMessageType_ThpCredentialResponse = 1011 [(bitcoin_only) = true];
ThpMessageType_ThpEndRequest = 1012 [(bitcoin_only) = true]; ThpMessageType_ThpEndRequest = 1012 [(bitcoin_only) = true];
ThpMessageType_ThpEndResponse = 1013[(bitcoin_only) = true]; ThpMessageType_ThpEndResponse = 1013 [(bitcoin_only) = true];
ThpMessageType_ThpCodeEntryCommitment = 1016[(bitcoin_only)=true]; ThpMessageType_ThpCodeEntryCommitment = 1016 [(bitcoin_only)=true];
ThpMessageType_ThpCodeEntryChallenge = 1017[(bitcoin_only)=true]; ThpMessageType_ThpCodeEntryChallenge = 1017 [(bitcoin_only)=true];
ThpMessageType_ThpCodeEntryCpaceHost = 1018[(bitcoin_only)=true]; ThpMessageType_ThpCodeEntryCpaceTrezor = 1018 [(bitcoin_only)=true];
ThpMessageType_ThpCodeEntryCpaceTrezor = 1019[(bitcoin_only)=true]; ThpMessageType_ThpCodeEntryCpaceHostTag = 1019 [(bitcoin_only)=true];
ThpMessageType_ThpCodeEntryTag = 1020[(bitcoin_only)=true]; ThpMessageType_ThpCodeEntrySecret = 1020 [(bitcoin_only)=true];
ThpMessageType_ThpCodeEntrySecret = 1021[(bitcoin_only)=true]; ThpMessageType_ThpQrCodeTag = 1024 [(bitcoin_only)=true];
ThpMessageType_ThpQrCodeTag = 1024[(bitcoin_only)=true]; ThpMessageType_ThpQrCodeSecret = 1025 [(bitcoin_only)=true];
ThpMessageType_ThpQrCodeSecret = 1025[(bitcoin_only)=true]; ThpMessageType_ThpNfcTagHost = 1032 [(bitcoin_only)=true];
ThpMessageType_ThpNfcUnidirectionalTag = 1032[(bitcoin_only)=true]; ThpMessageType_ThpNfcTagTrezor = 1033 [(bitcoin_only)=true];
ThpMessageType_ThpNfcUnidirectionalSecret = 1033[(bitcoin_only)=true];
reserved 1100 to 2147483647; // Values reserved by other messages, see messages.proto reserved 1100 to 2147483647; // Values reserved by other messages, see messages.proto
} }
@ -43,10 +43,10 @@ enum ThpMessageType {
* @embed * @embed
*/ */
enum ThpPairingMethod { enum ThpPairingMethod {
NoMethod = 1; // Trust without MITM protection. SkipPairing = 1; // Trust without MITM protection.
CodeEntry = 2; // User types code diplayed on Trezor into the host application. CodeEntry = 2; // User types code diplayed on Trezor into the host application.
QrCode = 3; // User scans code displayed on Trezor into host application. QrCode = 3; // User scans code displayed on Trezor into host application.
NFC_Unidirectional = 4; // Trezor transmits an authentication key to the host device via NFC. NFC = 4; // Trezor and host application exchange authentication secrets via NFC.
} }
/** /**
@ -55,8 +55,8 @@ enum ThpPairingMethod {
message ThpDeviceProperties { message ThpDeviceProperties {
optional string internal_model = 1; // Internal model name e.g. "T2B1". optional string internal_model = 1; // Internal model name e.g. "T2B1".
optional uint32 model_variant = 2; // Encodes the device properties such as color. optional uint32 model_variant = 2; // Encodes the device properties such as color.
optional bool bootloader_mode = 3; // Indicates whether the device is in bootloader or firmware mode. optional uint32 protocol_version_major = 3; // The major version of the communication protocol used by the firmware.
optional uint32 protocol_version = 4; // The communication protocol version supported by the firmware. optional uint32 protocol_version_minor = 4; // The minor version of the communication protocol used by the firmware.
repeated ThpPairingMethod pairing_methods = 5; // The pairing methods supported by the Trezor. repeated ThpPairingMethod pairing_methods = 5; // The pairing methods supported by the Trezor.
} }
@ -65,13 +65,11 @@ message ThpDeviceProperties {
*/ */
message ThpHandshakeCompletionReqNoisePayload { message ThpHandshakeCompletionReqNoisePayload {
optional bytes host_pairing_credential = 1; // Host's pairing credential optional bytes host_pairing_credential = 1; // Host's pairing credential
repeated ThpPairingMethod pairing_methods = 2; // The pairing methods chosen by the host
} }
/** /**
* Request: Ask device for a new session with given passphrase. * Request: Ask device for a new session with given passphrase.
* @start * @start
* @next ThpNewSession
* @next Success * @next Success
*/ */
message ThpCreateNewSession{ message ThpCreateNewSession{
@ -80,31 +78,41 @@ message ThpCreateNewSession{
optional bool derive_cardano = 3; // If True, Cardano keys will be derived. Ignored with BTC-only optional bool derive_cardano = 3; // If True, Cardano keys will be derived. Ignored with BTC-only
} }
/**
* Response: Contains session_id of the newly created session.
* @end
*/
message ThpNewSession{
optional uint32 new_session_id = 1;
}
/** /**
* Request: Start pairing process. * Request: Start pairing process.
* @start * @start
* @next ThpCodeEntryCommitment * @next ThpPairingRequestApproved
* @next ThpPairingPreparationsFinished
*/ */
message ThpStartPairingRequest{ message ThpPairingRequest{
optional string host_name = 1; // Human-readable host name optional string host_name = 1; // Human-readable host name
}
/**
* Response: Host is allowed to start pairing process.
* @start
* @next ThpSelectMethod
*/
message ThpPairingRequestApproved{
}
/**
* Request: Start pairing using the method selected.
* @start
* @next ThpPairingPreparationsFinished
* @next ThpCodeEntryCommitment
*/
message ThpSelectMethod {
optional ThpPairingMethod selected_pairing_method = 1 [default=NFC];;
} }
/** /**
* Response: Pairing is ready for user input / OOB communication. * Response: Pairing is ready for user input / OOB communication.
* @next ThpCodeEntryCpace * @next ThpCodeEntryCpace
* @next ThpQrCodeTag * @next ThpQrCodeTag
* @next ThpNfcUnidirectionalTag * @next ThpNfcTagHost
*/ */
message ThpPairingPreparationsFinished{ message ThpPairingPreparationsFinished{
} }
/** /**
@ -117,34 +125,30 @@ message ThpCodeEntryCommitment {
/** /**
* Response: Host responds to Trezor's Code Entry commitment with a challenge. * Response: Host responds to Trezor's Code Entry commitment with a challenge.
* @next ThpPairingPreparationsFinished
*/
message ThpCodeEntryChallenge {
optional bytes challenge = 1; // host's random 32-byte challenge
}
/**
* Request: User selected Code Entry option in Host. Host starts CPACE protocol with Trezor.
* @next ThpCodeEntryCpaceTrezor * @next ThpCodeEntryCpaceTrezor
*/ */
message ThpCodeEntryCpaceHost { message ThpCodeEntryChallenge {
optional bytes cpace_host_public_key = 1; // Host's ephemeral CPace public key optional bytes challenge = 1; // Host's random 32-byte challenge
} }
/** /**
* Response: Trezor continues with the CPACE protocol. * Response: Trezor continues with the CPACE protocol.
* @next ThpCodeEntryTag * @next ThpCodeEntryCpaceHostTag
*/ */
message ThpCodeEntryCpaceTrezor { message ThpCodeEntryCpaceTrezor {
optional bytes cpace_trezor_public_key = 1; // Trezor's ephemeral CPace public key optional bytes cpace_trezor_public_key = 1; // Trezor's ephemeral CPace public key
} }
/** /**
* Response: Host continues with the CPACE protocol. * Request: User selected Code Entry option in Host. Host starts CPACE protocol with Trezor.
* @next ThpCodeEntrySecret * @next ThpCodeEntrySecret
*/ */
message ThpCodeEntryTag { message ThpCodeEntryCpaceHostTag {
optional bytes tag = 2; // SHA-256 of shared secret optional bytes cpace_host_public_key = 1; // Host's ephemeral CPace public key
optional bytes tag = 2; // SHA-256 of shared secret
} }
/** /**
@ -175,10 +179,10 @@ message ThpQrCodeSecret {
/** /**
* Request: User selected Unidirectional NFC pairing option. Host sends an Unidirectional NFC Tag. * Request: User selected Unidirectional NFC pairing option. Host sends an Unidirectional NFC Tag.
* @next ThpNfcUnidirectionalSecret * @next ThpNfcTagTrezor
*/ */
message ThpNfcUnidirectionalTag { message ThpNfcTagHost {
optional bytes tag = 1; // SHA-256 of shared secret optional bytes tag = 1; // Host's tag
} }
/** /**
@ -186,8 +190,8 @@ message ThpNfcUnidirectionalTag {
* @next ThpCredentialRequest * @next ThpCredentialRequest
* @next ThpEndRequest * @next ThpEndRequest
*/ */
message ThpNfcUnidirectionalSecret { message ThpNfcTagTrezor {
optional bytes secret = 1; // Trezor's secret optional bytes tag = 1; // Trezor's tag
} }
/** /**
@ -197,6 +201,7 @@ message ThpNfcUnidirectionalSecret {
*/ */
message ThpCredentialRequest { message ThpCredentialRequest {
optional bytes host_static_pubkey = 1; // Host's static public key used in the handshake. optional bytes host_static_pubkey = 1; // Host's static public key used in the handshake.
optional bool autoconnect = 2; // Whether host wants to autoconnect without user confirmation
} }
/** /**
@ -229,6 +234,7 @@ message ThpEndResponse {}
message ThpCredentialMetadata { message ThpCredentialMetadata {
option (internal_only) = true; option (internal_only) = true;
optional string host_name = 1; // Human-readable host name optional string host_name = 1; // Human-readable host name
optional bool autoconnect = 2; // Whether host is allowed to autoconnect without user confirmation
} }
/** /**

View File

@ -252,7 +252,7 @@ if __debug__:
def _state( def _state(
thp_pairing_code_entry_code: int | None = None, thp_pairing_code_entry_code: int | None = None,
thp_pairing_code_qr_code: bytes | None = None, thp_pairing_code_qr_code: bytes | None = None,
thp_pairing_code_nfc_unidirectional: bytes | None = None, thp_pairing_code_nfc: bytes | None = None,
) -> DebugLinkState: ) -> DebugLinkState:
from trezor.messages import DebugLinkState from trezor.messages import DebugLinkState
@ -274,7 +274,7 @@ if __debug__:
tokens=tokens, tokens=tokens,
thp_pairing_code_entry_code=thp_pairing_code_entry_code, thp_pairing_code_entry_code=thp_pairing_code_entry_code,
thp_pairing_code_qr_code=thp_pairing_code_qr_code, thp_pairing_code_qr_code=thp_pairing_code_qr_code,
thp_pairing_code_nfc_unidirectional=thp_pairing_code_nfc_unidirectional, thp_pairing_code_nfc=thp_pairing_code_nfc,
) )
async def dispatch_DebugLinkGetState( async def dispatch_DebugLinkGetState(
@ -283,7 +283,7 @@ if __debug__:
thp_pairing_code_entry_code: int | None = None thp_pairing_code_entry_code: int | None = None
thp_pairing_code_qr_code: bytes | None = None thp_pairing_code_qr_code: bytes | None = None
thp_pairing_code_nfc_unidirectional: bytes | None = None thp_pairing_code_nfc: bytes | None = None
if utils.USE_THP and msg.thp_channel_id is not None: if utils.USE_THP and msg.thp_channel_id is not None:
channel_id = int.from_bytes(msg.thp_channel_id, "big") channel_id = int.from_bytes(msg.thp_channel_id, "big")
@ -301,15 +301,15 @@ if __debug__:
if ctx is not None and isinstance(ctx, PairingContext): if ctx is not None and isinstance(ctx, PairingContext):
thp_pairing_code_entry_code = ctx.display_data.code_code_entry thp_pairing_code_entry_code = ctx.display_data.code_code_entry
thp_pairing_code_qr_code = ctx.display_data.code_qr_code thp_pairing_code_qr_code = ctx.display_data.code_qr_code
thp_pairing_code_nfc_unidirectional = ( thp_pairing_code_nfc = ctx.display_data.code_nfc
ctx.display_data.code_nfc_unidirectional # if msg.host_nfc_secret is not None:
) # ctx.host_nfc_secret = msg.host_nfc_secret
if msg.wait_layout == DebugWaitType.IMMEDIATE: if msg.wait_layout == DebugWaitType.IMMEDIATE:
return _state( return _state(
thp_pairing_code_entry_code, thp_pairing_code_entry_code,
thp_pairing_code_qr_code, thp_pairing_code_qr_code,
thp_pairing_code_nfc_unidirectional, thp_pairing_code_nfc,
) )
assert DEBUG_CONTEXT is not None assert DEBUG_CONTEXT is not None
@ -324,7 +324,7 @@ if __debug__:
return _state( return _state(
thp_pairing_code_entry_code, thp_pairing_code_entry_code,
thp_pairing_code_qr_code, thp_pairing_code_qr_code,
thp_pairing_code_nfc_unidirectional, thp_pairing_code_nfc,
) )
async def dispatch_DebugLinkRecordScreen(msg: DebugLinkRecordScreen) -> Success: async def dispatch_DebugLinkRecordScreen(msg: DebugLinkRecordScreen) -> Success:

View File

@ -2,30 +2,33 @@ from typing import TYPE_CHECKING
from ubinascii import hexlify from ubinascii import hexlify
from trezor import loop, protobuf from trezor import loop, protobuf
from trezor.crypto import random
from trezor.crypto.hashlib import sha256 from trezor.crypto.hashlib import sha256
from trezor.enums import ThpMessageType, ThpPairingMethod from trezor.enums import ThpMessageType, ThpPairingMethod
from trezor.messages import ( from trezor.messages import (
Cancel, Cancel,
ThpCodeEntryChallenge, ThpCodeEntryChallenge,
ThpCodeEntryCommitment, ThpCodeEntryCommitment,
ThpCodeEntryCpaceHost, ThpCodeEntryCpaceHostTag,
ThpCodeEntryCpaceTrezor, ThpCodeEntryCpaceTrezor,
ThpCodeEntrySecret, ThpCodeEntrySecret,
ThpCodeEntryTag,
ThpCredentialMetadata, ThpCredentialMetadata,
ThpCredentialRequest, ThpCredentialRequest,
ThpCredentialResponse, ThpCredentialResponse,
ThpEndRequest, ThpEndRequest,
ThpEndResponse, ThpEndResponse,
ThpNfcUnidirectionalSecret, ThpNfcTagHost,
ThpNfcUnidirectionalTag, ThpNfcTagTrezor,
ThpPairingPreparationsFinished, ThpPairingPreparationsFinished,
ThpPairingRequest,
ThpQrCodeSecret, ThpQrCodeSecret,
ThpQrCodeTag, ThpQrCodeTag,
ThpStartPairingRequest, ThpSelectMethod,
) )
from trezor.wire import message_handler
from trezor.wire.context import UnexpectedMessageException
from trezor.wire.errors import ActionCancelled, SilentError, UnexpectedMessage from trezor.wire.errors import ActionCancelled, SilentError, UnexpectedMessage
from trezor.wire.thp import ChannelState, ThpError, crypto from trezor.wire.thp import ChannelState, ThpError, crypto, get_enabled_pairing_methods
from trezor.wire.thp.pairing_context import PairingContext from trezor.wire.thp.pairing_context import PairingContext
from .credential_manager import issue_credential from .credential_manager import issue_credential
@ -64,12 +67,13 @@ def check_state_and_log(
return decorator return decorator
def check_method_is_allowed( def check_method_is_allowed_and_selected(
pairing_method: ThpPairingMethod, pairing_method: ThpPairingMethod,
) -> Callable[[FuncWithContext], FuncWithContext]: ) -> Callable[[FuncWithContext], FuncWithContext]:
def decorator(f: FuncWithContext) -> FuncWithContext: def decorator(f: FuncWithContext) -> FuncWithContext:
def inner(context: PairingContext, *args: P.args, **kwargs: P.kwargs) -> object: def inner(context: PairingContext, *args: P.args, **kwargs: P.kwargs) -> object:
_check_method_is_allowed(context, pairing_method) _check_method_is_allowed(context, pairing_method)
_check_method_is_selected(context, pairing_method)
return f(context, *args, **kwargs) return f(context, *args, **kwargs)
return inner return inner
@ -81,31 +85,58 @@ def check_method_is_allowed(
# Pairing handlers # Pairing handlers
@check_state_and_log(ChannelState.TP1) @check_state_and_log(ChannelState.TP0)
async def handle_pairing_request( async def handle_pairing_request(
ctx: PairingContext, message: protobuf.MessageType ctx: PairingContext, message: protobuf.MessageType
) -> ThpEndResponse: ) -> ThpEndResponse:
if not ThpStartPairingRequest.is_type_of(message): if not ThpPairingRequest.is_type_of(message):
raise UnexpectedMessage("Unexpected message") raise UnexpectedMessage("Unexpected message")
ctx.host_name = message.host_name or "" ctx.host_name = message.host_name or ""
skip_pairing = _is_method_included(ctx, ThpPairingMethod.NoMethod) await ctx.show_pairing_dialogue()
if skip_pairing: assert ThpSelectMethod.MESSAGE_WIRE_TYPE is not None
return await _end_pairing(ctx) select_method_msg = await ctx.read(
[
await _prepare_pairing(ctx) ThpSelectMethod.MESSAGE_WIRE_TYPE,
await ctx.write(ThpPairingPreparationsFinished()) ]
ctx.channel_ctx.set_channel_state(ChannelState.TP3)
response = await show_display_data(
ctx, _get_possible_pairing_methods_and_cancel(ctx)
) )
if Cancel.is_type_of(response): assert ThpSelectMethod.is_type_of(select_method_msg)
ctx.channel_ctx.clear() assert select_method_msg.selected_pairing_method is not None
raise SilentError("Action was cancelled by the Host")
# TODO disable NFC (if enabled) ctx.set_selected_method(select_method_msg.selected_pairing_method)
if ctx.selected_method == ThpPairingMethod.SkipPairing:
return await _end_pairing(ctx)
while True:
await _prepare_pairing(ctx)
ctx.channel_ctx.set_channel_state(ChannelState.TP3)
try:
response = await ctx.show_pairing_method_screen()
except UnexpectedMessageException as e:
raw_response = e.msg
name = message_handler.get_msg_name(raw_response.type)
if name is None:
req_type = protobuf.type_for_wire(raw_response.type)
else:
req_type = protobuf.type_for_name(name)
response = message_handler.wrap_protobuf_load(raw_response.data, req_type)
if Cancel.is_type_of(response):
ctx.channel_ctx.clear()
raise SilentError("Action was cancelled by the Host")
if ThpSelectMethod.is_type_of(response):
assert response.selected_pairing_method is not None
ctx.set_selected_method(response.selected_pairing_method)
ctx.channel_ctx.set_channel_state(ChannelState.TP1)
else:
break
response = await _handle_different_pairing_methods(ctx, response) response = await _handle_different_pairing_methods(ctx, response)
while ThpCredentialRequest.is_type_of(response): while ThpCredentialRequest.is_type_of(response):
@ -115,15 +146,16 @@ async def handle_pairing_request(
async def _prepare_pairing(ctx: PairingContext) -> None: async def _prepare_pairing(ctx: PairingContext) -> None:
ctx.channel_ctx.set_channel_state(ChannelState.TP1)
if _is_method_included(ctx, ThpPairingMethod.CodeEntry): if ctx.selected_method == ThpPairingMethod.CodeEntry:
await _handle_code_entry_is_included(ctx) await _handle_code_entry_is_selected(ctx)
elif ctx.selected_method == ThpPairingMethod.NFC:
if _is_method_included(ctx, ThpPairingMethod.QrCode): await _handle_nfc_is_selected(ctx)
_handle_qr_code_is_included(ctx) elif ctx.selected_method == ThpPairingMethod.QrCode:
await _handle_qr_code_is_selected(ctx)
if _is_method_included(ctx, ThpPairingMethod.NFC_Unidirectional): else:
_handle_nfc_unidirectional_is_included(ctx) raise Exception() # TODO unknown pairing method
async def show_display_data( async def show_display_data(
@ -143,10 +175,20 @@ async def show_display_data(
@check_state_and_log(ChannelState.TP1) @check_state_and_log(ChannelState.TP1)
async def _handle_code_entry_is_included(ctx: PairingContext) -> None: async def _handle_code_entry_is_selected(ctx: PairingContext) -> None:
commitment = sha256(ctx.secret).digest() if ctx.code_entry_secret is None:
await _handle_code_entry_is_selected_first_time(ctx)
else:
await ctx.write_force(ThpPairingPreparationsFinished())
challenge_message = await ctx.call( # noqa: F841
async def _handle_code_entry_is_selected_first_time(ctx: PairingContext) -> None:
from trezor.wire.thp.cpace import Cpace
ctx.code_entry_secret = random.bytes(16)
commitment = sha256(ctx.code_entry_secret).digest()
challenge_message = await ctx.call(
ThpCodeEntryCommitment(commitment=commitment), ThpCodeEntryChallenge ThpCodeEntryCommitment(commitment=commitment), ThpCodeEntryChallenge
) )
ctx.channel_ctx.set_channel_state(ChannelState.TP2) ctx.channel_ctx.set_channel_state(ChannelState.TP2)
@ -157,84 +199,76 @@ async def _handle_code_entry_is_included(ctx: PairingContext) -> None:
if challenge_message.challenge is None: if challenge_message.challenge is None:
raise Exception("Invalid message") raise Exception("Invalid message")
sha_ctx = sha256(ctx.channel_ctx.get_handshake_hash()) sha_ctx = sha256(ctx.channel_ctx.get_handshake_hash())
sha_ctx.update(ctx.secret) sha_ctx.update(ctx.code_entry_secret)
sha_ctx.update(challenge_message.challenge) sha_ctx.update(challenge_message.challenge)
sha_ctx.update(bytes("PairingMethod_CodeEntry", "utf-8")) sha_ctx.update(bytes("PairingMethod_CodeEntry", "utf-8"))
code_code_entry_hash = sha_ctx.digest() code_code_entry_hash = sha_ctx.digest()
ctx.display_data.code_code_entry = ( ctx.display_data.code_code_entry = (
int.from_bytes(code_code_entry_hash, "big") % 1000000 int.from_bytes(code_code_entry_hash, "big") % 1000000
) )
@check_state_and_log(ChannelState.TP1, ChannelState.TP2)
def _handle_qr_code_is_included(ctx: PairingContext) -> None:
sha_ctx = sha256(ctx.channel_ctx.get_handshake_hash())
sha_ctx.update(ctx.secret)
sha_ctx.update(bytes("PairingMethod_QrCode", "utf-8"))
ctx.display_data.code_qr_code = sha_ctx.digest()[:16]
@check_state_and_log(ChannelState.TP1, ChannelState.TP2)
def _handle_nfc_unidirectional_is_included(ctx: PairingContext) -> None:
sha_ctx = sha256(ctx.channel_ctx.get_handshake_hash())
sha_ctx.update(ctx.secret)
sha_ctx.update(bytes("PairingMethod_NfcUnidirectional", "utf-8"))
ctx.display_data.code_nfc_unidirectional = sha_ctx.digest()[:16]
@check_state_and_log(ChannelState.TP3)
async def _handle_different_pairing_methods(
ctx: PairingContext, response: protobuf.MessageType
) -> protobuf.MessageType:
if ThpCodeEntryCpaceHost.is_type_of(response):
return await _handle_code_entry_cpace(ctx, response)
if ThpQrCodeTag.is_type_of(response):
return await _handle_qr_code_tag(ctx, response)
if ThpNfcUnidirectionalTag.is_type_of(response):
return await _handle_nfc_unidirectional_tag(ctx, response)
raise UnexpectedMessage("Unexpected message")
@check_state_and_log(ChannelState.TP3)
@check_method_is_allowed(ThpPairingMethod.CodeEntry)
async def _handle_code_entry_cpace(
ctx: PairingContext, message: protobuf.MessageType
) -> protobuf.MessageType:
from trezor.wire.thp.cpace import Cpace
# TODO check that ThpCodeEntryCpaceHost message is valid
if TYPE_CHECKING:
assert isinstance(message, ThpCodeEntryCpaceHost)
if message.cpace_host_public_key is None:
raise ThpError("Message ThpCodeEntryCpaceHost has no public key")
ctx.cpace = Cpace( ctx.cpace = Cpace(
message.cpace_host_public_key,
ctx.channel_ctx.get_handshake_hash(), ctx.channel_ctx.get_handshake_hash(),
) )
assert ctx.display_data.code_code_entry is not None assert ctx.display_data.code_code_entry is not None
ctx.cpace.generate_keys_and_secret( ctx.cpace.generate_keys_and_secret(
ctx.display_data.code_code_entry.to_bytes(6, "big") ctx.display_data.code_code_entry.to_bytes(6, "big")
) )
await ctx.write_force(
ctx.channel_ctx.set_channel_state(ChannelState.TP4) ThpCodeEntryCpaceTrezor(cpace_trezor_public_key=ctx.cpace.trezor_public_key)
response = await ctx.call(
ThpCodeEntryCpaceTrezor(cpace_trezor_public_key=ctx.cpace.trezor_public_key),
ThpCodeEntryTag,
) )
return await _handle_code_entry_tag(ctx, response)
@check_state_and_log(ChannelState.TP4) @check_state_and_log(ChannelState.TP1)
@check_method_is_allowed(ThpPairingMethod.CodeEntry) async def _handle_nfc_is_selected(ctx: PairingContext) -> None:
async def _handle_code_entry_tag( ctx.nfc_secret = random.bytes(16)
sha_ctx = sha256(ctx.channel_ctx.get_handshake_hash())
sha_ctx.update(ctx.nfc_secret)
sha_ctx.update(bytes("PairingMethod_NfcUnidirectional", "utf-8"))
ctx.display_data.code_nfc = sha_ctx.digest()[:16]
await ctx.write_force(ThpPairingPreparationsFinished())
@check_state_and_log(ChannelState.TP1)
async def _handle_qr_code_is_selected(ctx: PairingContext) -> None:
ctx.qr_code_secret = random.bytes(16)
sha_ctx = sha256(ThpPairingMethod.QrCode.to_bytes(1, "big"))
sha_ctx.update(ctx.channel_ctx.get_handshake_hash())
sha_ctx.update(ctx.qr_code_secret)
ctx.display_data.code_qr_code = sha_ctx.digest()[:16]
await ctx.write_force(ThpPairingPreparationsFinished())
@check_state_and_log(ChannelState.TP3)
async def _handle_different_pairing_methods(
ctx: PairingContext, response: protobuf.MessageType
) -> protobuf.MessageType:
if ThpCodeEntryCpaceHostTag.is_type_of(response):
return await _handle_code_entry_cpace(ctx, response)
if ThpQrCodeTag.is_type_of(response):
return await _handle_qr_code_tag(ctx, response)
if ThpNfcTagHost.is_type_of(response):
return await _handle_nfc_tag(ctx, response)
raise UnexpectedMessage("Unexpected message" + str(response))
@check_state_and_log(ChannelState.TP3)
@check_method_is_allowed_and_selected(ThpPairingMethod.CodeEntry)
async def _handle_code_entry_cpace(
ctx: PairingContext, message: protobuf.MessageType ctx: PairingContext, message: protobuf.MessageType
) -> protobuf.MessageType: ) -> protobuf.MessageType:
if TYPE_CHECKING: if TYPE_CHECKING:
assert isinstance(message, ThpCodeEntryTag) assert ThpCodeEntryCpaceHostTag.is_type_of(message)
if message.cpace_host_public_key is None:
raise ThpError(
"Message ThpCodeEntryCpaceHostTag is missing cpace_host_public_key"
)
if message.tag is None:
raise ThpError("Message ThpCodeEntryCpaceHostTag is missing tag")
ctx.cpace.compute_shared_secret(message.cpace_host_public_key)
expected_tag = sha256(ctx.cpace.shared_secret).digest() expected_tag = sha256(ctx.cpace.shared_secret).digest()
if expected_tag != message.tag: if expected_tag != message.tag:
print( print(
@ -248,56 +282,69 @@ async def _handle_code_entry_tag(
return await _handle_secret_reveal( return await _handle_secret_reveal(
ctx, ctx,
msg=ThpCodeEntrySecret(secret=ctx.secret), msg=ThpCodeEntrySecret(secret=ctx.code_entry_secret),
) )
@check_state_and_log(ChannelState.TP3) @check_state_and_log(ChannelState.TP3)
@check_method_is_allowed(ThpPairingMethod.QrCode) @check_method_is_allowed_and_selected(ThpPairingMethod.QrCode)
async def _handle_qr_code_tag( async def _handle_qr_code_tag(
ctx: PairingContext, message: protobuf.MessageType ctx: PairingContext, message: protobuf.MessageType
) -> protobuf.MessageType: ) -> protobuf.MessageType:
if TYPE_CHECKING: if TYPE_CHECKING:
assert isinstance(message, ThpQrCodeTag) assert isinstance(message, ThpQrCodeTag)
assert ctx.display_data.code_qr_code is not None assert ctx.display_data.code_qr_code is not None
expected_tag = sha256(ctx.display_data.code_qr_code).digest() sha_ctx = sha256(ctx.channel_ctx.get_handshake_hash())
sha_ctx.update(ctx.display_data.code_qr_code)
expected_tag = sha_ctx.digest()
if expected_tag != message.tag: if expected_tag != message.tag:
print( print(
"expected qr code tag:", hexlify(expected_tag).decode() "expected qr code tag:", hexlify(expected_tag).decode()
) # TODO remove after testing ) # TODO remove after testing
print( print(
"expected code qr code tag:", "expected handshake hash:",
hexlify(ctx.channel_ctx.get_handshake_hash()).decode(),
) # TODO remove after testing
print(
"expected code qr code:",
hexlify(ctx.display_data.code_qr_code).decode(), hexlify(ctx.display_data.code_qr_code).decode(),
) # TODO remove after testing ) # TODO remove after testing
print( print(
"expected secret:", hexlify(ctx.secret).decode() "expected secret:", hexlify(ctx.qr_code_secret).decode()
) # TODO remove after testing ) # TODO remove after testing
raise ThpError("Unexpected QR Code Tag") raise ThpError("Unexpected QR Code Tag")
return await _handle_secret_reveal( return await _handle_secret_reveal(
ctx, ctx,
msg=ThpQrCodeSecret(secret=ctx.secret), msg=ThpQrCodeSecret(secret=ctx.qr_code_secret),
) )
@check_state_and_log(ChannelState.TP3) @check_state_and_log(ChannelState.TP3)
@check_method_is_allowed(ThpPairingMethod.NFC_Unidirectional) @check_method_is_allowed_and_selected(ThpPairingMethod.NFC)
async def _handle_nfc_unidirectional_tag( async def _handle_nfc_tag(
ctx: PairingContext, message: protobuf.MessageType ctx: PairingContext, message: protobuf.MessageType
) -> protobuf.MessageType: ) -> protobuf.MessageType:
if TYPE_CHECKING: if TYPE_CHECKING:
assert isinstance(message, ThpNfcUnidirectionalTag) assert isinstance(message, ThpNfcTagHost)
sha_ctx = sha256(ThpPairingMethod.NFC.to_bytes(1, "big"))
expected_tag = sha256(ctx.display_data.code_nfc_unidirectional).digest() sha_ctx.update(ctx.channel_ctx.get_handshake_hash())
sha_ctx.update(ctx.nfc_secret)
expected_tag = sha_ctx.digest()
if expected_tag != message.tag: if expected_tag != message.tag:
print( print(
"expected nfc tag:", hexlify(expected_tag).decode() "expected nfc tag:", hexlify(expected_tag).decode()
) # TODO remove after testing ) # TODO remove after testing
raise ThpError("Unexpected NFC Unidirectional Tag") raise ThpError("Unexpected NFC Unidirectional Tag")
sha_ctx = sha256(ThpPairingMethod.NFC.to_bytes(1, "big"))
sha_ctx.update(ctx.channel_ctx.get_handshake_hash())
# TODO add Host's secret from NFC message transferred over NFC
# sha_ctx.update(host's secret)
trezor_tag = sha_ctx.digest()
return await _handle_secret_reveal( return await _handle_secret_reveal(
ctx, ctx,
msg=ThpNfcUnidirectionalSecret(secret=ctx.secret), msg=ThpNfcTagTrezor(tag=trezor_tag),
) )
@ -318,7 +365,6 @@ async def _handle_secret_reveal(
async def _handle_credential_request( async def _handle_credential_request(
ctx: PairingContext, message: protobuf.MessageType ctx: PairingContext, message: protobuf.MessageType
) -> protobuf.MessageType: ) -> protobuf.MessageType:
ctx.secret
if not ThpCredentialRequest.is_type_of(message): if not ThpCredentialRequest.is_type_of(message):
raise UnexpectedMessage("Unexpected message") raise UnexpectedMessage("Unexpected message")
@ -362,42 +408,43 @@ def _check_state(ctx: PairingContext, *allowed_states: ChannelState) -> None:
def _check_method_is_allowed(ctx: PairingContext, method: ThpPairingMethod) -> None: def _check_method_is_allowed(ctx: PairingContext, method: ThpPairingMethod) -> None:
if not _is_method_included(ctx, method): if method not in get_enabled_pairing_methods(ctx.iface):
raise ThpError("Unexpected pairing method") raise ThpError("Unexpected pairing method")
def _is_method_included(ctx: PairingContext, method: ThpPairingMethod) -> bool: def _check_method_is_selected(ctx: PairingContext, method: ThpPairingMethod) -> None:
return method in ctx.channel_ctx.selected_pairing_methods if method is not ctx.selected_method:
raise ThpError("Not selected pairing method")
# #
# Helpers - getters # Helpers - getters
def _get_possible_pairing_methods_and_cancel(ctx: PairingContext) -> Tuple[int, ...]: def _get_accepted_messages(ctx: PairingContext) -> Tuple[int, ...]:
r = _get_possible_pairing_methods(ctx) r = _get_possible_pairing_methods(ctx)
mtype = Cancel.MESSAGE_WIRE_TYPE mtype = Cancel.MESSAGE_WIRE_TYPE
return r + ((mtype,) if mtype is not None else ()) r += (mtype,) if mtype is not None else ()
mtype = ThpSelectMethod.MESSAGE_WIRE_TYPE
r += (mtype,) if mtype is not None else ()
return r
def _get_possible_pairing_methods(ctx: PairingContext) -> Tuple[int, ...]: def _get_possible_pairing_methods(ctx: PairingContext) -> Tuple[int, ...]:
r = tuple( r = tuple(
_get_message_type_for_method(method) [
for method in ctx.channel_ctx.selected_pairing_methods _get_message_type_for_method(ctx.selected_method),
]
) )
if __debug__:
from trezor.messages import DebugLinkGetState
mtype = DebugLinkGetState.MESSAGE_WIRE_TYPE
return r + ((mtype,) if mtype is not None else ())
return r return r
def _get_message_type_for_method(method: int) -> int: def _get_message_type_for_method(method: int) -> int:
if method is ThpPairingMethod.CodeEntry: if method is ThpPairingMethod.CodeEntry:
return ThpMessageType.ThpCodeEntryCpaceHost return ThpMessageType.ThpCodeEntryCpaceHostTag
if method is ThpPairingMethod.NFC_Unidirectional: if method is ThpPairingMethod.NFC:
return ThpMessageType.ThpNfcUnidirectionalTag return ThpMessageType.ThpNfcTagHost
if method is ThpPairingMethod.QrCode: if method is ThpPairingMethod.QrCode:
return ThpMessageType.ThpQrCodeTag return ThpMessageType.ThpQrCodeTag
raise ValueError("Unexpected pairing method - no message type available") raise ValueError("Unexpected pairing method - no message type available")

View File

@ -3,8 +3,9 @@
# isort:skip_file # isort:skip_file
ThpCreateNewSession = 1000 ThpCreateNewSession = 1000
ThpNewSession = 1001 ThpPairingRequest = 1006
ThpStartPairingRequest = 1008 ThpPairingRequestApproved = 1007
ThpSelectMethod = 1008
ThpPairingPreparationsFinished = 1009 ThpPairingPreparationsFinished = 1009
ThpCredentialRequest = 1010 ThpCredentialRequest = 1010
ThpCredentialResponse = 1011 ThpCredentialResponse = 1011
@ -12,11 +13,10 @@ ThpEndRequest = 1012
ThpEndResponse = 1013 ThpEndResponse = 1013
ThpCodeEntryCommitment = 1016 ThpCodeEntryCommitment = 1016
ThpCodeEntryChallenge = 1017 ThpCodeEntryChallenge = 1017
ThpCodeEntryCpaceHost = 1018 ThpCodeEntryCpaceTrezor = 1018
ThpCodeEntryCpaceTrezor = 1019 ThpCodeEntryCpaceHostTag = 1019
ThpCodeEntryTag = 1020 ThpCodeEntrySecret = 1020
ThpCodeEntrySecret = 1021
ThpQrCodeTag = 1024 ThpQrCodeTag = 1024
ThpQrCodeSecret = 1025 ThpQrCodeSecret = 1025
ThpNfcUnidirectionalTag = 1032 ThpNfcTagHost = 1032
ThpNfcUnidirectionalSecret = 1033 ThpNfcTagTrezor = 1033

View File

@ -2,7 +2,7 @@
# fmt: off # fmt: off
# isort:skip_file # isort:skip_file
NoMethod = 1 SkipPairing = 1
CodeEntry = 2 CodeEntry = 2
QrCode = 3 QrCode = 3
NFC_Unidirectional = 4 NFC = 4

View File

@ -353,8 +353,9 @@ if TYPE_CHECKING:
class ThpMessageType(IntEnum): class ThpMessageType(IntEnum):
ThpCreateNewSession = 1000 ThpCreateNewSession = 1000
ThpNewSession = 1001 ThpPairingRequest = 1006
ThpStartPairingRequest = 1008 ThpPairingRequestApproved = 1007
ThpSelectMethod = 1008
ThpPairingPreparationsFinished = 1009 ThpPairingPreparationsFinished = 1009
ThpCredentialRequest = 1010 ThpCredentialRequest = 1010
ThpCredentialResponse = 1011 ThpCredentialResponse = 1011
@ -362,20 +363,19 @@ if TYPE_CHECKING:
ThpEndResponse = 1013 ThpEndResponse = 1013
ThpCodeEntryCommitment = 1016 ThpCodeEntryCommitment = 1016
ThpCodeEntryChallenge = 1017 ThpCodeEntryChallenge = 1017
ThpCodeEntryCpaceHost = 1018 ThpCodeEntryCpaceTrezor = 1018
ThpCodeEntryCpaceTrezor = 1019 ThpCodeEntryCpaceHostTag = 1019
ThpCodeEntryTag = 1020 ThpCodeEntrySecret = 1020
ThpCodeEntrySecret = 1021
ThpQrCodeTag = 1024 ThpQrCodeTag = 1024
ThpQrCodeSecret = 1025 ThpQrCodeSecret = 1025
ThpNfcUnidirectionalTag = 1032 ThpNfcTagHost = 1032
ThpNfcUnidirectionalSecret = 1033 ThpNfcTagTrezor = 1033
class ThpPairingMethod(IntEnum): class ThpPairingMethod(IntEnum):
NoMethod = 1 SkipPairing = 1
CodeEntry = 2 CodeEntry = 2
QrCode = 3 QrCode = 3
NFC_Unidirectional = 4 NFC = 4
class MessageType(IntEnum): class MessageType(IntEnum):
Initialize = 0 Initialize = 0

View File

@ -2930,7 +2930,7 @@ if TYPE_CHECKING:
tokens: "list[str]" tokens: "list[str]"
thp_pairing_code_entry_code: "int | None" thp_pairing_code_entry_code: "int | None"
thp_pairing_code_qr_code: "bytes | None" thp_pairing_code_qr_code: "bytes | None"
thp_pairing_code_nfc_unidirectional: "bytes | None" thp_pairing_code_nfc: "bytes | None"
def __init__( def __init__(
self, self,
@ -2950,7 +2950,7 @@ if TYPE_CHECKING:
mnemonic_type: "BackupType | None" = None, mnemonic_type: "BackupType | None" = None,
thp_pairing_code_entry_code: "int | None" = None, thp_pairing_code_entry_code: "int | None" = None,
thp_pairing_code_qr_code: "bytes | None" = None, thp_pairing_code_qr_code: "bytes | None" = None,
thp_pairing_code_nfc_unidirectional: "bytes | None" = None, thp_pairing_code_nfc: "bytes | None" = None,
) -> None: ) -> None:
pass pass
@ -6175,8 +6175,8 @@ if TYPE_CHECKING:
class ThpDeviceProperties(protobuf.MessageType): class ThpDeviceProperties(protobuf.MessageType):
internal_model: "str | None" internal_model: "str | None"
model_variant: "int | None" model_variant: "int | None"
bootloader_mode: "bool | None" protocol_version_major: "int | None"
protocol_version: "int | None" protocol_version_minor: "int | None"
pairing_methods: "list[ThpPairingMethod]" pairing_methods: "list[ThpPairingMethod]"
def __init__( def __init__(
@ -6185,8 +6185,8 @@ if TYPE_CHECKING:
pairing_methods: "list[ThpPairingMethod] | None" = None, pairing_methods: "list[ThpPairingMethod] | None" = None,
internal_model: "str | None" = None, internal_model: "str | None" = None,
model_variant: "int | None" = None, model_variant: "int | None" = None,
bootloader_mode: "bool | None" = None, protocol_version_major: "int | None" = None,
protocol_version: "int | None" = None, protocol_version_minor: "int | None" = None,
) -> None: ) -> None:
pass pass
@ -6196,12 +6196,10 @@ if TYPE_CHECKING:
class ThpHandshakeCompletionReqNoisePayload(protobuf.MessageType): class ThpHandshakeCompletionReqNoisePayload(protobuf.MessageType):
host_pairing_credential: "bytes | None" host_pairing_credential: "bytes | None"
pairing_methods: "list[ThpPairingMethod]"
def __init__( def __init__(
self, self,
*, *,
pairing_methods: "list[ThpPairingMethod] | None" = None,
host_pairing_credential: "bytes | None" = None, host_pairing_credential: "bytes | None" = None,
) -> None: ) -> None:
pass pass
@ -6228,21 +6226,7 @@ if TYPE_CHECKING:
def is_type_of(cls, msg: Any) -> TypeGuard["ThpCreateNewSession"]: def is_type_of(cls, msg: Any) -> TypeGuard["ThpCreateNewSession"]:
return isinstance(msg, cls) return isinstance(msg, cls)
class ThpNewSession(protobuf.MessageType): class ThpPairingRequest(protobuf.MessageType):
new_session_id: "int | None"
def __init__(
self,
*,
new_session_id: "int | None" = None,
) -> None:
pass
@classmethod
def is_type_of(cls, msg: Any) -> TypeGuard["ThpNewSession"]:
return isinstance(msg, cls)
class ThpStartPairingRequest(protobuf.MessageType):
host_name: "str | None" host_name: "str | None"
def __init__( def __init__(
@ -6253,7 +6237,27 @@ if TYPE_CHECKING:
pass pass
@classmethod @classmethod
def is_type_of(cls, msg: Any) -> TypeGuard["ThpStartPairingRequest"]: def is_type_of(cls, msg: Any) -> TypeGuard["ThpPairingRequest"]:
return isinstance(msg, cls)
class ThpPairingRequestApproved(protobuf.MessageType):
@classmethod
def is_type_of(cls, msg: Any) -> TypeGuard["ThpPairingRequestApproved"]:
return isinstance(msg, cls)
class ThpSelectMethod(protobuf.MessageType):
selected_pairing_method: "ThpPairingMethod"
def __init__(
self,
*,
selected_pairing_method: "ThpPairingMethod | None" = None,
) -> None:
pass
@classmethod
def is_type_of(cls, msg: Any) -> TypeGuard["ThpSelectMethod"]:
return isinstance(msg, cls) return isinstance(msg, cls)
class ThpPairingPreparationsFinished(protobuf.MessageType): class ThpPairingPreparationsFinished(protobuf.MessageType):
@ -6290,20 +6294,6 @@ if TYPE_CHECKING:
def is_type_of(cls, msg: Any) -> TypeGuard["ThpCodeEntryChallenge"]: def is_type_of(cls, msg: Any) -> TypeGuard["ThpCodeEntryChallenge"]:
return isinstance(msg, cls) return isinstance(msg, cls)
class ThpCodeEntryCpaceHost(protobuf.MessageType):
cpace_host_public_key: "bytes | None"
def __init__(
self,
*,
cpace_host_public_key: "bytes | None" = None,
) -> None:
pass
@classmethod
def is_type_of(cls, msg: Any) -> TypeGuard["ThpCodeEntryCpaceHost"]:
return isinstance(msg, cls)
class ThpCodeEntryCpaceTrezor(protobuf.MessageType): class ThpCodeEntryCpaceTrezor(protobuf.MessageType):
cpace_trezor_public_key: "bytes | None" cpace_trezor_public_key: "bytes | None"
@ -6318,18 +6308,20 @@ if TYPE_CHECKING:
def is_type_of(cls, msg: Any) -> TypeGuard["ThpCodeEntryCpaceTrezor"]: def is_type_of(cls, msg: Any) -> TypeGuard["ThpCodeEntryCpaceTrezor"]:
return isinstance(msg, cls) return isinstance(msg, cls)
class ThpCodeEntryTag(protobuf.MessageType): class ThpCodeEntryCpaceHostTag(protobuf.MessageType):
cpace_host_public_key: "bytes | None"
tag: "bytes | None" tag: "bytes | None"
def __init__( def __init__(
self, self,
*, *,
cpace_host_public_key: "bytes | None" = None,
tag: "bytes | None" = None, tag: "bytes | None" = None,
) -> None: ) -> None:
pass pass
@classmethod @classmethod
def is_type_of(cls, msg: Any) -> TypeGuard["ThpCodeEntryTag"]: def is_type_of(cls, msg: Any) -> TypeGuard["ThpCodeEntryCpaceHostTag"]:
return isinstance(msg, cls) return isinstance(msg, cls)
class ThpCodeEntrySecret(protobuf.MessageType): class ThpCodeEntrySecret(protobuf.MessageType):
@ -6374,7 +6366,7 @@ if TYPE_CHECKING:
def is_type_of(cls, msg: Any) -> TypeGuard["ThpQrCodeSecret"]: def is_type_of(cls, msg: Any) -> TypeGuard["ThpQrCodeSecret"]:
return isinstance(msg, cls) return isinstance(msg, cls)
class ThpNfcUnidirectionalTag(protobuf.MessageType): class ThpNfcTagHost(protobuf.MessageType):
tag: "bytes | None" tag: "bytes | None"
def __init__( def __init__(
@ -6385,30 +6377,32 @@ if TYPE_CHECKING:
pass pass
@classmethod @classmethod
def is_type_of(cls, msg: Any) -> TypeGuard["ThpNfcUnidirectionalTag"]: def is_type_of(cls, msg: Any) -> TypeGuard["ThpNfcTagHost"]:
return isinstance(msg, cls) return isinstance(msg, cls)
class ThpNfcUnidirectionalSecret(protobuf.MessageType): class ThpNfcTagTrezor(protobuf.MessageType):
secret: "bytes | None" tag: "bytes | None"
def __init__( def __init__(
self, self,
*, *,
secret: "bytes | None" = None, tag: "bytes | None" = None,
) -> None: ) -> None:
pass pass
@classmethod @classmethod
def is_type_of(cls, msg: Any) -> TypeGuard["ThpNfcUnidirectionalSecret"]: def is_type_of(cls, msg: Any) -> TypeGuard["ThpNfcTagTrezor"]:
return isinstance(msg, cls) return isinstance(msg, cls)
class ThpCredentialRequest(protobuf.MessageType): class ThpCredentialRequest(protobuf.MessageType):
host_static_pubkey: "bytes | None" host_static_pubkey: "bytes | None"
autoconnect: "bool | None"
def __init__( def __init__(
self, self,
*, *,
host_static_pubkey: "bytes | None" = None, host_static_pubkey: "bytes | None" = None,
autoconnect: "bool | None" = None,
) -> None: ) -> None:
pass pass
@ -6446,11 +6440,13 @@ if TYPE_CHECKING:
class ThpCredentialMetadata(protobuf.MessageType): class ThpCredentialMetadata(protobuf.MessageType):
host_name: "str | None" host_name: "str | None"
autoconnect: "bool | None"
def __init__( def __init__(
self, self,
*, *,
host_name: "str | None" = None, host_name: "str | None" = None,
autoconnect: "bool | None" = None,
) -> None: ) -> None:
pass pass

View File

@ -66,13 +66,14 @@ class ChannelState(IntEnum):
UNALLOCATED = 0 UNALLOCATED = 0
TH1 = 1 TH1 = 1
TH2 = 2 TH2 = 2
TP1 = 3 TP0 = 3
TP2 = 4 TP1 = 4
TP3 = 5 TP2 = 5
TP4 = 6 TP3 = 6
TC1 = 7 TP4 = 7
ENCRYPTED_TRANSPORT = 8 TC1 = 8
INVALIDATED = 9 ENCRYPTED_TRANSPORT = 9
INVALIDATED = 10
class SessionState(IntEnum): class SessionState(IntEnum):
@ -134,7 +135,7 @@ class PacketHeader:
_DEFAULT_ENABLED_PAIRING_METHODS = [ _DEFAULT_ENABLED_PAIRING_METHODS = [
ThpPairingMethod.CodeEntry, ThpPairingMethod.CodeEntry,
ThpPairingMethod.QrCode, ThpPairingMethod.QrCode,
ThpPairingMethod.NFC_Unidirectional, ThpPairingMethod.NFC,
] ]
@ -149,7 +150,7 @@ def get_enabled_pairing_methods(
methods = _DEFAULT_ENABLED_PAIRING_METHODS.copy() methods = _DEFAULT_ENABLED_PAIRING_METHODS.copy()
if iface is not None and iface is usb.iface_wire: if iface is not None and iface is usb.iface_wire:
methods.append(ThpPairingMethod.NoMethod) methods.append(ThpPairingMethod.SkipPairing)
return methods return methods
@ -159,8 +160,8 @@ def _get_device_properties(iface: WireInterface) -> ThpDeviceProperties:
pairing_methods=get_enabled_pairing_methods(iface), pairing_methods=get_enabled_pairing_methods(iface),
internal_model=utils.INTERNAL_MODEL, internal_model=utils.INTERNAL_MODEL,
model_variant=0, model_variant=0,
bootloader_mode=False, protocol_version_major=2,
protocol_version=2, protocol_version_minor=0,
) )

View File

@ -69,7 +69,6 @@ class Channel:
self.bytes_read: int = 0 self.bytes_read: int = 0
self.expected_payload_length: int = 0 self.expected_payload_length: int = 0
self.is_cont_packet_expected: bool = False self.is_cont_packet_expected: bool = False
self.selected_pairing_methods = []
self.sessions: dict[int, GenericSessionContext] = {} self.sessions: dict[int, GenericSessionContext] = {}
# Objects for writing a message to a wire # Objects for writing a message to a wire

View File

@ -11,9 +11,8 @@ class Cpace:
CPace, a balanced composable PAKE: https://datatracker.ietf.org/doc/draft-irtf-cfrg-cpace/ CPace, a balanced composable PAKE: https://datatracker.ietf.org/doc/draft-irtf-cfrg-cpace/
""" """
def __init__(self, cpace_host_public_key: bytes, handshake_hash: bytes) -> None: def __init__(self, handshake_hash: bytes) -> None:
self.handshake_hash: bytes = handshake_hash self.handshake_hash: bytes = handshake_hash
self.host_public_key: bytes = cpace_host_public_key
self.shared_secret: bytes self.shared_secret: bytes
self.trezor_private_key: bytes self.trezor_private_key: bytes
self.trezor_public_key: bytes self.trezor_public_key: bytes
@ -31,6 +30,8 @@ class Cpace:
generator = elligator2.map_to_curve25519(pregenerator) generator = elligator2.map_to_curve25519(pregenerator)
self.trezor_private_key = random.bytes(32) self.trezor_private_key = random.bytes(32)
self.trezor_public_key = curve25519.multiply(self.trezor_private_key, generator) self.trezor_public_key = curve25519.multiply(self.trezor_private_key, generator)
def compute_shared_secret(self, host_public_key: bytes) -> None:
self.shared_secret = curve25519.multiply( self.shared_secret = curve25519.multiply(
self.trezor_private_key, self.host_public_key self.trezor_private_key, host_public_key
) )

View File

@ -3,16 +3,19 @@ from ubinascii import hexlify
import trezorui_api import trezorui_api
from trezor import loop, protobuf, workflow from trezor import loop, protobuf, workflow
from trezor.crypto import random from trezor.enums import ButtonRequestType
from trezor.wire import context, message_handler, protocol_common from trezor.wire import context, message_handler, protocol_common
from trezor.wire.context import UnexpectedMessageException from trezor.wire.context import UnexpectedMessageException
from trezor.wire.errors import ActionCancelled, SilentError from trezor.wire.errors import ActionCancelled, SilentError
from trezor.wire.protocol_common import Context, Message from trezor.wire.protocol_common import Context, Message
from trezor.wire.thp import get_enabled_pairing_methods
if TYPE_CHECKING: if TYPE_CHECKING:
from typing import Container from typing import Awaitable, Container
from trezor import ui from trezor import ui
from trezor.enums import ThpPairingMethod
from trezorui_api import UiResult
from .channel import Channel from .channel import Channel
from .cpace import Cpace from .cpace import Cpace
@ -28,7 +31,7 @@ class PairingDisplayData:
def __init__(self) -> None: def __init__(self) -> None:
self.code_code_entry: int | None = None self.code_code_entry: int | None = None
self.code_qr_code: bytes | None = None self.code_qr_code: bytes | None = None
self.code_nfc_unidirectional: bytes | None = None self.code_nfc: bytes | None = None
def get_display_layout(self) -> ui.Layout: def get_display_layout(self) -> ui.Layout:
from trezor import ui from trezor import ui
@ -37,9 +40,9 @@ class PairingDisplayData:
qr_str = "" qr_str = ""
code_str = "" code_str = ""
if self.code_qr_code is not None: if self.code_qr_code is not None:
qr_str = self._get_code_qr_code_str() qr_str = self.get_code_qr_code_str()
if self.code_code_entry is not None: if self.code_code_entry is not None:
code_str = self._get_code_code_entry_str() code_str = self.get_code_code_entry_str()
return ui.Layout( return ui.Layout(
trezorui_api.show_address_details( # noqa trezorui_api.show_address_details( # noqa
@ -53,7 +56,7 @@ class PairingDisplayData:
) )
) )
def _get_code_code_entry_str(self) -> str: def get_code_code_entry_str(self) -> str:
if self.code_code_entry is not None: if self.code_code_entry is not None:
code_str = f"{self.code_code_entry:06}" code_str = f"{self.code_code_entry:06}"
if __debug__: if __debug__:
@ -62,7 +65,7 @@ class PairingDisplayData:
return code_str[:3] + " " + code_str[3:] return code_str[:3] + " " + code_str[3:]
raise Exception("Code entry string is not available") raise Exception("Code entry string is not available")
def _get_code_qr_code_str(self) -> str: def get_code_qr_code_str(self) -> str:
if self.code_qr_code is not None: if self.code_qr_code is not None:
code_str = (hexlify(self.code_qr_code)).decode("utf-8") code_str = (hexlify(self.code_qr_code)).decode("utf-8")
if __debug__: if __debug__:
@ -77,20 +80,16 @@ class PairingContext(Context):
super().__init__(channel_ctx.iface, channel_ctx.channel_id) super().__init__(channel_ctx.iface, channel_ctx.channel_id)
self.channel_ctx: Channel = channel_ctx self.channel_ctx: Channel = channel_ctx
self.incoming_message = loop.mailbox() self.incoming_message = loop.mailbox()
self.secret: bytes = random.bytes(16) self.nfc_secret: bytes
self.qr_code_secret: bytes
self.code_entry_secret: bytes | None = None
self.selected_method: ThpPairingMethod
self.display_data: PairingDisplayData = PairingDisplayData() self.display_data: PairingDisplayData = PairingDisplayData()
self.cpace: Cpace self.cpace: Cpace
self.host_name: str self.host_name: str
async def handle(self, is_debug_session: bool = False) -> None: async def handle(self) -> None:
# if __debug__:
# log.debug(__name__, "handle - start")
# if is_debug_session:
# import apps.debug
# apps.debug.DEBUG_CONTEXT = self
next_message: Message | None = None next_message: Message | None = None
while True: while True:
@ -168,12 +167,17 @@ class PairingContext(Context):
async def write(self, msg: protobuf.MessageType) -> None: async def write(self, msg: protobuf.MessageType) -> None:
return await self.channel_ctx.write(msg) return await self.channel_ctx.write(msg)
def write_force(self, msg: protobuf.MessageType) -> Awaitable[None]:
return self.channel_ctx.write(msg, force=True)
async def call( async def call(
self, msg: protobuf.MessageType, expected_type: type[protobuf.MessageType] self, msg: protobuf.MessageType, expected_type: type[protobuf.MessageType]
) -> protobuf.MessageType: ) -> protobuf.MessageType:
expected_wire_type = message_handler.get_msg_type(expected_type.MESSAGE_NAME) expected_wire_type = expected_type.MESSAGE_WIRE_TYPE
if expected_wire_type is None: if expected_wire_type is None:
expected_wire_type = expected_type.MESSAGE_WIRE_TYPE expected_wire_type = message_handler.get_msg_type(
expected_type.MESSAGE_NAME
)
assert expected_wire_type is not None assert expected_wire_type is not None
@ -189,6 +193,71 @@ class PairingContext(Context):
del msg del msg
return await self.read(expected_types) return await self.read(expected_types)
def set_selected_method(self, selected_method: ThpPairingMethod) -> None:
if selected_method not in get_enabled_pairing_methods(self.iface):
raise Exception("Not allowed to set this method")
self.selected_method = selected_method
async def show_pairing_dialogue(self) -> None:
from trezor.messages import ThpPairingRequestApproved
from trezor.ui.layouts.common import interact
result = await interact(
trezorui_api.confirm_action(
title="Pairing dialogue",
action="Do you want to start pairing?",
description="Choose wisely!",
),
br_name="pairing_request",
br_code=ButtonRequestType.Other,
)
if result == trezorui_api.CONFIRMED:
await self.write(ThpPairingRequestApproved())
async def show_pairing_method_screen(
self, selected_method: ThpPairingMethod | None = None
) -> UiResult:
from trezor.enums import ThpPairingMethod
from trezor.ui.layouts.common import interact
if selected_method is None:
selected_method = self.selected_method
if selected_method is ThpPairingMethod.CodeEntry:
result = await interact(
trezorui_api.show_simple(
title="Copy the following",
text=self.display_data.get_code_code_entry_str(),
),
br_name="pairing_code_entry",
br_code=ButtonRequestType.Other,
)
elif selected_method is ThpPairingMethod.QrCode:
result = await interact(
trezorui_api.show_address_details( # noqa
qr_title="Scan QR code to pair",
address=self.display_data.get_code_qr_code_str(),
case_sensitive=True,
details_title="",
account="",
path="",
xpubs=[],
),
br_name="pairing_qr_code",
br_code=ButtonRequestType.Other,
)
elif selected_method is ThpPairingMethod.NFC:
result = await interact(
trezorui_api.show_simple(
title="NFC Pairing",
text="Move your device close to Trezor",
),
br_name="pairing_nfc",
br_code=ButtonRequestType.Other,
)
else:
raise Exception("Unknown pairing method")
return result
async def handle_pairing_request_message( async def handle_pairing_request_message(
pairing_ctx: PairingContext, pairing_ctx: PairingContext,

View File

@ -38,13 +38,7 @@ from . import (
ThpUnallocatedSessionError, ThpUnallocatedSessionError,
) )
from . import alternating_bit_protocol as ABP from . import alternating_bit_protocol as ABP
from . import ( from . import checksum, control_byte, get_encoded_device_properties, session_manager
checksum,
control_byte,
get_enabled_pairing_methods,
get_encoded_device_properties,
session_manager,
)
from .checksum import CHECKSUM_LENGTH from .checksum import CHECKSUM_LENGTH
from .crypto import PUBKEY_LENGTH, Handshake from .crypto import PUBKEY_LENGTH, Handshake
from .session_context import SeedlessSessionContext from .session_context import SeedlessSessionContext
@ -315,12 +309,7 @@ async def _handle_state_TH2(ctx: Channel, message_length: int, ctrl_byte: int) -
) )
if TYPE_CHECKING: if TYPE_CHECKING:
assert ThpHandshakeCompletionReqNoisePayload.is_type_of(noise_payload) assert ThpHandshakeCompletionReqNoisePayload.is_type_of(noise_payload)
enabled_methods = get_enabled_pairing_methods(ctx.iface)
for method in noise_payload.pairing_methods:
if method not in enabled_methods:
raise ThpInvalidDataError()
if method not in ctx.selected_pairing_methods:
ctx.selected_pairing_methods.append(method)
if __debug__ and utils.ALLOW_DEBUG_MESSAGES: if __debug__ and utils.ALLOW_DEBUG_MESSAGES:
log.debug( log.debug(
__name__, __name__,
@ -357,9 +346,9 @@ async def _handle_state_TH2(ctx: Channel, message_length: int, ctrl_byte: int) -
ctx.handshake = None ctx.handshake = None
if paired: if paired:
ctx.set_channel_state(ChannelState.ENCRYPTED_TRANSPORT) ctx.set_channel_state(ChannelState.TC1)
else: else:
ctx.set_channel_state(ChannelState.TP1) ctx.set_channel_state(ChannelState.TP0)
async def _handle_state_ENCRYPTED_TRANSPORT(ctx: Channel, message_length: int) -> None: async def _handle_state_ENCRYPTED_TRANSPORT(ctx: Channel, message_length: int) -> None:
@ -457,6 +446,7 @@ def _decode_message(
def _is_channel_state_pairing(state: int) -> bool: def _is_channel_state_pairing(state: int) -> bool:
if state in ( if state in (
ChannelState.TP0,
ChannelState.TP1, ChannelState.TP1,
ChannelState.TP2, ChannelState.TP2,
ChannelState.TP3, ChannelState.TP3,

View File

@ -27,7 +27,7 @@ if utils.USE_THP:
ThpCodeEntryTag, ThpCodeEntryTag,
ThpCredentialRequest, ThpCredentialRequest,
ThpEndRequest, ThpEndRequest,
ThpStartPairingRequest, ThpPairingRequest,
) )
from trezor.wire.thp import ( from trezor.wire.thp import (
ChannelState, ChannelState,
@ -280,13 +280,13 @@ class TestTrezorHostProtocol(unittest.TestCase):
config.wipe() config.wipe()
channel = next(iter(thp_main._CHANNELS.values())) channel = next(iter(thp_main._CHANNELS.values()))
channel.selected_pairing_methods = [ channel.selected_pairing_methods = [
ThpPairingMethod.NoMethod, ThpPairingMethod.SkipPairing,
ThpPairingMethod.CodeEntry, ThpPairingMethod.CodeEntry,
ThpPairingMethod.NFC_Unidirectional, ThpPairingMethod.NFC_Unidirectional,
ThpPairingMethod.QrCode, ThpPairingMethod.QrCode,
] ]
pairing_ctx = PairingContext(channel) pairing_ctx = PairingContext(channel)
request_message = ThpStartPairingRequest() request_message = ThpPairingRequest()
channel.set_channel_state(ChannelState.TP1) channel.set_channel_state(ChannelState.TP1)
gen = pairing.handle_pairing_request(pairing_ctx, request_message) gen = pairing.handle_pairing_request(pairing_ctx, request_message)
@ -310,7 +310,7 @@ class TestTrezorHostProtocol(unittest.TestCase):
ThpPairingMethod.QrCode, ThpPairingMethod.QrCode,
] ]
pairing_ctx = PairingContext(channel) pairing_ctx = PairingContext(channel)
request_message = ThpStartPairingRequest() request_message = ThpPairingRequest()
with self.assertRaises(UnexpectedMessage) as e: with self.assertRaises(UnexpectedMessage) as e:
pairing.handle_pairing_request(pairing_ctx, request_message) pairing.handle_pairing_request(pairing_ctx, request_message)
print(e.value.message) print(e.value.message)
@ -346,7 +346,7 @@ class TestTrezorHostProtocol(unittest.TestCase):
msg = ThpCodeEntryCpaceHost(cpace_host_public_key=cpace_host_public_key) msg = ThpCodeEntryCpaceHost(cpace_host_public_key=cpace_host_public_key)
# msg = ThpQrCodeTag(tag=tag_qrc) # msg = ThpQrCodeTag(tag=tag_qrc)
# msg = ThpNfcUnidirectionalTag(tag=tag_nfc) # msg = ThpNfcTagHost(tag=tag_nfc)
buffer: bytearray = bytearray(protobuf.encoded_length(msg)) buffer: bytearray = bytearray(protobuf.encoded_length(msg))
protobuf.encode(buffer, msg) protobuf.encode(buffer, msg)

View File

@ -102,7 +102,7 @@ class TrezorClient:
if protobuf_mapping is None: if protobuf_mapping is None:
protobuf_mapping = mapping.DEFAULT_MAPPING protobuf_mapping = mapping.DEFAULT_MAPPING
protocol_v1 = ProtocolV1(transport, protobuf_mapping) protocol_v1 = ProtocolV1(transport, protobuf_mapping)
if channel_data.protocol_version == 2: if channel_data.protocol_version_major == 2:
try: try:
protocol_v1.write(messages.Ping(message="Sanity check - to resume")) protocol_v1.write(messages.Ping(message="Sanity check - to resume"))
except Exception as e: except Exception as e:

View File

@ -406,8 +406,9 @@ class TezosBallotType(IntEnum):
class ThpMessageType(IntEnum): class ThpMessageType(IntEnum):
ThpCreateNewSession = 1000 ThpCreateNewSession = 1000
ThpNewSession = 1001 ThpPairingRequest = 1006
ThpStartPairingRequest = 1008 ThpPairingRequestApproved = 1007
ThpSelectMethod = 1008
ThpPairingPreparationsFinished = 1009 ThpPairingPreparationsFinished = 1009
ThpCredentialRequest = 1010 ThpCredentialRequest = 1010
ThpCredentialResponse = 1011 ThpCredentialResponse = 1011
@ -415,21 +416,20 @@ class ThpMessageType(IntEnum):
ThpEndResponse = 1013 ThpEndResponse = 1013
ThpCodeEntryCommitment = 1016 ThpCodeEntryCommitment = 1016
ThpCodeEntryChallenge = 1017 ThpCodeEntryChallenge = 1017
ThpCodeEntryCpaceHost = 1018 ThpCodeEntryCpaceTrezor = 1018
ThpCodeEntryCpaceTrezor = 1019 ThpCodeEntryCpaceHostTag = 1019
ThpCodeEntryTag = 1020 ThpCodeEntrySecret = 1020
ThpCodeEntrySecret = 1021
ThpQrCodeTag = 1024 ThpQrCodeTag = 1024
ThpQrCodeSecret = 1025 ThpQrCodeSecret = 1025
ThpNfcUnidirectionalTag = 1032 ThpNfcTagHost = 1032
ThpNfcUnidirectionalSecret = 1033 ThpNfcTagTrezor = 1033
class ThpPairingMethod(IntEnum): class ThpPairingMethod(IntEnum):
NoMethod = 1 SkipPairing = 1
CodeEntry = 2 CodeEntry = 2
QrCode = 3 QrCode = 3
NFC_Unidirectional = 4 NFC = 4
class MessageType(IntEnum): class MessageType(IntEnum):
@ -4203,7 +4203,7 @@ class DebugLinkState(protobuf.MessageType):
13: protobuf.Field("tokens", "string", repeated=True, required=False, default=None), 13: protobuf.Field("tokens", "string", repeated=True, required=False, default=None),
14: protobuf.Field("thp_pairing_code_entry_code", "uint32", repeated=False, required=False, default=None), 14: protobuf.Field("thp_pairing_code_entry_code", "uint32", repeated=False, required=False, default=None),
15: protobuf.Field("thp_pairing_code_qr_code", "bytes", repeated=False, required=False, default=None), 15: protobuf.Field("thp_pairing_code_qr_code", "bytes", repeated=False, required=False, default=None),
16: protobuf.Field("thp_pairing_code_nfc_unidirectional", "bytes", repeated=False, required=False, default=None), 16: protobuf.Field("thp_pairing_code_nfc", "bytes", repeated=False, required=False, default=None),
} }
def __init__( def __init__(
@ -4224,7 +4224,7 @@ class DebugLinkState(protobuf.MessageType):
mnemonic_type: Optional["BackupType"] = None, mnemonic_type: Optional["BackupType"] = None,
thp_pairing_code_entry_code: Optional["int"] = None, thp_pairing_code_entry_code: Optional["int"] = None,
thp_pairing_code_qr_code: Optional["bytes"] = None, thp_pairing_code_qr_code: Optional["bytes"] = None,
thp_pairing_code_nfc_unidirectional: Optional["bytes"] = None, thp_pairing_code_nfc: Optional["bytes"] = None,
) -> None: ) -> None:
self.tokens: Sequence["str"] = tokens if tokens is not None else [] self.tokens: Sequence["str"] = tokens if tokens is not None else []
self.layout = layout self.layout = layout
@ -4241,7 +4241,7 @@ class DebugLinkState(protobuf.MessageType):
self.mnemonic_type = mnemonic_type self.mnemonic_type = mnemonic_type
self.thp_pairing_code_entry_code = thp_pairing_code_entry_code self.thp_pairing_code_entry_code = thp_pairing_code_entry_code
self.thp_pairing_code_qr_code = thp_pairing_code_qr_code self.thp_pairing_code_qr_code = thp_pairing_code_qr_code
self.thp_pairing_code_nfc_unidirectional = thp_pairing_code_nfc_unidirectional self.thp_pairing_code_nfc = thp_pairing_code_nfc
class DebugLinkStop(protobuf.MessageType): class DebugLinkStop(protobuf.MessageType):
@ -7909,8 +7909,8 @@ class ThpDeviceProperties(protobuf.MessageType):
FIELDS = { FIELDS = {
1: protobuf.Field("internal_model", "string", repeated=False, required=False, default=None), 1: protobuf.Field("internal_model", "string", repeated=False, required=False, default=None),
2: protobuf.Field("model_variant", "uint32", repeated=False, required=False, default=None), 2: protobuf.Field("model_variant", "uint32", repeated=False, required=False, default=None),
3: protobuf.Field("bootloader_mode", "bool", repeated=False, required=False, default=None), 3: protobuf.Field("protocol_version_major", "uint32", repeated=False, required=False, default=None),
4: protobuf.Field("protocol_version", "uint32", repeated=False, required=False, default=None), 4: protobuf.Field("protocol_version_minor", "uint32", repeated=False, required=False, default=None),
5: protobuf.Field("pairing_methods", "ThpPairingMethod", repeated=True, required=False, default=None), 5: protobuf.Field("pairing_methods", "ThpPairingMethod", repeated=True, required=False, default=None),
} }
@ -7920,30 +7920,27 @@ class ThpDeviceProperties(protobuf.MessageType):
pairing_methods: Optional[Sequence["ThpPairingMethod"]] = None, pairing_methods: Optional[Sequence["ThpPairingMethod"]] = None,
internal_model: Optional["str"] = None, internal_model: Optional["str"] = None,
model_variant: Optional["int"] = None, model_variant: Optional["int"] = None,
bootloader_mode: Optional["bool"] = None, protocol_version_major: Optional["int"] = None,
protocol_version: Optional["int"] = None, protocol_version_minor: Optional["int"] = None,
) -> None: ) -> None:
self.pairing_methods: Sequence["ThpPairingMethod"] = pairing_methods if pairing_methods is not None else [] self.pairing_methods: Sequence["ThpPairingMethod"] = pairing_methods if pairing_methods is not None else []
self.internal_model = internal_model self.internal_model = internal_model
self.model_variant = model_variant self.model_variant = model_variant
self.bootloader_mode = bootloader_mode self.protocol_version_major = protocol_version_major
self.protocol_version = protocol_version self.protocol_version_minor = protocol_version_minor
class ThpHandshakeCompletionReqNoisePayload(protobuf.MessageType): class ThpHandshakeCompletionReqNoisePayload(protobuf.MessageType):
MESSAGE_WIRE_TYPE = None MESSAGE_WIRE_TYPE = None
FIELDS = { FIELDS = {
1: protobuf.Field("host_pairing_credential", "bytes", repeated=False, required=False, default=None), 1: protobuf.Field("host_pairing_credential", "bytes", repeated=False, required=False, default=None),
2: protobuf.Field("pairing_methods", "ThpPairingMethod", repeated=True, required=False, default=None),
} }
def __init__( def __init__(
self, self,
*, *,
pairing_methods: Optional[Sequence["ThpPairingMethod"]] = None,
host_pairing_credential: Optional["bytes"] = None, host_pairing_credential: Optional["bytes"] = None,
) -> None: ) -> None:
self.pairing_methods: Sequence["ThpPairingMethod"] = pairing_methods if pairing_methods is not None else []
self.host_pairing_credential = host_pairing_credential self.host_pairing_credential = host_pairing_credential
@ -7967,22 +7964,8 @@ class ThpCreateNewSession(protobuf.MessageType):
self.derive_cardano = derive_cardano self.derive_cardano = derive_cardano
class ThpNewSession(protobuf.MessageType): class ThpPairingRequest(protobuf.MessageType):
MESSAGE_WIRE_TYPE = 1001 MESSAGE_WIRE_TYPE = 1006
FIELDS = {
1: protobuf.Field("new_session_id", "uint32", repeated=False, required=False, default=None),
}
def __init__(
self,
*,
new_session_id: Optional["int"] = None,
) -> None:
self.new_session_id = new_session_id
class ThpStartPairingRequest(protobuf.MessageType):
MESSAGE_WIRE_TYPE = 1008
FIELDS = { FIELDS = {
1: protobuf.Field("host_name", "string", repeated=False, required=False, default=None), 1: protobuf.Field("host_name", "string", repeated=False, required=False, default=None),
} }
@ -7995,6 +7978,24 @@ class ThpStartPairingRequest(protobuf.MessageType):
self.host_name = host_name self.host_name = host_name
class ThpPairingRequestApproved(protobuf.MessageType):
MESSAGE_WIRE_TYPE = 1007
class ThpSelectMethod(protobuf.MessageType):
MESSAGE_WIRE_TYPE = 1008
FIELDS = {
1: protobuf.Field("selected_pairing_method", "ThpPairingMethod", repeated=False, required=False, default=ThpPairingMethod.NFC),
}
def __init__(
self,
*,
selected_pairing_method: Optional["ThpPairingMethod"] = ThpPairingMethod.NFC,
) -> None:
self.selected_pairing_method = selected_pairing_method
class ThpPairingPreparationsFinished(protobuf.MessageType): class ThpPairingPreparationsFinished(protobuf.MessageType):
MESSAGE_WIRE_TYPE = 1009 MESSAGE_WIRE_TYPE = 1009
@ -8027,22 +8028,8 @@ class ThpCodeEntryChallenge(protobuf.MessageType):
self.challenge = challenge self.challenge = challenge
class ThpCodeEntryCpaceHost(protobuf.MessageType):
MESSAGE_WIRE_TYPE = 1018
FIELDS = {
1: protobuf.Field("cpace_host_public_key", "bytes", repeated=False, required=False, default=None),
}
def __init__(
self,
*,
cpace_host_public_key: Optional["bytes"] = None,
) -> None:
self.cpace_host_public_key = cpace_host_public_key
class ThpCodeEntryCpaceTrezor(protobuf.MessageType): class ThpCodeEntryCpaceTrezor(protobuf.MessageType):
MESSAGE_WIRE_TYPE = 1019 MESSAGE_WIRE_TYPE = 1018
FIELDS = { FIELDS = {
1: protobuf.Field("cpace_trezor_public_key", "bytes", repeated=False, required=False, default=None), 1: protobuf.Field("cpace_trezor_public_key", "bytes", repeated=False, required=False, default=None),
} }
@ -8055,22 +8042,25 @@ class ThpCodeEntryCpaceTrezor(protobuf.MessageType):
self.cpace_trezor_public_key = cpace_trezor_public_key self.cpace_trezor_public_key = cpace_trezor_public_key
class ThpCodeEntryTag(protobuf.MessageType): class ThpCodeEntryCpaceHostTag(protobuf.MessageType):
MESSAGE_WIRE_TYPE = 1020 MESSAGE_WIRE_TYPE = 1019
FIELDS = { FIELDS = {
1: protobuf.Field("cpace_host_public_key", "bytes", repeated=False, required=False, default=None),
2: protobuf.Field("tag", "bytes", repeated=False, required=False, default=None), 2: protobuf.Field("tag", "bytes", repeated=False, required=False, default=None),
} }
def __init__( def __init__(
self, self,
*, *,
cpace_host_public_key: Optional["bytes"] = None,
tag: Optional["bytes"] = None, tag: Optional["bytes"] = None,
) -> None: ) -> None:
self.cpace_host_public_key = cpace_host_public_key
self.tag = tag self.tag = tag
class ThpCodeEntrySecret(protobuf.MessageType): class ThpCodeEntrySecret(protobuf.MessageType):
MESSAGE_WIRE_TYPE = 1021 MESSAGE_WIRE_TYPE = 1020
FIELDS = { FIELDS = {
1: protobuf.Field("secret", "bytes", repeated=False, required=False, default=None), 1: protobuf.Field("secret", "bytes", repeated=False, required=False, default=None),
} }
@ -8111,7 +8101,7 @@ class ThpQrCodeSecret(protobuf.MessageType):
self.secret = secret self.secret = secret
class ThpNfcUnidirectionalTag(protobuf.MessageType): class ThpNfcTagHost(protobuf.MessageType):
MESSAGE_WIRE_TYPE = 1032 MESSAGE_WIRE_TYPE = 1032
FIELDS = { FIELDS = {
1: protobuf.Field("tag", "bytes", repeated=False, required=False, default=None), 1: protobuf.Field("tag", "bytes", repeated=False, required=False, default=None),
@ -8125,32 +8115,35 @@ class ThpNfcUnidirectionalTag(protobuf.MessageType):
self.tag = tag self.tag = tag
class ThpNfcUnidirectionalSecret(protobuf.MessageType): class ThpNfcTagTrezor(protobuf.MessageType):
MESSAGE_WIRE_TYPE = 1033 MESSAGE_WIRE_TYPE = 1033
FIELDS = { FIELDS = {
1: protobuf.Field("secret", "bytes", repeated=False, required=False, default=None), 1: protobuf.Field("tag", "bytes", repeated=False, required=False, default=None),
} }
def __init__( def __init__(
self, self,
*, *,
secret: Optional["bytes"] = None, tag: Optional["bytes"] = None,
) -> None: ) -> None:
self.secret = secret self.tag = tag
class ThpCredentialRequest(protobuf.MessageType): class ThpCredentialRequest(protobuf.MessageType):
MESSAGE_WIRE_TYPE = 1010 MESSAGE_WIRE_TYPE = 1010
FIELDS = { FIELDS = {
1: protobuf.Field("host_static_pubkey", "bytes", repeated=False, required=False, default=None), 1: protobuf.Field("host_static_pubkey", "bytes", repeated=False, required=False, default=None),
2: protobuf.Field("autoconnect", "bool", repeated=False, required=False, default=None),
} }
def __init__( def __init__(
self, self,
*, *,
host_static_pubkey: Optional["bytes"] = None, host_static_pubkey: Optional["bytes"] = None,
autoconnect: Optional["bool"] = None,
) -> None: ) -> None:
self.host_static_pubkey = host_static_pubkey self.host_static_pubkey = host_static_pubkey
self.autoconnect = autoconnect
class ThpCredentialResponse(protobuf.MessageType): class ThpCredentialResponse(protobuf.MessageType):
@ -8182,14 +8175,17 @@ class ThpCredentialMetadata(protobuf.MessageType):
MESSAGE_WIRE_TYPE = None MESSAGE_WIRE_TYPE = None
FIELDS = { FIELDS = {
1: protobuf.Field("host_name", "string", repeated=False, required=False, default=None), 1: protobuf.Field("host_name", "string", repeated=False, required=False, default=None),
2: protobuf.Field("autoconnect", "bool", repeated=False, required=False, default=None),
} }
def __init__( def __init__(
self, self,
*, *,
host_name: Optional["str"] = None, host_name: Optional["str"] = None,
autoconnect: Optional["bool"] = None,
) -> None: ) -> None:
self.host_name = host_name self.host_name = host_name
self.autoconnect = autoconnect
class ThpPairingCredential(protobuf.MessageType): class ThpPairingCredential(protobuf.MessageType):

View File

@ -194,6 +194,8 @@ class SessionV2(Session):
return session return session
def __init__(self, client: TrezorClient, id: bytes) -> None: def __init__(self, client: TrezorClient, id: bytes) -> None:
from ..debuglink import TrezorClientDebugLink
super().__init__(client, id) super().__init__(client, id)
assert isinstance(client.protocol, ProtocolV2) assert isinstance(client.protocol, ProtocolV2)
@ -201,7 +203,10 @@ class SessionV2(Session):
self.button_callback = client.button_callback self.button_callback = client.button_callback
if self.button_callback is None: if self.button_callback is None:
self.button_callback = _callback_button self.button_callback = _callback_button
self.channel: ProtocolV2 = client.protocol.get_channel() helper_debug = None
if isinstance(client, TrezorClientDebugLink):
helper_debug = client.debug
self.channel: ProtocolV2 = client.protocol.get_channel(helper_debug)
self.update_id_and_sid(id) self.update_id_and_sid(id)
def _write(self, msg: t.Any) -> None: def _write(self, msg: t.Any) -> None:

View File

@ -4,9 +4,11 @@ from binascii import hexlify
class ChannelData: class ChannelData:
def __init__( def __init__(
self, self,
protocol_version: int, protocol_version_major: int,
protocol_version_minor: int,
transport_path: str, transport_path: str,
channel_id: int, channel_id: int,
key_request: bytes, key_request: bytes,
@ -15,8 +17,10 @@ class ChannelData:
nonce_response: int, nonce_response: int,
sync_bit_send: int, sync_bit_send: int,
sync_bit_receive: int, sync_bit_receive: int,
handshake_hash: bytes,
) -> None: ) -> None:
self.protocol_version: int = protocol_version self.protocol_version_major: int = protocol_version_major
self.protocol_version_minor: int = protocol_version_minor
self.transport_path: str = transport_path self.transport_path: str = transport_path
self.channel_id: int = channel_id self.channel_id: int = channel_id
self.key_request: str = hexlify(key_request).decode() self.key_request: str = hexlify(key_request).decode()
@ -25,10 +29,12 @@ class ChannelData:
self.nonce_response: int = nonce_response self.nonce_response: int = nonce_response
self.sync_bit_receive: int = sync_bit_receive self.sync_bit_receive: int = sync_bit_receive
self.sync_bit_send: int = sync_bit_send self.sync_bit_send: int = sync_bit_send
self.handshake_hash: bytes = handshake_hash
def to_dict(self): def to_dict(self):
return { return {
"protocol_version": self.protocol_version, "protocol_version_major": self.protocol_version_major,
"protocol_version_minor": self.protocol_version_minor,
"transport_path": self.transport_path, "transport_path": self.transport_path,
"channel_id": self.channel_id, "channel_id": self.channel_id,
"key_request": self.key_request, "key_request": self.key_request,
@ -37,4 +43,5 @@ class ChannelData:
"nonce_response": self.nonce_response, "nonce_response": self.nonce_response,
"sync_bit_send": self.sync_bit_send, "sync_bit_send": self.sync_bit_send,
"sync_bit_receive": self.sync_bit_receive, "sync_bit_receive": self.sync_bit_receive,
"handshake_hash": self.handshake_hash,
} }

View File

@ -112,7 +112,8 @@ class JsonChannelDatabase(ChannelDatabase):
def dict_to_channel_data(dict: t.Dict) -> ChannelData: def dict_to_channel_data(dict: t.Dict) -> ChannelData:
return ChannelData( return ChannelData(
protocol_version=dict["protocol_version"], protocol_version_major=dict["protocol_version_minor"],
protocol_version_minor=dict["protocol_version_major"],
transport_path=dict["transport_path"], transport_path=dict["transport_path"],
channel_id=dict["channel_id"], channel_id=dict["channel_id"],
key_request=bytes.fromhex(dict["key_request"]), key_request=bytes.fromhex(dict["key_request"]),
@ -121,6 +122,7 @@ def dict_to_channel_data(dict: t.Dict) -> ChannelData:
nonce_response=dict["nonce_response"], nonce_response=dict["nonce_response"],
sync_bit_send=dict["sync_bit_send"], sync_bit_send=dict["sync_bit_send"],
sync_bit_receive=dict["sync_bit_receive"], sync_bit_receive=dict["sync_bit_receive"],
handshake_hash=dict["handshake_hash"],
) )

View File

@ -26,6 +26,9 @@ LOG = logging.getLogger(__name__)
MANAGEMENT_SESSION_ID: int = 0 MANAGEMENT_SESSION_ID: int = 0
if t.TYPE_CHECKING:
from ...debuglink import DebugLink
def _sha256_of_two(val_1: bytes, val_2: bytes) -> bytes: def _sha256_of_two(val_1: bytes, val_2: bytes) -> bytes:
hash = hashlib.sha256(val_1) hash = hashlib.sha256(val_1)
@ -77,16 +80,18 @@ class ProtocolV2(ProtocolAndChannel):
self.nonce_response = channel_data.nonce_response self.nonce_response = channel_data.nonce_response
self.sync_bit_receive = channel_data.sync_bit_receive self.sync_bit_receive = channel_data.sync_bit_receive
self.sync_bit_send = channel_data.sync_bit_send self.sync_bit_send = channel_data.sync_bit_send
self.handshake_hash: bytes = b""
self._has_valid_channel = True self._has_valid_channel = True
def get_channel(self) -> ProtocolV2: def get_channel(self, helper_debug: DebugLink | None = None) -> ProtocolV2:
if not self._has_valid_channel: if not self._has_valid_channel:
self._establish_new_channel() self._establish_new_channel(helper_debug)
return self return self
def get_channel_data(self) -> ChannelData: def get_channel_data(self) -> ChannelData:
return ChannelData( return ChannelData(
protocol_version=2, protocol_version_major=2,
protocol_version_minor=2,
transport_path=self.transport.get_path(), transport_path=self.transport.get_path(),
channel_id=self.channel_id, channel_id=self.channel_id,
key_request=self.key_request, key_request=self.key_request,
@ -95,6 +100,7 @@ class ProtocolV2(ProtocolAndChannel):
nonce_response=self.nonce_response, nonce_response=self.nonce_response,
sync_bit_receive=self.sync_bit_receive, sync_bit_receive=self.sync_bit_receive,
sync_bit_send=self.sync_bit_send, sync_bit_send=self.sync_bit_send,
handshake_hash=self.handshake_hash,
) )
def read(self, session_id: int) -> t.Any: def read(self, session_id: int) -> t.Any:
@ -129,7 +135,7 @@ class ProtocolV2(ProtocolAndChannel):
raise exceptions.TrezorException("Unexpected response to GetFeatures") raise exceptions.TrezorException("Unexpected response to GetFeatures")
self._features = features self._features = features
def _establish_new_channel(self) -> None: def _establish_new_channel(self, helper_debug: DebugLink | None = None) -> None:
self.sync_bit_send = 0 self.sync_bit_send = 0
self.sync_bit_receive = 0 self.sync_bit_receive = 0
@ -141,7 +147,7 @@ class ProtocolV2(ProtocolAndChannel):
self._do_handshake(host_ephemeral_privkey, host_ephemeral_pubkey) self._do_handshake(host_ephemeral_privkey, host_ephemeral_pubkey)
self._do_pairing() self._do_pairing(helper_debug)
def _do_channel_allocation(self) -> None: def _do_channel_allocation(self) -> None:
channel_allocation_nonce = os.urandom(8) channel_allocation_nonce = os.urandom(8)
@ -269,9 +275,6 @@ class ProtocolV2(ProtocolAndChannel):
) )
msg_data = self.mapping.encode_without_wire_type( msg_data = self.mapping.encode_without_wire_type(
messages.ThpHandshakeCompletionReqNoisePayload( messages.ThpHandshakeCompletionReqNoisePayload(
pairing_methods=[
messages.ThpPairingMethod.NoMethod,
],
host_pairing_credential=credential, host_pairing_credential=credential,
) )
) )
@ -279,7 +282,7 @@ class ProtocolV2(ProtocolAndChannel):
aes_ctx = AESGCM(k) aes_ctx = AESGCM(k)
encrypted_payload = aes_ctx.encrypt(IV_1, msg_data, h) encrypted_payload = aes_ctx.encrypt(IV_1, msg_data, h)
h = _sha256_of_two(h, encrypted_payload) h = _sha256_of_two(h, encrypted_payload[:-16])
ha_completion_req_header = MessageHeader( ha_completion_req_header = MessageHeader(
0x12, 0x12,
self.channel_id, self.channel_id,
@ -292,6 +295,7 @@ class ProtocolV2(ProtocolAndChannel):
ha_completion_req_header, ha_completion_req_header,
encrypted_host_static_pubkey + encrypted_payload, encrypted_host_static_pubkey + encrypted_payload,
) )
self.handshake_hash = h
return ck return ck
def _read_handshake_completion_response(self) -> None: def _read_handshake_completion_response(self) -> None:
@ -304,9 +308,9 @@ class ProtocolV2(ProtocolAndChannel):
) )
self._send_ack_1() self._send_ack_1()
def _do_pairing(self): def _do_pairing(self, helper_debug: DebugLink | None):
# Send StartPairingReqest message # Send StartPairingReqest message
message = messages.ThpStartPairingRequest() message = messages.ThpPairingRequest()
message_type, message_data = self.mapping.encode(message) message_type, message_data = self.mapping.encode(message)
self._encrypt_and_write(MANAGEMENT_SESSION_ID, message_type, message_data) self._encrypt_and_write(MANAGEMENT_SESSION_ID, message_type, message_data)
@ -314,11 +318,41 @@ class ProtocolV2(ProtocolAndChannel):
# Read ACK # Read ACK
self._read_ack() self._read_ack()
# Read ThpEndResponse # Read button request
_, msg_type, msg_data = self.read_and_decrypt()
maaa = self.mapping.decode(msg_type, msg_data)
assert isinstance(maaa, messages.ButtonRequest)
# Send button ACK
message = messages.ButtonAck()
message_type, message_data = self.mapping.encode(message)
self._encrypt_and_write(MANAGEMENT_SESSION_ID, message_type, message_data)
self._read_ack()
if helper_debug is not None:
helper_debug.press_yes()
# Read PairingRequestApproved
_, msg_type, msg_data = self.read_and_decrypt() _, msg_type, msg_data = self.read_and_decrypt()
maaa = self.mapping.decode(msg_type, msg_data) maaa = self.mapping.decode(msg_type, msg_data)
assert isinstance(maaa, messages.ThpPairingRequestApproved)
message = messages.ThpSelectMethod(
selected_pairing_method=messages.ThpPairingMethod.SkipPairing
)
message_type, message_data = self.mapping.encode(message)
self._encrypt_and_write(MANAGEMENT_SESSION_ID, message_type, message_data)
# Read ACK
self._read_ack()
# Read ThpEndResponse
_, msg_type, msg_data = self.read_and_decrypt()
maaa = self.mapping.decode(msg_type, msg_data)
assert isinstance(maaa, messages.ThpEndResponse) assert isinstance(maaa, messages.ThpEndResponse)
self._has_valid_channel = True self._has_valid_channel = True
def _read_ack(self): def _read_ack(self):

View File

@ -1494,8 +1494,8 @@ pub struct DebugLinkState {
pub thp_pairing_code_entry_code: ::std::option::Option<u32>, pub thp_pairing_code_entry_code: ::std::option::Option<u32>,
// @@protoc_insertion_point(field:hw.trezor.messages.debug.DebugLinkState.thp_pairing_code_qr_code) // @@protoc_insertion_point(field:hw.trezor.messages.debug.DebugLinkState.thp_pairing_code_qr_code)
pub thp_pairing_code_qr_code: ::std::option::Option<::std::vec::Vec<u8>>, pub thp_pairing_code_qr_code: ::std::option::Option<::std::vec::Vec<u8>>,
// @@protoc_insertion_point(field:hw.trezor.messages.debug.DebugLinkState.thp_pairing_code_nfc_unidirectional) // @@protoc_insertion_point(field:hw.trezor.messages.debug.DebugLinkState.thp_pairing_code_nfc)
pub thp_pairing_code_nfc_unidirectional: ::std::option::Option<::std::vec::Vec<u8>>, pub thp_pairing_code_nfc: ::std::option::Option<::std::vec::Vec<u8>>,
// special fields // special fields
// @@protoc_insertion_point(special_field:hw.trezor.messages.debug.DebugLinkState.special_fields) // @@protoc_insertion_point(special_field:hw.trezor.messages.debug.DebugLinkState.special_fields)
pub special_fields: ::protobuf::SpecialFields, pub special_fields: ::protobuf::SpecialFields,
@ -1898,40 +1898,40 @@ impl DebugLinkState {
self.thp_pairing_code_qr_code.take().unwrap_or_else(|| ::std::vec::Vec::new()) self.thp_pairing_code_qr_code.take().unwrap_or_else(|| ::std::vec::Vec::new())
} }
// optional bytes thp_pairing_code_nfc_unidirectional = 16; // optional bytes thp_pairing_code_nfc = 16;
pub fn thp_pairing_code_nfc_unidirectional(&self) -> &[u8] { pub fn thp_pairing_code_nfc(&self) -> &[u8] {
match self.thp_pairing_code_nfc_unidirectional.as_ref() { match self.thp_pairing_code_nfc.as_ref() {
Some(v) => v, Some(v) => v,
None => &[], None => &[],
} }
} }
pub fn clear_thp_pairing_code_nfc_unidirectional(&mut self) { pub fn clear_thp_pairing_code_nfc(&mut self) {
self.thp_pairing_code_nfc_unidirectional = ::std::option::Option::None; self.thp_pairing_code_nfc = ::std::option::Option::None;
} }
pub fn has_thp_pairing_code_nfc_unidirectional(&self) -> bool { pub fn has_thp_pairing_code_nfc(&self) -> bool {
self.thp_pairing_code_nfc_unidirectional.is_some() self.thp_pairing_code_nfc.is_some()
} }
// Param is passed by value, moved // Param is passed by value, moved
pub fn set_thp_pairing_code_nfc_unidirectional(&mut self, v: ::std::vec::Vec<u8>) { pub fn set_thp_pairing_code_nfc(&mut self, v: ::std::vec::Vec<u8>) {
self.thp_pairing_code_nfc_unidirectional = ::std::option::Option::Some(v); self.thp_pairing_code_nfc = ::std::option::Option::Some(v);
} }
// Mutable pointer to the field. // Mutable pointer to the field.
// If field is not initialized, it is initialized with default value first. // If field is not initialized, it is initialized with default value first.
pub fn mut_thp_pairing_code_nfc_unidirectional(&mut self) -> &mut ::std::vec::Vec<u8> { pub fn mut_thp_pairing_code_nfc(&mut self) -> &mut ::std::vec::Vec<u8> {
if self.thp_pairing_code_nfc_unidirectional.is_none() { if self.thp_pairing_code_nfc.is_none() {
self.thp_pairing_code_nfc_unidirectional = ::std::option::Option::Some(::std::vec::Vec::new()); self.thp_pairing_code_nfc = ::std::option::Option::Some(::std::vec::Vec::new());
} }
self.thp_pairing_code_nfc_unidirectional.as_mut().unwrap() self.thp_pairing_code_nfc.as_mut().unwrap()
} }
// Take field // Take field
pub fn take_thp_pairing_code_nfc_unidirectional(&mut self) -> ::std::vec::Vec<u8> { pub fn take_thp_pairing_code_nfc(&mut self) -> ::std::vec::Vec<u8> {
self.thp_pairing_code_nfc_unidirectional.take().unwrap_or_else(|| ::std::vec::Vec::new()) self.thp_pairing_code_nfc.take().unwrap_or_else(|| ::std::vec::Vec::new())
} }
fn generated_message_descriptor_data() -> ::protobuf::reflect::GeneratedMessageDescriptorData { fn generated_message_descriptor_data() -> ::protobuf::reflect::GeneratedMessageDescriptorData {
@ -2013,9 +2013,9 @@ impl DebugLinkState {
|m: &mut DebugLinkState| { &mut m.thp_pairing_code_qr_code }, |m: &mut DebugLinkState| { &mut m.thp_pairing_code_qr_code },
)); ));
fields.push(::protobuf::reflect::rt::v2::make_option_accessor::<_, _>( fields.push(::protobuf::reflect::rt::v2::make_option_accessor::<_, _>(
"thp_pairing_code_nfc_unidirectional", "thp_pairing_code_nfc",
|m: &DebugLinkState| { &m.thp_pairing_code_nfc_unidirectional }, |m: &DebugLinkState| { &m.thp_pairing_code_nfc },
|m: &mut DebugLinkState| { &mut m.thp_pairing_code_nfc_unidirectional }, |m: &mut DebugLinkState| { &mut m.thp_pairing_code_nfc },
)); ));
::protobuf::reflect::GeneratedMessageDescriptorData::new_2::<DebugLinkState>( ::protobuf::reflect::GeneratedMessageDescriptorData::new_2::<DebugLinkState>(
"DebugLinkState", "DebugLinkState",
@ -2086,7 +2086,7 @@ impl ::protobuf::Message for DebugLinkState {
self.thp_pairing_code_qr_code = ::std::option::Option::Some(is.read_bytes()?); self.thp_pairing_code_qr_code = ::std::option::Option::Some(is.read_bytes()?);
}, },
130 => { 130 => {
self.thp_pairing_code_nfc_unidirectional = ::std::option::Option::Some(is.read_bytes()?); self.thp_pairing_code_nfc = ::std::option::Option::Some(is.read_bytes()?);
}, },
tag => { tag => {
::protobuf::rt::read_unknown_or_skip_group(tag, is, self.special_fields.mut_unknown_fields())?; ::protobuf::rt::read_unknown_or_skip_group(tag, is, self.special_fields.mut_unknown_fields())?;
@ -2146,7 +2146,7 @@ impl ::protobuf::Message for DebugLinkState {
if let Some(v) = self.thp_pairing_code_qr_code.as_ref() { if let Some(v) = self.thp_pairing_code_qr_code.as_ref() {
my_size += ::protobuf::rt::bytes_size(15, &v); my_size += ::protobuf::rt::bytes_size(15, &v);
} }
if let Some(v) = self.thp_pairing_code_nfc_unidirectional.as_ref() { if let Some(v) = self.thp_pairing_code_nfc.as_ref() {
my_size += ::protobuf::rt::bytes_size(16, &v); my_size += ::protobuf::rt::bytes_size(16, &v);
} }
my_size += ::protobuf::rt::unknown_fields_size(self.special_fields.unknown_fields()); my_size += ::protobuf::rt::unknown_fields_size(self.special_fields.unknown_fields());
@ -2200,7 +2200,7 @@ impl ::protobuf::Message for DebugLinkState {
if let Some(v) = self.thp_pairing_code_qr_code.as_ref() { if let Some(v) = self.thp_pairing_code_qr_code.as_ref() {
os.write_bytes(15, v)?; os.write_bytes(15, v)?;
} }
if let Some(v) = self.thp_pairing_code_nfc_unidirectional.as_ref() { if let Some(v) = self.thp_pairing_code_nfc.as_ref() {
os.write_bytes(16, v)?; os.write_bytes(16, v)?;
} }
os.write_unknown_fields(self.special_fields.unknown_fields())?; os.write_unknown_fields(self.special_fields.unknown_fields())?;
@ -2235,7 +2235,7 @@ impl ::protobuf::Message for DebugLinkState {
self.tokens.clear(); self.tokens.clear();
self.thp_pairing_code_entry_code = ::std::option::Option::None; self.thp_pairing_code_entry_code = ::std::option::Option::None;
self.thp_pairing_code_qr_code = ::std::option::Option::None; self.thp_pairing_code_qr_code = ::std::option::Option::None;
self.thp_pairing_code_nfc_unidirectional = ::std::option::Option::None; self.thp_pairing_code_nfc = ::std::option::Option::None;
self.special_fields.clear(); self.special_fields.clear();
} }
@ -2256,7 +2256,7 @@ impl ::protobuf::Message for DebugLinkState {
tokens: ::std::vec::Vec::new(), tokens: ::std::vec::Vec::new(),
thp_pairing_code_entry_code: ::std::option::Option::None, thp_pairing_code_entry_code: ::std::option::Option::None,
thp_pairing_code_qr_code: ::std::option::Option::None, thp_pairing_code_qr_code: ::std::option::Option::None,
thp_pairing_code_nfc_unidirectional: ::std::option::Option::None, thp_pairing_code_nfc: ::std::option::Option::None,
special_fields: ::protobuf::SpecialFields::new(), special_fields: ::protobuf::SpecialFields::new(),
}; };
&instance &instance
@ -3856,7 +3856,7 @@ static file_descriptor_proto_data: &'static [u8] = b"\
(\x0e29.hw.trezor.messages.debug.DebugLinkGetState.DebugWaitType:\tIMMED\ (\x0e29.hw.trezor.messages.debug.DebugLinkGetState.DebugWaitType:\tIMMED\
IATER\nwaitLayout\x12$\n\x0ethp_channel_id\x18\x04\x20\x01(\x0cR\x0cthpC\ IATER\nwaitLayout\x12$\n\x0ethp_channel_id\x18\x04\x20\x01(\x0cR\x0cthpC\
hannelId\"C\n\rDebugWaitType\x12\r\n\tIMMEDIATE\x10\0\x12\x0f\n\x0bNEXT_\ hannelId\"C\n\rDebugWaitType\x12\r\n\tIMMEDIATE\x10\0\x12\x0f\n\x0bNEXT_\
LAYOUT\x10\x01\x12\x12\n\x0eCURRENT_LAYOUT\x10\x02\"\xdb\x05\n\x0eDebugL\ LAYOUT\x10\x01\x12\x12\n\x0eCURRENT_LAYOUT\x10\x02\"\xbe\x05\n\x0eDebugL\
inkState\x12\x16\n\x06layout\x18\x01\x20\x01(\x0cR\x06layout\x12\x10\n\ inkState\x12\x16\n\x06layout\x18\x01\x20\x01(\x0cR\x06layout\x12\x10\n\
\x03pin\x18\x02\x20\x01(\tR\x03pin\x12\x16\n\x06matrix\x18\x03\x20\x01(\ \x03pin\x18\x02\x20\x01(\tR\x03pin\x12\x16\n\x06matrix\x18\x03\x20\x01(\
\tR\x06matrix\x12'\n\x0fmnemonic_secret\x18\x04\x20\x01(\x0cR\x0emnemoni\ \tR\x06matrix\x12'\n\x0fmnemonic_secret\x18\x04\x20\x01(\x0cR\x0emnemoni\
@ -3871,22 +3871,22 @@ static file_descriptor_proto_data: &'static [u8] = b"\
monicType\x12\x16\n\x06tokens\x18\r\x20\x03(\tR\x06tokens\x12<\n\x1bthp_\ monicType\x12\x16\n\x06tokens\x18\r\x20\x03(\tR\x06tokens\x12<\n\x1bthp_\
pairing_code_entry_code\x18\x0e\x20\x01(\rR\x17thpPairingCodeEntryCode\ pairing_code_entry_code\x18\x0e\x20\x01(\rR\x17thpPairingCodeEntryCode\
\x126\n\x18thp_pairing_code_qr_code\x18\x0f\x20\x01(\x0cR\x14thpPairingC\ \x126\n\x18thp_pairing_code_qr_code\x18\x0f\x20\x01(\x0cR\x14thpPairingC\
odeQrCode\x12L\n#thp_pairing_code_nfc_unidirectional\x18\x10\x20\x01(\ odeQrCode\x12/\n\x14thp_pairing_code_nfc\x18\x10\x20\x01(\x0cR\x11thpPai\
\x0cR\x1fthpPairingCodeNfcUnidirectional\"\x0f\n\rDebugLinkStop\"P\n\x0c\ ringCodeNfc\"\x0f\n\rDebugLinkStop\"P\n\x0cDebugLinkLog\x12\x14\n\x05lev\
DebugLinkLog\x12\x14\n\x05level\x18\x01\x20\x01(\rR\x05level\x12\x16\n\ el\x18\x01\x20\x01(\rR\x05level\x12\x16\n\x06bucket\x18\x02\x20\x01(\tR\
\x06bucket\x18\x02\x20\x01(\tR\x06bucket\x12\x12\n\x04text\x18\x03\x20\ \x06bucket\x12\x12\n\x04text\x18\x03\x20\x01(\tR\x04text\"G\n\x13DebugLi\
\x01(\tR\x04text\"G\n\x13DebugLinkMemoryRead\x12\x18\n\x07address\x18\ nkMemoryRead\x12\x18\n\x07address\x18\x01\x20\x01(\rR\x07address\x12\x16\
\x01\x20\x01(\rR\x07address\x12\x16\n\x06length\x18\x02\x20\x01(\rR\x06l\ \n\x06length\x18\x02\x20\x01(\rR\x06length\")\n\x0fDebugLinkMemory\x12\
ength\")\n\x0fDebugLinkMemory\x12\x16\n\x06memory\x18\x01\x20\x01(\x0cR\ \x16\n\x06memory\x18\x01\x20\x01(\x0cR\x06memory\"^\n\x14DebugLinkMemory\
\x06memory\"^\n\x14DebugLinkMemoryWrite\x12\x18\n\x07address\x18\x01\x20\ Write\x12\x18\n\x07address\x18\x01\x20\x01(\rR\x07address\x12\x16\n\x06m\
\x01(\rR\x07address\x12\x16\n\x06memory\x18\x02\x20\x01(\x0cR\x06memory\ emory\x18\x02\x20\x01(\x0cR\x06memory\x12\x14\n\x05flash\x18\x03\x20\x01\
\x12\x14\n\x05flash\x18\x03\x20\x01(\x08R\x05flash\"-\n\x13DebugLinkFlas\ (\x08R\x05flash\"-\n\x13DebugLinkFlashErase\x12\x16\n\x06sector\x18\x01\
hErase\x12\x16\n\x06sector\x18\x01\x20\x01(\rR\x06sector\".\n\x14DebugLi\ \x20\x01(\rR\x06sector\".\n\x14DebugLinkEraseSdCard\x12\x16\n\x06format\
nkEraseSdCard\x12\x16\n\x06format\x18\x01\x20\x01(\x08R\x06format\"0\n\ \x18\x01\x20\x01(\x08R\x06format\"0\n\x14DebugLinkWatchLayout\x12\x14\n\
\x14DebugLinkWatchLayout\x12\x14\n\x05watch\x18\x01\x20\x01(\x08R\x05wat\ \x05watch\x18\x01\x20\x01(\x08R\x05watch:\x02\x18\x01\"\x1f\n\x19DebugLi\
ch:\x02\x18\x01\"\x1f\n\x19DebugLinkResetDebugEvents:\x02\x18\x01\"\x1a\ nkResetDebugEvents:\x02\x18\x01\"\x1a\n\x18DebugLinkOptigaSetSecMaxB=\n#\
\n\x18DebugLinkOptigaSetSecMaxB=\n#com.satoshilabs.trezor.lib.protobufB\ com.satoshilabs.trezor.lib.protobufB\x12TrezorMessageDebug\x80\xa6\x1d\
\x12TrezorMessageDebug\x80\xa6\x1d\x01\ \x01\
"; ";
/// `FileDescriptorProto` object which was a source for this generated file /// `FileDescriptorProto` object which was a source for this generated file

File diff suppressed because it is too large Load Diff

View File

@ -174,10 +174,7 @@ def _client_from_path(
def _find_client(request: pytest.FixtureRequest, interact: bool) -> Client: def _find_client(request: pytest.FixtureRequest, interact: bool) -> Client:
devices = enumerate_devices() devices = enumerate_devices()
for device in devices: for device in devices:
try: return Client(device, auto_interact=not interact)
return Client(device, auto_interact=not interact)
except Exception:
pass
request.session.shouldstop = "Failed to communicate with Trezor" request.session.shouldstop = "Failed to communicate with Trezor"
raise RuntimeError("No debuggable device found") raise RuntimeError("No debuggable device found")

View File

@ -1,3 +1,4 @@
import hashlib
import os import os
import typing as t import typing as t
@ -6,8 +7,24 @@ import typing_extensions as tx
from trezorlib.client import ProtocolV2 from trezorlib.client import ProtocolV2
from trezorlib.debuglink import TrezorClientDebugLink as Client from trezorlib.debuglink import TrezorClientDebugLink as Client
from trezorlib.messages import (
ButtonAck,
ButtonRequest,
ThpCodeEntryChallenge,
ThpCodeEntryCommitment,
ThpCodeEntryCpaceTrezor,
ThpEndRequest,
ThpEndResponse,
ThpPairingMethod,
ThpPairingPreparationsFinished,
ThpPairingRequest,
ThpPairingRequestApproved,
ThpQrCodeSecret,
ThpQrCodeTag,
ThpSelectMethod,
)
from trezorlib.transport.thp import curve25519 from trezorlib.transport.thp import curve25519
from trezorlib.transport.thp.protocol_v2 import _hkdf from trezorlib.transport.thp.protocol_v2 import MANAGEMENT_SESSION_ID, _hkdf
if t.TYPE_CHECKING: if t.TYPE_CHECKING:
P = tx.ParamSpec("P") P = tx.ParamSpec("P")
@ -65,7 +82,219 @@ def test_handshake(client: Client) -> None:
# TODO - without pairing, the client is damaged and results in fail of the following test # TODO - without pairing, the client is damaged and results in fail of the following test
# so far no luck in solving it - it should be also tackled in FW, as it causes unexpected FW error # so far no luck in solving it - it should be also tackled in FW, as it causes unexpected FW error
protocol._do_pairing() protocol._do_pairing(client.debug)
# TODO the following is just to make style checker happy # TODO the following is just to make style checker happy
assert noise_tag is not None assert noise_tag is not None
def test_pairing_qr_code(client: Client) -> None:
protocol: ProtocolV2 = client.protocol
protocol.sync_bit_send = 0
protocol.sync_bit_receive = 0
# Generate ephemeral keys
host_ephemeral_privkey = curve25519.get_private_key(os.urandom(32))
host_ephemeral_pubkey = curve25519.get_public_key(host_ephemeral_privkey)
protocol._do_channel_allocation()
protocol._do_handshake(host_ephemeral_privkey, host_ephemeral_pubkey)
# Send StartPairingReqest message
message = ThpPairingRequest()
message_type, message_data = protocol.mapping.encode(message)
protocol._encrypt_and_write(MANAGEMENT_SESSION_ID, message_type, message_data)
# Read ACK
protocol._read_ack()
# Read button request
_, msg_type, msg_data = protocol.read_and_decrypt()
maaa = protocol.mapping.decode(msg_type, msg_data)
assert isinstance(maaa, ButtonRequest)
# Send button ACK
message = ButtonAck()
message_type, message_data = protocol.mapping.encode(message)
protocol._encrypt_and_write(MANAGEMENT_SESSION_ID, message_type, message_data)
protocol._read_ack()
client.debug.press_yes()
# Read PairingRequestApproved
_, msg_type, msg_data = protocol.read_and_decrypt()
maaa = protocol.mapping.decode(msg_type, msg_data)
assert isinstance(maaa, ThpPairingRequestApproved)
message = ThpSelectMethod(selected_pairing_method=ThpPairingMethod.QrCode)
message_type, message_data = protocol.mapping.encode(message)
protocol._encrypt_and_write(MANAGEMENT_SESSION_ID, message_type, message_data)
# Read ACK
protocol._read_ack()
# Read ThpPairingPreparationsFinished
_, msg_type, msg_data = protocol.read_and_decrypt()
maaa = protocol.mapping.decode(msg_type, msg_data)
assert isinstance(maaa, ThpPairingPreparationsFinished)
# QR Code shown
# Read button request
_, msg_type, msg_data = protocol.read_and_decrypt()
maaa = protocol.mapping.decode(msg_type, msg_data)
assert isinstance(maaa, ButtonRequest)
# Send button ACK
message = ButtonAck()
message_type, message_data = protocol.mapping.encode(message)
protocol._encrypt_and_write(MANAGEMENT_SESSION_ID, message_type, message_data)
protocol._read_ack()
state = client.debug.state(thp_channel_id=protocol.channel_id.to_bytes(2, "big"))
sha_ctx = hashlib.sha256(protocol.handshake_hash)
sha_ctx.update(state.thp_pairing_code_qr_code)
tag = sha_ctx.digest()
message_type, message_data = protocol.mapping.encode(ThpQrCodeTag(tag=tag))
protocol._encrypt_and_write(MANAGEMENT_SESSION_ID, message_type, message_data)
protocol._read_ack()
# Read ThpQrCodeSecret
_, msg_type, msg_data = protocol.read_and_decrypt()
maaa = protocol.mapping.decode(msg_type, msg_data)
assert isinstance(maaa, ThpQrCodeSecret)
message = ThpEndRequest()
message_type, message_data = protocol.mapping.encode(message)
protocol._encrypt_and_write(MANAGEMENT_SESSION_ID, message_type, message_data)
# Read ACK
protocol._read_ack()
# Read ThpEndResponse
_, msg_type, msg_data = protocol.read_and_decrypt()
maaa = protocol.mapping.decode(msg_type, msg_data)
assert isinstance(maaa, ThpEndResponse)
protocol._has_valid_channel = True
@pytest.mark.skip("Cpace is not implemented yet")
def test_pairing_code_entry(client: Client) -> None:
protocol: ProtocolV2 = client.protocol
protocol.sync_bit_send = 0
protocol.sync_bit_receive = 0
# Generate ephemeral keys
host_ephemeral_privkey = curve25519.get_private_key(os.urandom(32))
host_ephemeral_pubkey = curve25519.get_public_key(host_ephemeral_privkey)
protocol._do_channel_allocation()
protocol._do_handshake(host_ephemeral_privkey, host_ephemeral_pubkey)
# Send StartPairingReqest message
message = ThpPairingRequest()
message_type, message_data = protocol.mapping.encode(message)
protocol._encrypt_and_write(MANAGEMENT_SESSION_ID, message_type, message_data)
# Read ACK
protocol._read_ack()
# Read button request
_, msg_type, msg_data = protocol.read_and_decrypt()
maaa = protocol.mapping.decode(msg_type, msg_data)
assert isinstance(maaa, ButtonRequest)
# Send button ACK
message = ButtonAck()
message_type, message_data = protocol.mapping.encode(message)
protocol._encrypt_and_write(MANAGEMENT_SESSION_ID, message_type, message_data)
protocol._read_ack()
client.debug.press_yes()
# Read PairingRequestApproved
_, msg_type, msg_data = protocol.read_and_decrypt()
maaa = protocol.mapping.decode(msg_type, msg_data)
assert isinstance(maaa, ThpPairingRequestApproved)
message = ThpSelectMethod(selected_pairing_method=ThpPairingMethod.CodeEntry)
message_type, message_data = protocol.mapping.encode(message)
protocol._encrypt_and_write(MANAGEMENT_SESSION_ID, message_type, message_data)
# Read ACK
protocol._read_ack()
# Read ThpCodeEntryCommitment
_, msg_type, msg_data = protocol.read_and_decrypt()
maaa = protocol.mapping.decode(msg_type, msg_data)
assert isinstance(maaa, ThpCodeEntryCommitment)
challenge = b"\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99\xAA\xBB\xCC\xDD\xEE\xFF"
message = ThpCodeEntryChallenge(challenge=challenge)
message_type, message_data = protocol.mapping.encode(message)
protocol._encrypt_and_write(MANAGEMENT_SESSION_ID, message_type, message_data)
# Read ACK
protocol._read_ack()
# Read ThpCodeEntryCpaceTrezor
_, msg_type, msg_data = protocol.read_and_decrypt()
maaa = protocol.mapping.decode(msg_type, msg_data)
assert isinstance(maaa, ThpCodeEntryCpaceTrezor)
_ = maaa.cpace_trezor_public_key
# Code Entry code shown
# Read button request
_, msg_type, msg_data = protocol.read_and_decrypt()
maaa = protocol.mapping.decode(msg_type, msg_data)
assert isinstance(maaa, ButtonRequest)
# Send button ACK
message = ButtonAck()
message_type, message_data = protocol.mapping.encode(message)
protocol._encrypt_and_write(MANAGEMENT_SESSION_ID, message_type, message_data)
protocol._read_ack()
state = client.debug.state(thp_channel_id=protocol.channel_id.to_bytes(2, "big"))
sha_ctx = hashlib.sha256(protocol.handshake_hash)
sha_ctx.update(state.thp_pairing_code_entry_code)
tag = sha_ctx.digest()
message_type, message_data = protocol.mapping.encode(ThpQrCodeTag(tag=tag))
protocol._encrypt_and_write(MANAGEMENT_SESSION_ID, message_type, message_data)
protocol._read_ack()
# Read ThpQrCodeSecret
_, msg_type, msg_data = protocol.read_and_decrypt()
maaa = protocol.mapping.decode(msg_type, msg_data)
assert isinstance(maaa, ThpQrCodeSecret)
message = ThpEndRequest()
message_type, message_data = protocol.mapping.encode(message)
protocol._encrypt_and_write(MANAGEMENT_SESSION_ID, message_type, message_data)
# Read ACK
protocol._read_ack()
# Read ThpEndResponse
_, msg_type, msg_data = protocol.read_and_decrypt()
maaa = protocol.mapping.decode(msg_type, msg_data)
assert isinstance(maaa, ThpEndResponse)
protocol._has_valid_channel = True