1
0
mirror of https://github.com/trezor/trezor-firmware.git synced 2025-02-01 18:30:56 +00:00

temp: update pairing process, part 1

This commit is contained in:
M1nd3r 2025-01-27 14:04:51 +01:00
parent 503a19684e
commit b3cb270249
24 changed files with 1470 additions and 1035 deletions

View File

@ -134,7 +134,7 @@ message DebugLinkState {
repeated string tokens = 13; // current layout represented as a list of string tokens
optional uint32 thp_pairing_code_entry_code = 14;
optional bytes thp_pairing_code_qr_code = 15;
optional bytes thp_pairing_code_nfc_unidirectional = 16;
optional bytes thp_pairing_code_nfc = 16;
}
/**

View File

@ -13,28 +13,28 @@ option (include_in_bitcoin_only) = true;
* Mapping between Trezor wire identifier (uint) and a Thp protobuf message
*/
enum ThpMessageType {
reserved 0 to 999; // Values reserved by other messages, see messages.proto
reserved 0 to 999; // Values reserved by other messages, see messages.proto
ThpMessageType_ThpCreateNewSession = 1000[(bitcoin_only)=true];
ThpMessageType_ThpNewSession = 1001[(bitcoin_only)=true];
ThpMessageType_ThpStartPairingRequest = 1008 [(bitcoin_only) = true];
ThpMessageType_ThpCreateNewSession = 1000 [(bitcoin_only)=true];
ThpMessageType_ThpPairingRequest = 1006 [(bitcoin_only) = true];
ThpMessageType_ThpPairingRequestApproved = 1007 [(bitcoin_only) = true];
ThpMessageType_ThpSelectMethod = 1008 [(bitcoin_only) = true];
ThpMessageType_ThpPairingPreparationsFinished = 1009 [(bitcoin_only) = true];
ThpMessageType_ThpCredentialRequest = 1010 [(bitcoin_only) = true];
ThpMessageType_ThpCredentialResponse = 1011 [(bitcoin_only) = true];
ThpMessageType_ThpEndRequest = 1012 [(bitcoin_only) = true];
ThpMessageType_ThpEndResponse = 1013[(bitcoin_only) = true];
ThpMessageType_ThpCodeEntryCommitment = 1016[(bitcoin_only)=true];
ThpMessageType_ThpCodeEntryChallenge = 1017[(bitcoin_only)=true];
ThpMessageType_ThpCodeEntryCpaceHost = 1018[(bitcoin_only)=true];
ThpMessageType_ThpCodeEntryCpaceTrezor = 1019[(bitcoin_only)=true];
ThpMessageType_ThpCodeEntryTag = 1020[(bitcoin_only)=true];
ThpMessageType_ThpCodeEntrySecret = 1021[(bitcoin_only)=true];
ThpMessageType_ThpQrCodeTag = 1024[(bitcoin_only)=true];
ThpMessageType_ThpQrCodeSecret = 1025[(bitcoin_only)=true];
ThpMessageType_ThpNfcUnidirectionalTag = 1032[(bitcoin_only)=true];
ThpMessageType_ThpNfcUnidirectionalSecret = 1033[(bitcoin_only)=true];
ThpMessageType_ThpEndResponse = 1013 [(bitcoin_only) = true];
ThpMessageType_ThpCodeEntryCommitment = 1016 [(bitcoin_only)=true];
ThpMessageType_ThpCodeEntryChallenge = 1017 [(bitcoin_only)=true];
ThpMessageType_ThpCodeEntryCpaceTrezor = 1018 [(bitcoin_only)=true];
ThpMessageType_ThpCodeEntryCpaceHostTag = 1019 [(bitcoin_only)=true];
ThpMessageType_ThpCodeEntrySecret = 1020 [(bitcoin_only)=true];
ThpMessageType_ThpQrCodeTag = 1024 [(bitcoin_only)=true];
ThpMessageType_ThpQrCodeSecret = 1025 [(bitcoin_only)=true];
ThpMessageType_ThpNfcTagHost = 1032 [(bitcoin_only)=true];
ThpMessageType_ThpNfcTagTrezor = 1033 [(bitcoin_only)=true];
reserved 1100 to 2147483647; // Values reserved by other messages, see messages.proto
reserved 1100 to 2147483647; // Values reserved by other messages, see messages.proto
}
@ -43,10 +43,10 @@ enum ThpMessageType {
* @embed
*/
enum ThpPairingMethod {
NoMethod = 1; // Trust without MITM protection.
SkipPairing = 1; // Trust without MITM protection.
CodeEntry = 2; // User types code diplayed on Trezor into the host application.
QrCode = 3; // User scans code displayed on Trezor into host application.
NFC_Unidirectional = 4; // Trezor transmits an authentication key to the host device via NFC.
NFC = 4; // Trezor and host application exchange authentication secrets via NFC.
}
/**
@ -55,8 +55,8 @@ enum ThpPairingMethod {
message ThpDeviceProperties {
optional string internal_model = 1; // Internal model name e.g. "T2B1".
optional uint32 model_variant = 2; // Encodes the device properties such as color.
optional bool bootloader_mode = 3; // Indicates whether the device is in bootloader or firmware mode.
optional uint32 protocol_version = 4; // The communication protocol version supported by the firmware.
optional uint32 protocol_version_major = 3; // The major version of the communication protocol used by the firmware.
optional uint32 protocol_version_minor = 4; // The minor version of the communication protocol used by the firmware.
repeated ThpPairingMethod pairing_methods = 5; // The pairing methods supported by the Trezor.
}
@ -65,13 +65,11 @@ message ThpDeviceProperties {
*/
message ThpHandshakeCompletionReqNoisePayload {
optional bytes host_pairing_credential = 1; // Host's pairing credential
repeated ThpPairingMethod pairing_methods = 2; // The pairing methods chosen by the host
}
/**
* Request: Ask device for a new session with given passphrase.
* @start
* @next ThpNewSession
* @next Success
*/
message ThpCreateNewSession{
@ -80,31 +78,41 @@ message ThpCreateNewSession{
optional bool derive_cardano = 3; // If True, Cardano keys will be derived. Ignored with BTC-only
}
/**
* Response: Contains session_id of the newly created session.
* @end
*/
message ThpNewSession{
optional uint32 new_session_id = 1;
}
/**
* Request: Start pairing process.
* @start
* @next ThpCodeEntryCommitment
* @next ThpPairingPreparationsFinished
* @next ThpPairingRequestApproved
*/
message ThpStartPairingRequest{
optional string host_name = 1; // Human-readable host name
message ThpPairingRequest{
optional string host_name = 1; // Human-readable host name
}
/**
* Response: Host is allowed to start pairing process.
* @start
* @next ThpSelectMethod
*/
message ThpPairingRequestApproved{
}
/**
* Request: Start pairing using the method selected.
* @start
* @next ThpPairingPreparationsFinished
* @next ThpCodeEntryCommitment
*/
message ThpSelectMethod {
optional ThpPairingMethod selected_pairing_method = 1 [default=NFC];;
}
/**
* Response: Pairing is ready for user input / OOB communication.
* @next ThpCodeEntryCpace
* @next ThpQrCodeTag
* @next ThpNfcUnidirectionalTag
* @next ThpNfcTagHost
*/
message ThpPairingPreparationsFinished{
message ThpPairingPreparationsFinished{
}
/**
@ -117,34 +125,30 @@ message ThpCodeEntryCommitment {
/**
* Response: Host responds to Trezor's Code Entry commitment with a challenge.
* @next ThpPairingPreparationsFinished
*/
message ThpCodeEntryChallenge {
optional bytes challenge = 1; // host's random 32-byte challenge
}
/**
* Request: User selected Code Entry option in Host. Host starts CPACE protocol with Trezor.
* @next ThpCodeEntryCpaceTrezor
*/
message ThpCodeEntryCpaceHost {
optional bytes cpace_host_public_key = 1; // Host's ephemeral CPace public key
message ThpCodeEntryChallenge {
optional bytes challenge = 1; // Host's random 32-byte challenge
}
/**
* Response: Trezor continues with the CPACE protocol.
* @next ThpCodeEntryTag
* @next ThpCodeEntryCpaceHostTag
*/
message ThpCodeEntryCpaceTrezor {
optional bytes cpace_trezor_public_key = 1; // Trezor's ephemeral CPace public key
}
/**
* Response: Host continues with the CPACE protocol.
* Request: User selected Code Entry option in Host. Host starts CPACE protocol with Trezor.
* @next ThpCodeEntrySecret
*/
message ThpCodeEntryTag {
optional bytes tag = 2; // SHA-256 of shared secret
message ThpCodeEntryCpaceHostTag {
optional bytes cpace_host_public_key = 1; // Host's ephemeral CPace public key
optional bytes tag = 2; // SHA-256 of shared secret
}
/**
@ -175,10 +179,10 @@ message ThpQrCodeSecret {
/**
* Request: User selected Unidirectional NFC pairing option. Host sends an Unidirectional NFC Tag.
* @next ThpNfcUnidirectionalSecret
* @next ThpNfcTagTrezor
*/
message ThpNfcUnidirectionalTag {
optional bytes tag = 1; // SHA-256 of shared secret
message ThpNfcTagHost {
optional bytes tag = 1; // Host's tag
}
/**
@ -186,8 +190,8 @@ message ThpNfcUnidirectionalTag {
* @next ThpCredentialRequest
* @next ThpEndRequest
*/
message ThpNfcUnidirectionalSecret {
optional bytes secret = 1; // Trezor's secret
message ThpNfcTagTrezor {
optional bytes tag = 1; // Trezor's tag
}
/**
@ -197,6 +201,7 @@ message ThpNfcUnidirectionalSecret {
*/
message ThpCredentialRequest {
optional bytes host_static_pubkey = 1; // Host's static public key used in the handshake.
optional bool autoconnect = 2; // Whether host wants to autoconnect without user confirmation
}
/**
@ -229,6 +234,7 @@ message ThpEndResponse {}
message ThpCredentialMetadata {
option (internal_only) = true;
optional string host_name = 1; // Human-readable host name
optional bool autoconnect = 2; // Whether host is allowed to autoconnect without user confirmation
}
/**

View File

@ -252,7 +252,7 @@ if __debug__:
def _state(
thp_pairing_code_entry_code: int | None = None,
thp_pairing_code_qr_code: bytes | None = None,
thp_pairing_code_nfc_unidirectional: bytes | None = None,
thp_pairing_code_nfc: bytes | None = None,
) -> DebugLinkState:
from trezor.messages import DebugLinkState
@ -274,7 +274,7 @@ if __debug__:
tokens=tokens,
thp_pairing_code_entry_code=thp_pairing_code_entry_code,
thp_pairing_code_qr_code=thp_pairing_code_qr_code,
thp_pairing_code_nfc_unidirectional=thp_pairing_code_nfc_unidirectional,
thp_pairing_code_nfc=thp_pairing_code_nfc,
)
async def dispatch_DebugLinkGetState(
@ -283,7 +283,7 @@ if __debug__:
thp_pairing_code_entry_code: int | None = None
thp_pairing_code_qr_code: bytes | None = None
thp_pairing_code_nfc_unidirectional: bytes | None = None
thp_pairing_code_nfc: bytes | None = None
if utils.USE_THP and msg.thp_channel_id is not None:
channel_id = int.from_bytes(msg.thp_channel_id, "big")
@ -301,15 +301,15 @@ if __debug__:
if ctx is not None and isinstance(ctx, PairingContext):
thp_pairing_code_entry_code = ctx.display_data.code_code_entry
thp_pairing_code_qr_code = ctx.display_data.code_qr_code
thp_pairing_code_nfc_unidirectional = (
ctx.display_data.code_nfc_unidirectional
)
thp_pairing_code_nfc = ctx.display_data.code_nfc
# if msg.host_nfc_secret is not None:
# ctx.host_nfc_secret = msg.host_nfc_secret
if msg.wait_layout == DebugWaitType.IMMEDIATE:
return _state(
thp_pairing_code_entry_code,
thp_pairing_code_qr_code,
thp_pairing_code_nfc_unidirectional,
thp_pairing_code_nfc,
)
assert DEBUG_CONTEXT is not None
@ -324,7 +324,7 @@ if __debug__:
return _state(
thp_pairing_code_entry_code,
thp_pairing_code_qr_code,
thp_pairing_code_nfc_unidirectional,
thp_pairing_code_nfc,
)
async def dispatch_DebugLinkRecordScreen(msg: DebugLinkRecordScreen) -> Success:

View File

@ -2,30 +2,33 @@ from typing import TYPE_CHECKING
from ubinascii import hexlify
from trezor import loop, protobuf
from trezor.crypto import random
from trezor.crypto.hashlib import sha256
from trezor.enums import ThpMessageType, ThpPairingMethod
from trezor.messages import (
Cancel,
ThpCodeEntryChallenge,
ThpCodeEntryCommitment,
ThpCodeEntryCpaceHost,
ThpCodeEntryCpaceHostTag,
ThpCodeEntryCpaceTrezor,
ThpCodeEntrySecret,
ThpCodeEntryTag,
ThpCredentialMetadata,
ThpCredentialRequest,
ThpCredentialResponse,
ThpEndRequest,
ThpEndResponse,
ThpNfcUnidirectionalSecret,
ThpNfcUnidirectionalTag,
ThpNfcTagHost,
ThpNfcTagTrezor,
ThpPairingPreparationsFinished,
ThpPairingRequest,
ThpQrCodeSecret,
ThpQrCodeTag,
ThpStartPairingRequest,
ThpSelectMethod,
)
from trezor.wire import message_handler
from trezor.wire.context import UnexpectedMessageException
from trezor.wire.errors import ActionCancelled, SilentError, UnexpectedMessage
from trezor.wire.thp import ChannelState, ThpError, crypto
from trezor.wire.thp import ChannelState, ThpError, crypto, get_enabled_pairing_methods
from trezor.wire.thp.pairing_context import PairingContext
from .credential_manager import issue_credential
@ -64,12 +67,13 @@ def check_state_and_log(
return decorator
def check_method_is_allowed(
def check_method_is_allowed_and_selected(
pairing_method: ThpPairingMethod,
) -> Callable[[FuncWithContext], FuncWithContext]:
def decorator(f: FuncWithContext) -> FuncWithContext:
def inner(context: PairingContext, *args: P.args, **kwargs: P.kwargs) -> object:
_check_method_is_allowed(context, pairing_method)
_check_method_is_selected(context, pairing_method)
return f(context, *args, **kwargs)
return inner
@ -81,31 +85,58 @@ def check_method_is_allowed(
# Pairing handlers
@check_state_and_log(ChannelState.TP1)
@check_state_and_log(ChannelState.TP0)
async def handle_pairing_request(
ctx: PairingContext, message: protobuf.MessageType
) -> ThpEndResponse:
if not ThpStartPairingRequest.is_type_of(message):
if not ThpPairingRequest.is_type_of(message):
raise UnexpectedMessage("Unexpected message")
ctx.host_name = message.host_name or ""
skip_pairing = _is_method_included(ctx, ThpPairingMethod.NoMethod)
if skip_pairing:
return await _end_pairing(ctx)
await _prepare_pairing(ctx)
await ctx.write(ThpPairingPreparationsFinished())
ctx.channel_ctx.set_channel_state(ChannelState.TP3)
response = await show_display_data(
ctx, _get_possible_pairing_methods_and_cancel(ctx)
await ctx.show_pairing_dialogue()
assert ThpSelectMethod.MESSAGE_WIRE_TYPE is not None
select_method_msg = await ctx.read(
[
ThpSelectMethod.MESSAGE_WIRE_TYPE,
]
)
if Cancel.is_type_of(response):
ctx.channel_ctx.clear()
raise SilentError("Action was cancelled by the Host")
# TODO disable NFC (if enabled)
assert ThpSelectMethod.is_type_of(select_method_msg)
assert select_method_msg.selected_pairing_method is not None
ctx.set_selected_method(select_method_msg.selected_pairing_method)
if ctx.selected_method == ThpPairingMethod.SkipPairing:
return await _end_pairing(ctx)
while True:
await _prepare_pairing(ctx)
ctx.channel_ctx.set_channel_state(ChannelState.TP3)
try:
response = await ctx.show_pairing_method_screen()
except UnexpectedMessageException as e:
raw_response = e.msg
name = message_handler.get_msg_name(raw_response.type)
if name is None:
req_type = protobuf.type_for_wire(raw_response.type)
else:
req_type = protobuf.type_for_name(name)
response = message_handler.wrap_protobuf_load(raw_response.data, req_type)
if Cancel.is_type_of(response):
ctx.channel_ctx.clear()
raise SilentError("Action was cancelled by the Host")
if ThpSelectMethod.is_type_of(response):
assert response.selected_pairing_method is not None
ctx.set_selected_method(response.selected_pairing_method)
ctx.channel_ctx.set_channel_state(ChannelState.TP1)
else:
break
response = await _handle_different_pairing_methods(ctx, response)
while ThpCredentialRequest.is_type_of(response):
@ -115,15 +146,16 @@ async def handle_pairing_request(
async def _prepare_pairing(ctx: PairingContext) -> None:
ctx.channel_ctx.set_channel_state(ChannelState.TP1)
if _is_method_included(ctx, ThpPairingMethod.CodeEntry):
await _handle_code_entry_is_included(ctx)
if _is_method_included(ctx, ThpPairingMethod.QrCode):
_handle_qr_code_is_included(ctx)
if _is_method_included(ctx, ThpPairingMethod.NFC_Unidirectional):
_handle_nfc_unidirectional_is_included(ctx)
if ctx.selected_method == ThpPairingMethod.CodeEntry:
await _handle_code_entry_is_selected(ctx)
elif ctx.selected_method == ThpPairingMethod.NFC:
await _handle_nfc_is_selected(ctx)
elif ctx.selected_method == ThpPairingMethod.QrCode:
await _handle_qr_code_is_selected(ctx)
else:
raise Exception() # TODO unknown pairing method
async def show_display_data(
@ -143,10 +175,20 @@ async def show_display_data(
@check_state_and_log(ChannelState.TP1)
async def _handle_code_entry_is_included(ctx: PairingContext) -> None:
commitment = sha256(ctx.secret).digest()
async def _handle_code_entry_is_selected(ctx: PairingContext) -> None:
if ctx.code_entry_secret is None:
await _handle_code_entry_is_selected_first_time(ctx)
else:
await ctx.write_force(ThpPairingPreparationsFinished())
challenge_message = await ctx.call( # noqa: F841
async def _handle_code_entry_is_selected_first_time(ctx: PairingContext) -> None:
from trezor.wire.thp.cpace import Cpace
ctx.code_entry_secret = random.bytes(16)
commitment = sha256(ctx.code_entry_secret).digest()
challenge_message = await ctx.call(
ThpCodeEntryCommitment(commitment=commitment), ThpCodeEntryChallenge
)
ctx.channel_ctx.set_channel_state(ChannelState.TP2)
@ -157,84 +199,76 @@ async def _handle_code_entry_is_included(ctx: PairingContext) -> None:
if challenge_message.challenge is None:
raise Exception("Invalid message")
sha_ctx = sha256(ctx.channel_ctx.get_handshake_hash())
sha_ctx.update(ctx.secret)
sha_ctx.update(ctx.code_entry_secret)
sha_ctx.update(challenge_message.challenge)
sha_ctx.update(bytes("PairingMethod_CodeEntry", "utf-8"))
code_code_entry_hash = sha_ctx.digest()
ctx.display_data.code_code_entry = (
int.from_bytes(code_code_entry_hash, "big") % 1000000
)
@check_state_and_log(ChannelState.TP1, ChannelState.TP2)
def _handle_qr_code_is_included(ctx: PairingContext) -> None:
sha_ctx = sha256(ctx.channel_ctx.get_handshake_hash())
sha_ctx.update(ctx.secret)
sha_ctx.update(bytes("PairingMethod_QrCode", "utf-8"))
ctx.display_data.code_qr_code = sha_ctx.digest()[:16]
@check_state_and_log(ChannelState.TP1, ChannelState.TP2)
def _handle_nfc_unidirectional_is_included(ctx: PairingContext) -> None:
sha_ctx = sha256(ctx.channel_ctx.get_handshake_hash())
sha_ctx.update(ctx.secret)
sha_ctx.update(bytes("PairingMethod_NfcUnidirectional", "utf-8"))
ctx.display_data.code_nfc_unidirectional = sha_ctx.digest()[:16]
@check_state_and_log(ChannelState.TP3)
async def _handle_different_pairing_methods(
ctx: PairingContext, response: protobuf.MessageType
) -> protobuf.MessageType:
if ThpCodeEntryCpaceHost.is_type_of(response):
return await _handle_code_entry_cpace(ctx, response)
if ThpQrCodeTag.is_type_of(response):
return await _handle_qr_code_tag(ctx, response)
if ThpNfcUnidirectionalTag.is_type_of(response):
return await _handle_nfc_unidirectional_tag(ctx, response)
raise UnexpectedMessage("Unexpected message")
@check_state_and_log(ChannelState.TP3)
@check_method_is_allowed(ThpPairingMethod.CodeEntry)
async def _handle_code_entry_cpace(
ctx: PairingContext, message: protobuf.MessageType
) -> protobuf.MessageType:
from trezor.wire.thp.cpace import Cpace
# TODO check that ThpCodeEntryCpaceHost message is valid
if TYPE_CHECKING:
assert isinstance(message, ThpCodeEntryCpaceHost)
if message.cpace_host_public_key is None:
raise ThpError("Message ThpCodeEntryCpaceHost has no public key")
ctx.cpace = Cpace(
message.cpace_host_public_key,
ctx.channel_ctx.get_handshake_hash(),
)
assert ctx.display_data.code_code_entry is not None
ctx.cpace.generate_keys_and_secret(
ctx.display_data.code_code_entry.to_bytes(6, "big")
)
ctx.channel_ctx.set_channel_state(ChannelState.TP4)
response = await ctx.call(
ThpCodeEntryCpaceTrezor(cpace_trezor_public_key=ctx.cpace.trezor_public_key),
ThpCodeEntryTag,
await ctx.write_force(
ThpCodeEntryCpaceTrezor(cpace_trezor_public_key=ctx.cpace.trezor_public_key)
)
return await _handle_code_entry_tag(ctx, response)
@check_state_and_log(ChannelState.TP4)
@check_method_is_allowed(ThpPairingMethod.CodeEntry)
async def _handle_code_entry_tag(
@check_state_and_log(ChannelState.TP1)
async def _handle_nfc_is_selected(ctx: PairingContext) -> None:
ctx.nfc_secret = random.bytes(16)
sha_ctx = sha256(ctx.channel_ctx.get_handshake_hash())
sha_ctx.update(ctx.nfc_secret)
sha_ctx.update(bytes("PairingMethod_NfcUnidirectional", "utf-8"))
ctx.display_data.code_nfc = sha_ctx.digest()[:16]
await ctx.write_force(ThpPairingPreparationsFinished())
@check_state_and_log(ChannelState.TP1)
async def _handle_qr_code_is_selected(ctx: PairingContext) -> None:
ctx.qr_code_secret = random.bytes(16)
sha_ctx = sha256(ThpPairingMethod.QrCode.to_bytes(1, "big"))
sha_ctx.update(ctx.channel_ctx.get_handshake_hash())
sha_ctx.update(ctx.qr_code_secret)
ctx.display_data.code_qr_code = sha_ctx.digest()[:16]
await ctx.write_force(ThpPairingPreparationsFinished())
@check_state_and_log(ChannelState.TP3)
async def _handle_different_pairing_methods(
ctx: PairingContext, response: protobuf.MessageType
) -> protobuf.MessageType:
if ThpCodeEntryCpaceHostTag.is_type_of(response):
return await _handle_code_entry_cpace(ctx, response)
if ThpQrCodeTag.is_type_of(response):
return await _handle_qr_code_tag(ctx, response)
if ThpNfcTagHost.is_type_of(response):
return await _handle_nfc_tag(ctx, response)
raise UnexpectedMessage("Unexpected message" + str(response))
@check_state_and_log(ChannelState.TP3)
@check_method_is_allowed_and_selected(ThpPairingMethod.CodeEntry)
async def _handle_code_entry_cpace(
ctx: PairingContext, message: protobuf.MessageType
) -> protobuf.MessageType:
if TYPE_CHECKING:
assert isinstance(message, ThpCodeEntryTag)
assert ThpCodeEntryCpaceHostTag.is_type_of(message)
if message.cpace_host_public_key is None:
raise ThpError(
"Message ThpCodeEntryCpaceHostTag is missing cpace_host_public_key"
)
if message.tag is None:
raise ThpError("Message ThpCodeEntryCpaceHostTag is missing tag")
ctx.cpace.compute_shared_secret(message.cpace_host_public_key)
expected_tag = sha256(ctx.cpace.shared_secret).digest()
if expected_tag != message.tag:
print(
@ -248,56 +282,69 @@ async def _handle_code_entry_tag(
return await _handle_secret_reveal(
ctx,
msg=ThpCodeEntrySecret(secret=ctx.secret),
msg=ThpCodeEntrySecret(secret=ctx.code_entry_secret),
)
@check_state_and_log(ChannelState.TP3)
@check_method_is_allowed(ThpPairingMethod.QrCode)
@check_method_is_allowed_and_selected(ThpPairingMethod.QrCode)
async def _handle_qr_code_tag(
ctx: PairingContext, message: protobuf.MessageType
) -> protobuf.MessageType:
if TYPE_CHECKING:
assert isinstance(message, ThpQrCodeTag)
assert ctx.display_data.code_qr_code is not None
expected_tag = sha256(ctx.display_data.code_qr_code).digest()
sha_ctx = sha256(ctx.channel_ctx.get_handshake_hash())
sha_ctx.update(ctx.display_data.code_qr_code)
expected_tag = sha_ctx.digest()
if expected_tag != message.tag:
print(
"expected qr code tag:", hexlify(expected_tag).decode()
) # TODO remove after testing
print(
"expected code qr code tag:",
"expected handshake hash:",
hexlify(ctx.channel_ctx.get_handshake_hash()).decode(),
) # TODO remove after testing
print(
"expected code qr code:",
hexlify(ctx.display_data.code_qr_code).decode(),
) # TODO remove after testing
print(
"expected secret:", hexlify(ctx.secret).decode()
"expected secret:", hexlify(ctx.qr_code_secret).decode()
) # TODO remove after testing
raise ThpError("Unexpected QR Code Tag")
return await _handle_secret_reveal(
ctx,
msg=ThpQrCodeSecret(secret=ctx.secret),
msg=ThpQrCodeSecret(secret=ctx.qr_code_secret),
)
@check_state_and_log(ChannelState.TP3)
@check_method_is_allowed(ThpPairingMethod.NFC_Unidirectional)
async def _handle_nfc_unidirectional_tag(
@check_method_is_allowed_and_selected(ThpPairingMethod.NFC)
async def _handle_nfc_tag(
ctx: PairingContext, message: protobuf.MessageType
) -> protobuf.MessageType:
if TYPE_CHECKING:
assert isinstance(message, ThpNfcUnidirectionalTag)
expected_tag = sha256(ctx.display_data.code_nfc_unidirectional).digest()
assert isinstance(message, ThpNfcTagHost)
sha_ctx = sha256(ThpPairingMethod.NFC.to_bytes(1, "big"))
sha_ctx.update(ctx.channel_ctx.get_handshake_hash())
sha_ctx.update(ctx.nfc_secret)
expected_tag = sha_ctx.digest()
if expected_tag != message.tag:
print(
"expected nfc tag:", hexlify(expected_tag).decode()
) # TODO remove after testing
raise ThpError("Unexpected NFC Unidirectional Tag")
sha_ctx = sha256(ThpPairingMethod.NFC.to_bytes(1, "big"))
sha_ctx.update(ctx.channel_ctx.get_handshake_hash())
# TODO add Host's secret from NFC message transferred over NFC
# sha_ctx.update(host's secret)
trezor_tag = sha_ctx.digest()
return await _handle_secret_reveal(
ctx,
msg=ThpNfcUnidirectionalSecret(secret=ctx.secret),
msg=ThpNfcTagTrezor(tag=trezor_tag),
)
@ -318,7 +365,6 @@ async def _handle_secret_reveal(
async def _handle_credential_request(
ctx: PairingContext, message: protobuf.MessageType
) -> protobuf.MessageType:
ctx.secret
if not ThpCredentialRequest.is_type_of(message):
raise UnexpectedMessage("Unexpected message")
@ -362,42 +408,43 @@ def _check_state(ctx: PairingContext, *allowed_states: ChannelState) -> None:
def _check_method_is_allowed(ctx: PairingContext, method: ThpPairingMethod) -> None:
if not _is_method_included(ctx, method):
if method not in get_enabled_pairing_methods(ctx.iface):
raise ThpError("Unexpected pairing method")
def _is_method_included(ctx: PairingContext, method: ThpPairingMethod) -> bool:
return method in ctx.channel_ctx.selected_pairing_methods
def _check_method_is_selected(ctx: PairingContext, method: ThpPairingMethod) -> None:
if method is not ctx.selected_method:
raise ThpError("Not selected pairing method")
#
# Helpers - getters
def _get_possible_pairing_methods_and_cancel(ctx: PairingContext) -> Tuple[int, ...]:
def _get_accepted_messages(ctx: PairingContext) -> Tuple[int, ...]:
r = _get_possible_pairing_methods(ctx)
mtype = Cancel.MESSAGE_WIRE_TYPE
return r + ((mtype,) if mtype is not None else ())
r += (mtype,) if mtype is not None else ()
mtype = ThpSelectMethod.MESSAGE_WIRE_TYPE
r += (mtype,) if mtype is not None else ()
return r
def _get_possible_pairing_methods(ctx: PairingContext) -> Tuple[int, ...]:
r = tuple(
_get_message_type_for_method(method)
for method in ctx.channel_ctx.selected_pairing_methods
[
_get_message_type_for_method(ctx.selected_method),
]
)
if __debug__:
from trezor.messages import DebugLinkGetState
mtype = DebugLinkGetState.MESSAGE_WIRE_TYPE
return r + ((mtype,) if mtype is not None else ())
return r
def _get_message_type_for_method(method: int) -> int:
if method is ThpPairingMethod.CodeEntry:
return ThpMessageType.ThpCodeEntryCpaceHost
if method is ThpPairingMethod.NFC_Unidirectional:
return ThpMessageType.ThpNfcUnidirectionalTag
return ThpMessageType.ThpCodeEntryCpaceHostTag
if method is ThpPairingMethod.NFC:
return ThpMessageType.ThpNfcTagHost
if method is ThpPairingMethod.QrCode:
return ThpMessageType.ThpQrCodeTag
raise ValueError("Unexpected pairing method - no message type available")

View File

@ -3,8 +3,9 @@
# isort:skip_file
ThpCreateNewSession = 1000
ThpNewSession = 1001
ThpStartPairingRequest = 1008
ThpPairingRequest = 1006
ThpPairingRequestApproved = 1007
ThpSelectMethod = 1008
ThpPairingPreparationsFinished = 1009
ThpCredentialRequest = 1010
ThpCredentialResponse = 1011
@ -12,11 +13,10 @@ ThpEndRequest = 1012
ThpEndResponse = 1013
ThpCodeEntryCommitment = 1016
ThpCodeEntryChallenge = 1017
ThpCodeEntryCpaceHost = 1018
ThpCodeEntryCpaceTrezor = 1019
ThpCodeEntryTag = 1020
ThpCodeEntrySecret = 1021
ThpCodeEntryCpaceTrezor = 1018
ThpCodeEntryCpaceHostTag = 1019
ThpCodeEntrySecret = 1020
ThpQrCodeTag = 1024
ThpQrCodeSecret = 1025
ThpNfcUnidirectionalTag = 1032
ThpNfcUnidirectionalSecret = 1033
ThpNfcTagHost = 1032
ThpNfcTagTrezor = 1033

View File

@ -2,7 +2,7 @@
# fmt: off
# isort:skip_file
NoMethod = 1
SkipPairing = 1
CodeEntry = 2
QrCode = 3
NFC_Unidirectional = 4
NFC = 4

View File

@ -353,8 +353,9 @@ if TYPE_CHECKING:
class ThpMessageType(IntEnum):
ThpCreateNewSession = 1000
ThpNewSession = 1001
ThpStartPairingRequest = 1008
ThpPairingRequest = 1006
ThpPairingRequestApproved = 1007
ThpSelectMethod = 1008
ThpPairingPreparationsFinished = 1009
ThpCredentialRequest = 1010
ThpCredentialResponse = 1011
@ -362,20 +363,19 @@ if TYPE_CHECKING:
ThpEndResponse = 1013
ThpCodeEntryCommitment = 1016
ThpCodeEntryChallenge = 1017
ThpCodeEntryCpaceHost = 1018
ThpCodeEntryCpaceTrezor = 1019
ThpCodeEntryTag = 1020
ThpCodeEntrySecret = 1021
ThpCodeEntryCpaceTrezor = 1018
ThpCodeEntryCpaceHostTag = 1019
ThpCodeEntrySecret = 1020
ThpQrCodeTag = 1024
ThpQrCodeSecret = 1025
ThpNfcUnidirectionalTag = 1032
ThpNfcUnidirectionalSecret = 1033
ThpNfcTagHost = 1032
ThpNfcTagTrezor = 1033
class ThpPairingMethod(IntEnum):
NoMethod = 1
SkipPairing = 1
CodeEntry = 2
QrCode = 3
NFC_Unidirectional = 4
NFC = 4
class MessageType(IntEnum):
Initialize = 0

View File

@ -2930,7 +2930,7 @@ if TYPE_CHECKING:
tokens: "list[str]"
thp_pairing_code_entry_code: "int | None"
thp_pairing_code_qr_code: "bytes | None"
thp_pairing_code_nfc_unidirectional: "bytes | None"
thp_pairing_code_nfc: "bytes | None"
def __init__(
self,
@ -2950,7 +2950,7 @@ if TYPE_CHECKING:
mnemonic_type: "BackupType | None" = None,
thp_pairing_code_entry_code: "int | None" = None,
thp_pairing_code_qr_code: "bytes | None" = None,
thp_pairing_code_nfc_unidirectional: "bytes | None" = None,
thp_pairing_code_nfc: "bytes | None" = None,
) -> None:
pass
@ -6175,8 +6175,8 @@ if TYPE_CHECKING:
class ThpDeviceProperties(protobuf.MessageType):
internal_model: "str | None"
model_variant: "int | None"
bootloader_mode: "bool | None"
protocol_version: "int | None"
protocol_version_major: "int | None"
protocol_version_minor: "int | None"
pairing_methods: "list[ThpPairingMethod]"
def __init__(
@ -6185,8 +6185,8 @@ if TYPE_CHECKING:
pairing_methods: "list[ThpPairingMethod] | None" = None,
internal_model: "str | None" = None,
model_variant: "int | None" = None,
bootloader_mode: "bool | None" = None,
protocol_version: "int | None" = None,
protocol_version_major: "int | None" = None,
protocol_version_minor: "int | None" = None,
) -> None:
pass
@ -6196,12 +6196,10 @@ if TYPE_CHECKING:
class ThpHandshakeCompletionReqNoisePayload(protobuf.MessageType):
host_pairing_credential: "bytes | None"
pairing_methods: "list[ThpPairingMethod]"
def __init__(
self,
*,
pairing_methods: "list[ThpPairingMethod] | None" = None,
host_pairing_credential: "bytes | None" = None,
) -> None:
pass
@ -6228,21 +6226,7 @@ if TYPE_CHECKING:
def is_type_of(cls, msg: Any) -> TypeGuard["ThpCreateNewSession"]:
return isinstance(msg, cls)
class ThpNewSession(protobuf.MessageType):
new_session_id: "int | None"
def __init__(
self,
*,
new_session_id: "int | None" = None,
) -> None:
pass
@classmethod
def is_type_of(cls, msg: Any) -> TypeGuard["ThpNewSession"]:
return isinstance(msg, cls)
class ThpStartPairingRequest(protobuf.MessageType):
class ThpPairingRequest(protobuf.MessageType):
host_name: "str | None"
def __init__(
@ -6253,7 +6237,27 @@ if TYPE_CHECKING:
pass
@classmethod
def is_type_of(cls, msg: Any) -> TypeGuard["ThpStartPairingRequest"]:
def is_type_of(cls, msg: Any) -> TypeGuard["ThpPairingRequest"]:
return isinstance(msg, cls)
class ThpPairingRequestApproved(protobuf.MessageType):
@classmethod
def is_type_of(cls, msg: Any) -> TypeGuard["ThpPairingRequestApproved"]:
return isinstance(msg, cls)
class ThpSelectMethod(protobuf.MessageType):
selected_pairing_method: "ThpPairingMethod"
def __init__(
self,
*,
selected_pairing_method: "ThpPairingMethod | None" = None,
) -> None:
pass
@classmethod
def is_type_of(cls, msg: Any) -> TypeGuard["ThpSelectMethod"]:
return isinstance(msg, cls)
class ThpPairingPreparationsFinished(protobuf.MessageType):
@ -6290,20 +6294,6 @@ if TYPE_CHECKING:
def is_type_of(cls, msg: Any) -> TypeGuard["ThpCodeEntryChallenge"]:
return isinstance(msg, cls)
class ThpCodeEntryCpaceHost(protobuf.MessageType):
cpace_host_public_key: "bytes | None"
def __init__(
self,
*,
cpace_host_public_key: "bytes | None" = None,
) -> None:
pass
@classmethod
def is_type_of(cls, msg: Any) -> TypeGuard["ThpCodeEntryCpaceHost"]:
return isinstance(msg, cls)
class ThpCodeEntryCpaceTrezor(protobuf.MessageType):
cpace_trezor_public_key: "bytes | None"
@ -6318,18 +6308,20 @@ if TYPE_CHECKING:
def is_type_of(cls, msg: Any) -> TypeGuard["ThpCodeEntryCpaceTrezor"]:
return isinstance(msg, cls)
class ThpCodeEntryTag(protobuf.MessageType):
class ThpCodeEntryCpaceHostTag(protobuf.MessageType):
cpace_host_public_key: "bytes | None"
tag: "bytes | None"
def __init__(
self,
*,
cpace_host_public_key: "bytes | None" = None,
tag: "bytes | None" = None,
) -> None:
pass
@classmethod
def is_type_of(cls, msg: Any) -> TypeGuard["ThpCodeEntryTag"]:
def is_type_of(cls, msg: Any) -> TypeGuard["ThpCodeEntryCpaceHostTag"]:
return isinstance(msg, cls)
class ThpCodeEntrySecret(protobuf.MessageType):
@ -6374,7 +6366,7 @@ if TYPE_CHECKING:
def is_type_of(cls, msg: Any) -> TypeGuard["ThpQrCodeSecret"]:
return isinstance(msg, cls)
class ThpNfcUnidirectionalTag(protobuf.MessageType):
class ThpNfcTagHost(protobuf.MessageType):
tag: "bytes | None"
def __init__(
@ -6385,30 +6377,32 @@ if TYPE_CHECKING:
pass
@classmethod
def is_type_of(cls, msg: Any) -> TypeGuard["ThpNfcUnidirectionalTag"]:
def is_type_of(cls, msg: Any) -> TypeGuard["ThpNfcTagHost"]:
return isinstance(msg, cls)
class ThpNfcUnidirectionalSecret(protobuf.MessageType):
secret: "bytes | None"
class ThpNfcTagTrezor(protobuf.MessageType):
tag: "bytes | None"
def __init__(
self,
*,
secret: "bytes | None" = None,
tag: "bytes | None" = None,
) -> None:
pass
@classmethod
def is_type_of(cls, msg: Any) -> TypeGuard["ThpNfcUnidirectionalSecret"]:
def is_type_of(cls, msg: Any) -> TypeGuard["ThpNfcTagTrezor"]:
return isinstance(msg, cls)
class ThpCredentialRequest(protobuf.MessageType):
host_static_pubkey: "bytes | None"
autoconnect: "bool | None"
def __init__(
self,
*,
host_static_pubkey: "bytes | None" = None,
autoconnect: "bool | None" = None,
) -> None:
pass
@ -6446,11 +6440,13 @@ if TYPE_CHECKING:
class ThpCredentialMetadata(protobuf.MessageType):
host_name: "str | None"
autoconnect: "bool | None"
def __init__(
self,
*,
host_name: "str | None" = None,
autoconnect: "bool | None" = None,
) -> None:
pass

View File

@ -66,13 +66,14 @@ class ChannelState(IntEnum):
UNALLOCATED = 0
TH1 = 1
TH2 = 2
TP1 = 3
TP2 = 4
TP3 = 5
TP4 = 6
TC1 = 7
ENCRYPTED_TRANSPORT = 8
INVALIDATED = 9
TP0 = 3
TP1 = 4
TP2 = 5
TP3 = 6
TP4 = 7
TC1 = 8
ENCRYPTED_TRANSPORT = 9
INVALIDATED = 10
class SessionState(IntEnum):
@ -134,7 +135,7 @@ class PacketHeader:
_DEFAULT_ENABLED_PAIRING_METHODS = [
ThpPairingMethod.CodeEntry,
ThpPairingMethod.QrCode,
ThpPairingMethod.NFC_Unidirectional,
ThpPairingMethod.NFC,
]
@ -149,7 +150,7 @@ def get_enabled_pairing_methods(
methods = _DEFAULT_ENABLED_PAIRING_METHODS.copy()
if iface is not None and iface is usb.iface_wire:
methods.append(ThpPairingMethod.NoMethod)
methods.append(ThpPairingMethod.SkipPairing)
return methods
@ -159,8 +160,8 @@ def _get_device_properties(iface: WireInterface) -> ThpDeviceProperties:
pairing_methods=get_enabled_pairing_methods(iface),
internal_model=utils.INTERNAL_MODEL,
model_variant=0,
bootloader_mode=False,
protocol_version=2,
protocol_version_major=2,
protocol_version_minor=0,
)

View File

@ -69,7 +69,6 @@ class Channel:
self.bytes_read: int = 0
self.expected_payload_length: int = 0
self.is_cont_packet_expected: bool = False
self.selected_pairing_methods = []
self.sessions: dict[int, GenericSessionContext] = {}
# Objects for writing a message to a wire

View File

@ -11,9 +11,8 @@ class Cpace:
CPace, a balanced composable PAKE: https://datatracker.ietf.org/doc/draft-irtf-cfrg-cpace/
"""
def __init__(self, cpace_host_public_key: bytes, handshake_hash: bytes) -> None:
def __init__(self, handshake_hash: bytes) -> None:
self.handshake_hash: bytes = handshake_hash
self.host_public_key: bytes = cpace_host_public_key
self.shared_secret: bytes
self.trezor_private_key: bytes
self.trezor_public_key: bytes
@ -31,6 +30,8 @@ class Cpace:
generator = elligator2.map_to_curve25519(pregenerator)
self.trezor_private_key = random.bytes(32)
self.trezor_public_key = curve25519.multiply(self.trezor_private_key, generator)
def compute_shared_secret(self, host_public_key: bytes) -> None:
self.shared_secret = curve25519.multiply(
self.trezor_private_key, self.host_public_key
self.trezor_private_key, host_public_key
)

View File

@ -3,16 +3,19 @@ from ubinascii import hexlify
import trezorui_api
from trezor import loop, protobuf, workflow
from trezor.crypto import random
from trezor.enums import ButtonRequestType
from trezor.wire import context, message_handler, protocol_common
from trezor.wire.context import UnexpectedMessageException
from trezor.wire.errors import ActionCancelled, SilentError
from trezor.wire.protocol_common import Context, Message
from trezor.wire.thp import get_enabled_pairing_methods
if TYPE_CHECKING:
from typing import Container
from typing import Awaitable, Container
from trezor import ui
from trezor.enums import ThpPairingMethod
from trezorui_api import UiResult
from .channel import Channel
from .cpace import Cpace
@ -28,7 +31,7 @@ class PairingDisplayData:
def __init__(self) -> None:
self.code_code_entry: int | None = None
self.code_qr_code: bytes | None = None
self.code_nfc_unidirectional: bytes | None = None
self.code_nfc: bytes | None = None
def get_display_layout(self) -> ui.Layout:
from trezor import ui
@ -37,9 +40,9 @@ class PairingDisplayData:
qr_str = ""
code_str = ""
if self.code_qr_code is not None:
qr_str = self._get_code_qr_code_str()
qr_str = self.get_code_qr_code_str()
if self.code_code_entry is not None:
code_str = self._get_code_code_entry_str()
code_str = self.get_code_code_entry_str()
return ui.Layout(
trezorui_api.show_address_details( # noqa
@ -53,7 +56,7 @@ class PairingDisplayData:
)
)
def _get_code_code_entry_str(self) -> str:
def get_code_code_entry_str(self) -> str:
if self.code_code_entry is not None:
code_str = f"{self.code_code_entry:06}"
if __debug__:
@ -62,7 +65,7 @@ class PairingDisplayData:
return code_str[:3] + " " + code_str[3:]
raise Exception("Code entry string is not available")
def _get_code_qr_code_str(self) -> str:
def get_code_qr_code_str(self) -> str:
if self.code_qr_code is not None:
code_str = (hexlify(self.code_qr_code)).decode("utf-8")
if __debug__:
@ -77,20 +80,16 @@ class PairingContext(Context):
super().__init__(channel_ctx.iface, channel_ctx.channel_id)
self.channel_ctx: Channel = channel_ctx
self.incoming_message = loop.mailbox()
self.secret: bytes = random.bytes(16)
self.nfc_secret: bytes
self.qr_code_secret: bytes
self.code_entry_secret: bytes | None = None
self.selected_method: ThpPairingMethod
self.display_data: PairingDisplayData = PairingDisplayData()
self.cpace: Cpace
self.host_name: str
async def handle(self, is_debug_session: bool = False) -> None:
# if __debug__:
# log.debug(__name__, "handle - start")
# if is_debug_session:
# import apps.debug
# apps.debug.DEBUG_CONTEXT = self
async def handle(self) -> None:
next_message: Message | None = None
while True:
@ -168,12 +167,17 @@ class PairingContext(Context):
async def write(self, msg: protobuf.MessageType) -> None:
return await self.channel_ctx.write(msg)
def write_force(self, msg: protobuf.MessageType) -> Awaitable[None]:
return self.channel_ctx.write(msg, force=True)
async def call(
self, msg: protobuf.MessageType, expected_type: type[protobuf.MessageType]
) -> protobuf.MessageType:
expected_wire_type = message_handler.get_msg_type(expected_type.MESSAGE_NAME)
expected_wire_type = expected_type.MESSAGE_WIRE_TYPE
if expected_wire_type is None:
expected_wire_type = expected_type.MESSAGE_WIRE_TYPE
expected_wire_type = message_handler.get_msg_type(
expected_type.MESSAGE_NAME
)
assert expected_wire_type is not None
@ -189,6 +193,71 @@ class PairingContext(Context):
del msg
return await self.read(expected_types)
def set_selected_method(self, selected_method: ThpPairingMethod) -> None:
if selected_method not in get_enabled_pairing_methods(self.iface):
raise Exception("Not allowed to set this method")
self.selected_method = selected_method
async def show_pairing_dialogue(self) -> None:
from trezor.messages import ThpPairingRequestApproved
from trezor.ui.layouts.common import interact
result = await interact(
trezorui_api.confirm_action(
title="Pairing dialogue",
action="Do you want to start pairing?",
description="Choose wisely!",
),
br_name="pairing_request",
br_code=ButtonRequestType.Other,
)
if result == trezorui_api.CONFIRMED:
await self.write(ThpPairingRequestApproved())
async def show_pairing_method_screen(
self, selected_method: ThpPairingMethod | None = None
) -> UiResult:
from trezor.enums import ThpPairingMethod
from trezor.ui.layouts.common import interact
if selected_method is None:
selected_method = self.selected_method
if selected_method is ThpPairingMethod.CodeEntry:
result = await interact(
trezorui_api.show_simple(
title="Copy the following",
text=self.display_data.get_code_code_entry_str(),
),
br_name="pairing_code_entry",
br_code=ButtonRequestType.Other,
)
elif selected_method is ThpPairingMethod.QrCode:
result = await interact(
trezorui_api.show_address_details( # noqa
qr_title="Scan QR code to pair",
address=self.display_data.get_code_qr_code_str(),
case_sensitive=True,
details_title="",
account="",
path="",
xpubs=[],
),
br_name="pairing_qr_code",
br_code=ButtonRequestType.Other,
)
elif selected_method is ThpPairingMethod.NFC:
result = await interact(
trezorui_api.show_simple(
title="NFC Pairing",
text="Move your device close to Trezor",
),
br_name="pairing_nfc",
br_code=ButtonRequestType.Other,
)
else:
raise Exception("Unknown pairing method")
return result
async def handle_pairing_request_message(
pairing_ctx: PairingContext,

View File

@ -38,13 +38,7 @@ from . import (
ThpUnallocatedSessionError,
)
from . import alternating_bit_protocol as ABP
from . import (
checksum,
control_byte,
get_enabled_pairing_methods,
get_encoded_device_properties,
session_manager,
)
from . import checksum, control_byte, get_encoded_device_properties, session_manager
from .checksum import CHECKSUM_LENGTH
from .crypto import PUBKEY_LENGTH, Handshake
from .session_context import SeedlessSessionContext
@ -315,12 +309,7 @@ async def _handle_state_TH2(ctx: Channel, message_length: int, ctrl_byte: int) -
)
if TYPE_CHECKING:
assert ThpHandshakeCompletionReqNoisePayload.is_type_of(noise_payload)
enabled_methods = get_enabled_pairing_methods(ctx.iface)
for method in noise_payload.pairing_methods:
if method not in enabled_methods:
raise ThpInvalidDataError()
if method not in ctx.selected_pairing_methods:
ctx.selected_pairing_methods.append(method)
if __debug__ and utils.ALLOW_DEBUG_MESSAGES:
log.debug(
__name__,
@ -357,9 +346,9 @@ async def _handle_state_TH2(ctx: Channel, message_length: int, ctrl_byte: int) -
ctx.handshake = None
if paired:
ctx.set_channel_state(ChannelState.ENCRYPTED_TRANSPORT)
ctx.set_channel_state(ChannelState.TC1)
else:
ctx.set_channel_state(ChannelState.TP1)
ctx.set_channel_state(ChannelState.TP0)
async def _handle_state_ENCRYPTED_TRANSPORT(ctx: Channel, message_length: int) -> None:
@ -457,6 +446,7 @@ def _decode_message(
def _is_channel_state_pairing(state: int) -> bool:
if state in (
ChannelState.TP0,
ChannelState.TP1,
ChannelState.TP2,
ChannelState.TP3,

View File

@ -27,7 +27,7 @@ if utils.USE_THP:
ThpCodeEntryTag,
ThpCredentialRequest,
ThpEndRequest,
ThpStartPairingRequest,
ThpPairingRequest,
)
from trezor.wire.thp import (
ChannelState,
@ -280,13 +280,13 @@ class TestTrezorHostProtocol(unittest.TestCase):
config.wipe()
channel = next(iter(thp_main._CHANNELS.values()))
channel.selected_pairing_methods = [
ThpPairingMethod.NoMethod,
ThpPairingMethod.SkipPairing,
ThpPairingMethod.CodeEntry,
ThpPairingMethod.NFC_Unidirectional,
ThpPairingMethod.QrCode,
]
pairing_ctx = PairingContext(channel)
request_message = ThpStartPairingRequest()
request_message = ThpPairingRequest()
channel.set_channel_state(ChannelState.TP1)
gen = pairing.handle_pairing_request(pairing_ctx, request_message)
@ -310,7 +310,7 @@ class TestTrezorHostProtocol(unittest.TestCase):
ThpPairingMethod.QrCode,
]
pairing_ctx = PairingContext(channel)
request_message = ThpStartPairingRequest()
request_message = ThpPairingRequest()
with self.assertRaises(UnexpectedMessage) as e:
pairing.handle_pairing_request(pairing_ctx, request_message)
print(e.value.message)
@ -346,7 +346,7 @@ class TestTrezorHostProtocol(unittest.TestCase):
msg = ThpCodeEntryCpaceHost(cpace_host_public_key=cpace_host_public_key)
# msg = ThpQrCodeTag(tag=tag_qrc)
# msg = ThpNfcUnidirectionalTag(tag=tag_nfc)
# msg = ThpNfcTagHost(tag=tag_nfc)
buffer: bytearray = bytearray(protobuf.encoded_length(msg))
protobuf.encode(buffer, msg)

View File

@ -102,7 +102,7 @@ class TrezorClient:
if protobuf_mapping is None:
protobuf_mapping = mapping.DEFAULT_MAPPING
protocol_v1 = ProtocolV1(transport, protobuf_mapping)
if channel_data.protocol_version == 2:
if channel_data.protocol_version_major == 2:
try:
protocol_v1.write(messages.Ping(message="Sanity check - to resume"))
except Exception as e:

View File

@ -406,8 +406,9 @@ class TezosBallotType(IntEnum):
class ThpMessageType(IntEnum):
ThpCreateNewSession = 1000
ThpNewSession = 1001
ThpStartPairingRequest = 1008
ThpPairingRequest = 1006
ThpPairingRequestApproved = 1007
ThpSelectMethod = 1008
ThpPairingPreparationsFinished = 1009
ThpCredentialRequest = 1010
ThpCredentialResponse = 1011
@ -415,21 +416,20 @@ class ThpMessageType(IntEnum):
ThpEndResponse = 1013
ThpCodeEntryCommitment = 1016
ThpCodeEntryChallenge = 1017
ThpCodeEntryCpaceHost = 1018
ThpCodeEntryCpaceTrezor = 1019
ThpCodeEntryTag = 1020
ThpCodeEntrySecret = 1021
ThpCodeEntryCpaceTrezor = 1018
ThpCodeEntryCpaceHostTag = 1019
ThpCodeEntrySecret = 1020
ThpQrCodeTag = 1024
ThpQrCodeSecret = 1025
ThpNfcUnidirectionalTag = 1032
ThpNfcUnidirectionalSecret = 1033
ThpNfcTagHost = 1032
ThpNfcTagTrezor = 1033
class ThpPairingMethod(IntEnum):
NoMethod = 1
SkipPairing = 1
CodeEntry = 2
QrCode = 3
NFC_Unidirectional = 4
NFC = 4
class MessageType(IntEnum):
@ -4203,7 +4203,7 @@ class DebugLinkState(protobuf.MessageType):
13: protobuf.Field("tokens", "string", repeated=True, required=False, default=None),
14: protobuf.Field("thp_pairing_code_entry_code", "uint32", repeated=False, required=False, default=None),
15: protobuf.Field("thp_pairing_code_qr_code", "bytes", repeated=False, required=False, default=None),
16: protobuf.Field("thp_pairing_code_nfc_unidirectional", "bytes", repeated=False, required=False, default=None),
16: protobuf.Field("thp_pairing_code_nfc", "bytes", repeated=False, required=False, default=None),
}
def __init__(
@ -4224,7 +4224,7 @@ class DebugLinkState(protobuf.MessageType):
mnemonic_type: Optional["BackupType"] = None,
thp_pairing_code_entry_code: Optional["int"] = None,
thp_pairing_code_qr_code: Optional["bytes"] = None,
thp_pairing_code_nfc_unidirectional: Optional["bytes"] = None,
thp_pairing_code_nfc: Optional["bytes"] = None,
) -> None:
self.tokens: Sequence["str"] = tokens if tokens is not None else []
self.layout = layout
@ -4241,7 +4241,7 @@ class DebugLinkState(protobuf.MessageType):
self.mnemonic_type = mnemonic_type
self.thp_pairing_code_entry_code = thp_pairing_code_entry_code
self.thp_pairing_code_qr_code = thp_pairing_code_qr_code
self.thp_pairing_code_nfc_unidirectional = thp_pairing_code_nfc_unidirectional
self.thp_pairing_code_nfc = thp_pairing_code_nfc
class DebugLinkStop(protobuf.MessageType):
@ -7909,8 +7909,8 @@ class ThpDeviceProperties(protobuf.MessageType):
FIELDS = {
1: protobuf.Field("internal_model", "string", repeated=False, required=False, default=None),
2: protobuf.Field("model_variant", "uint32", repeated=False, required=False, default=None),
3: protobuf.Field("bootloader_mode", "bool", repeated=False, required=False, default=None),
4: protobuf.Field("protocol_version", "uint32", repeated=False, required=False, default=None),
3: protobuf.Field("protocol_version_major", "uint32", repeated=False, required=False, default=None),
4: protobuf.Field("protocol_version_minor", "uint32", repeated=False, required=False, default=None),
5: protobuf.Field("pairing_methods", "ThpPairingMethod", repeated=True, required=False, default=None),
}
@ -7920,30 +7920,27 @@ class ThpDeviceProperties(protobuf.MessageType):
pairing_methods: Optional[Sequence["ThpPairingMethod"]] = None,
internal_model: Optional["str"] = None,
model_variant: Optional["int"] = None,
bootloader_mode: Optional["bool"] = None,
protocol_version: Optional["int"] = None,
protocol_version_major: Optional["int"] = None,
protocol_version_minor: Optional["int"] = None,
) -> None:
self.pairing_methods: Sequence["ThpPairingMethod"] = pairing_methods if pairing_methods is not None else []
self.internal_model = internal_model
self.model_variant = model_variant
self.bootloader_mode = bootloader_mode
self.protocol_version = protocol_version
self.protocol_version_major = protocol_version_major
self.protocol_version_minor = protocol_version_minor
class ThpHandshakeCompletionReqNoisePayload(protobuf.MessageType):
MESSAGE_WIRE_TYPE = None
FIELDS = {
1: protobuf.Field("host_pairing_credential", "bytes", repeated=False, required=False, default=None),
2: protobuf.Field("pairing_methods", "ThpPairingMethod", repeated=True, required=False, default=None),
}
def __init__(
self,
*,
pairing_methods: Optional[Sequence["ThpPairingMethod"]] = None,
host_pairing_credential: Optional["bytes"] = None,
) -> None:
self.pairing_methods: Sequence["ThpPairingMethod"] = pairing_methods if pairing_methods is not None else []
self.host_pairing_credential = host_pairing_credential
@ -7967,22 +7964,8 @@ class ThpCreateNewSession(protobuf.MessageType):
self.derive_cardano = derive_cardano
class ThpNewSession(protobuf.MessageType):
MESSAGE_WIRE_TYPE = 1001
FIELDS = {
1: protobuf.Field("new_session_id", "uint32", repeated=False, required=False, default=None),
}
def __init__(
self,
*,
new_session_id: Optional["int"] = None,
) -> None:
self.new_session_id = new_session_id
class ThpStartPairingRequest(protobuf.MessageType):
MESSAGE_WIRE_TYPE = 1008
class ThpPairingRequest(protobuf.MessageType):
MESSAGE_WIRE_TYPE = 1006
FIELDS = {
1: protobuf.Field("host_name", "string", repeated=False, required=False, default=None),
}
@ -7995,6 +7978,24 @@ class ThpStartPairingRequest(protobuf.MessageType):
self.host_name = host_name
class ThpPairingRequestApproved(protobuf.MessageType):
MESSAGE_WIRE_TYPE = 1007
class ThpSelectMethod(protobuf.MessageType):
MESSAGE_WIRE_TYPE = 1008
FIELDS = {
1: protobuf.Field("selected_pairing_method", "ThpPairingMethod", repeated=False, required=False, default=ThpPairingMethod.NFC),
}
def __init__(
self,
*,
selected_pairing_method: Optional["ThpPairingMethod"] = ThpPairingMethod.NFC,
) -> None:
self.selected_pairing_method = selected_pairing_method
class ThpPairingPreparationsFinished(protobuf.MessageType):
MESSAGE_WIRE_TYPE = 1009
@ -8027,22 +8028,8 @@ class ThpCodeEntryChallenge(protobuf.MessageType):
self.challenge = challenge
class ThpCodeEntryCpaceHost(protobuf.MessageType):
MESSAGE_WIRE_TYPE = 1018
FIELDS = {
1: protobuf.Field("cpace_host_public_key", "bytes", repeated=False, required=False, default=None),
}
def __init__(
self,
*,
cpace_host_public_key: Optional["bytes"] = None,
) -> None:
self.cpace_host_public_key = cpace_host_public_key
class ThpCodeEntryCpaceTrezor(protobuf.MessageType):
MESSAGE_WIRE_TYPE = 1019
MESSAGE_WIRE_TYPE = 1018
FIELDS = {
1: protobuf.Field("cpace_trezor_public_key", "bytes", repeated=False, required=False, default=None),
}
@ -8055,22 +8042,25 @@ class ThpCodeEntryCpaceTrezor(protobuf.MessageType):
self.cpace_trezor_public_key = cpace_trezor_public_key
class ThpCodeEntryTag(protobuf.MessageType):
MESSAGE_WIRE_TYPE = 1020
class ThpCodeEntryCpaceHostTag(protobuf.MessageType):
MESSAGE_WIRE_TYPE = 1019
FIELDS = {
1: protobuf.Field("cpace_host_public_key", "bytes", repeated=False, required=False, default=None),
2: protobuf.Field("tag", "bytes", repeated=False, required=False, default=None),
}
def __init__(
self,
*,
cpace_host_public_key: Optional["bytes"] = None,
tag: Optional["bytes"] = None,
) -> None:
self.cpace_host_public_key = cpace_host_public_key
self.tag = tag
class ThpCodeEntrySecret(protobuf.MessageType):
MESSAGE_WIRE_TYPE = 1021
MESSAGE_WIRE_TYPE = 1020
FIELDS = {
1: protobuf.Field("secret", "bytes", repeated=False, required=False, default=None),
}
@ -8111,7 +8101,7 @@ class ThpQrCodeSecret(protobuf.MessageType):
self.secret = secret
class ThpNfcUnidirectionalTag(protobuf.MessageType):
class ThpNfcTagHost(protobuf.MessageType):
MESSAGE_WIRE_TYPE = 1032
FIELDS = {
1: protobuf.Field("tag", "bytes", repeated=False, required=False, default=None),
@ -8125,32 +8115,35 @@ class ThpNfcUnidirectionalTag(protobuf.MessageType):
self.tag = tag
class ThpNfcUnidirectionalSecret(protobuf.MessageType):
class ThpNfcTagTrezor(protobuf.MessageType):
MESSAGE_WIRE_TYPE = 1033
FIELDS = {
1: protobuf.Field("secret", "bytes", repeated=False, required=False, default=None),
1: protobuf.Field("tag", "bytes", repeated=False, required=False, default=None),
}
def __init__(
self,
*,
secret: Optional["bytes"] = None,
tag: Optional["bytes"] = None,
) -> None:
self.secret = secret
self.tag = tag
class ThpCredentialRequest(protobuf.MessageType):
MESSAGE_WIRE_TYPE = 1010
FIELDS = {
1: protobuf.Field("host_static_pubkey", "bytes", repeated=False, required=False, default=None),
2: protobuf.Field("autoconnect", "bool", repeated=False, required=False, default=None),
}
def __init__(
self,
*,
host_static_pubkey: Optional["bytes"] = None,
autoconnect: Optional["bool"] = None,
) -> None:
self.host_static_pubkey = host_static_pubkey
self.autoconnect = autoconnect
class ThpCredentialResponse(protobuf.MessageType):
@ -8182,14 +8175,17 @@ class ThpCredentialMetadata(protobuf.MessageType):
MESSAGE_WIRE_TYPE = None
FIELDS = {
1: protobuf.Field("host_name", "string", repeated=False, required=False, default=None),
2: protobuf.Field("autoconnect", "bool", repeated=False, required=False, default=None),
}
def __init__(
self,
*,
host_name: Optional["str"] = None,
autoconnect: Optional["bool"] = None,
) -> None:
self.host_name = host_name
self.autoconnect = autoconnect
class ThpPairingCredential(protobuf.MessageType):

View File

@ -194,6 +194,8 @@ class SessionV2(Session):
return session
def __init__(self, client: TrezorClient, id: bytes) -> None:
from ..debuglink import TrezorClientDebugLink
super().__init__(client, id)
assert isinstance(client.protocol, ProtocolV2)
@ -201,7 +203,10 @@ class SessionV2(Session):
self.button_callback = client.button_callback
if self.button_callback is None:
self.button_callback = _callback_button
self.channel: ProtocolV2 = client.protocol.get_channel()
helper_debug = None
if isinstance(client, TrezorClientDebugLink):
helper_debug = client.debug
self.channel: ProtocolV2 = client.protocol.get_channel(helper_debug)
self.update_id_and_sid(id)
def _write(self, msg: t.Any) -> None:

View File

@ -4,9 +4,11 @@ from binascii import hexlify
class ChannelData:
def __init__(
self,
protocol_version: int,
protocol_version_major: int,
protocol_version_minor: int,
transport_path: str,
channel_id: int,
key_request: bytes,
@ -15,8 +17,10 @@ class ChannelData:
nonce_response: int,
sync_bit_send: int,
sync_bit_receive: int,
handshake_hash: bytes,
) -> None:
self.protocol_version: int = protocol_version
self.protocol_version_major: int = protocol_version_major
self.protocol_version_minor: int = protocol_version_minor
self.transport_path: str = transport_path
self.channel_id: int = channel_id
self.key_request: str = hexlify(key_request).decode()
@ -25,10 +29,12 @@ class ChannelData:
self.nonce_response: int = nonce_response
self.sync_bit_receive: int = sync_bit_receive
self.sync_bit_send: int = sync_bit_send
self.handshake_hash: bytes = handshake_hash
def to_dict(self):
return {
"protocol_version": self.protocol_version,
"protocol_version_major": self.protocol_version_major,
"protocol_version_minor": self.protocol_version_minor,
"transport_path": self.transport_path,
"channel_id": self.channel_id,
"key_request": self.key_request,
@ -37,4 +43,5 @@ class ChannelData:
"nonce_response": self.nonce_response,
"sync_bit_send": self.sync_bit_send,
"sync_bit_receive": self.sync_bit_receive,
"handshake_hash": self.handshake_hash,
}

View File

@ -112,7 +112,8 @@ class JsonChannelDatabase(ChannelDatabase):
def dict_to_channel_data(dict: t.Dict) -> ChannelData:
return ChannelData(
protocol_version=dict["protocol_version"],
protocol_version_major=dict["protocol_version_minor"],
protocol_version_minor=dict["protocol_version_major"],
transport_path=dict["transport_path"],
channel_id=dict["channel_id"],
key_request=bytes.fromhex(dict["key_request"]),
@ -121,6 +122,7 @@ def dict_to_channel_data(dict: t.Dict) -> ChannelData:
nonce_response=dict["nonce_response"],
sync_bit_send=dict["sync_bit_send"],
sync_bit_receive=dict["sync_bit_receive"],
handshake_hash=dict["handshake_hash"],
)

View File

@ -26,6 +26,9 @@ LOG = logging.getLogger(__name__)
MANAGEMENT_SESSION_ID: int = 0
if t.TYPE_CHECKING:
from ...debuglink import DebugLink
def _sha256_of_two(val_1: bytes, val_2: bytes) -> bytes:
hash = hashlib.sha256(val_1)
@ -77,16 +80,18 @@ class ProtocolV2(ProtocolAndChannel):
self.nonce_response = channel_data.nonce_response
self.sync_bit_receive = channel_data.sync_bit_receive
self.sync_bit_send = channel_data.sync_bit_send
self.handshake_hash: bytes = b""
self._has_valid_channel = True
def get_channel(self) -> ProtocolV2:
def get_channel(self, helper_debug: DebugLink | None = None) -> ProtocolV2:
if not self._has_valid_channel:
self._establish_new_channel()
self._establish_new_channel(helper_debug)
return self
def get_channel_data(self) -> ChannelData:
return ChannelData(
protocol_version=2,
protocol_version_major=2,
protocol_version_minor=2,
transport_path=self.transport.get_path(),
channel_id=self.channel_id,
key_request=self.key_request,
@ -95,6 +100,7 @@ class ProtocolV2(ProtocolAndChannel):
nonce_response=self.nonce_response,
sync_bit_receive=self.sync_bit_receive,
sync_bit_send=self.sync_bit_send,
handshake_hash=self.handshake_hash,
)
def read(self, session_id: int) -> t.Any:
@ -129,7 +135,7 @@ class ProtocolV2(ProtocolAndChannel):
raise exceptions.TrezorException("Unexpected response to GetFeatures")
self._features = features
def _establish_new_channel(self) -> None:
def _establish_new_channel(self, helper_debug: DebugLink | None = None) -> None:
self.sync_bit_send = 0
self.sync_bit_receive = 0
@ -141,7 +147,7 @@ class ProtocolV2(ProtocolAndChannel):
self._do_handshake(host_ephemeral_privkey, host_ephemeral_pubkey)
self._do_pairing()
self._do_pairing(helper_debug)
def _do_channel_allocation(self) -> None:
channel_allocation_nonce = os.urandom(8)
@ -269,9 +275,6 @@ class ProtocolV2(ProtocolAndChannel):
)
msg_data = self.mapping.encode_without_wire_type(
messages.ThpHandshakeCompletionReqNoisePayload(
pairing_methods=[
messages.ThpPairingMethod.NoMethod,
],
host_pairing_credential=credential,
)
)
@ -279,7 +282,7 @@ class ProtocolV2(ProtocolAndChannel):
aes_ctx = AESGCM(k)
encrypted_payload = aes_ctx.encrypt(IV_1, msg_data, h)
h = _sha256_of_two(h, encrypted_payload)
h = _sha256_of_two(h, encrypted_payload[:-16])
ha_completion_req_header = MessageHeader(
0x12,
self.channel_id,
@ -292,6 +295,7 @@ class ProtocolV2(ProtocolAndChannel):
ha_completion_req_header,
encrypted_host_static_pubkey + encrypted_payload,
)
self.handshake_hash = h
return ck
def _read_handshake_completion_response(self) -> None:
@ -304,9 +308,9 @@ class ProtocolV2(ProtocolAndChannel):
)
self._send_ack_1()
def _do_pairing(self):
def _do_pairing(self, helper_debug: DebugLink | None):
# Send StartPairingReqest message
message = messages.ThpStartPairingRequest()
message = messages.ThpPairingRequest()
message_type, message_data = self.mapping.encode(message)
self._encrypt_and_write(MANAGEMENT_SESSION_ID, message_type, message_data)
@ -314,11 +318,41 @@ class ProtocolV2(ProtocolAndChannel):
# Read ACK
self._read_ack()
# Read ThpEndResponse
# Read button request
_, msg_type, msg_data = self.read_and_decrypt()
maaa = self.mapping.decode(msg_type, msg_data)
assert isinstance(maaa, messages.ButtonRequest)
# Send button ACK
message = messages.ButtonAck()
message_type, message_data = self.mapping.encode(message)
self._encrypt_and_write(MANAGEMENT_SESSION_ID, message_type, message_data)
self._read_ack()
if helper_debug is not None:
helper_debug.press_yes()
# Read PairingRequestApproved
_, msg_type, msg_data = self.read_and_decrypt()
maaa = self.mapping.decode(msg_type, msg_data)
assert isinstance(maaa, messages.ThpPairingRequestApproved)
message = messages.ThpSelectMethod(
selected_pairing_method=messages.ThpPairingMethod.SkipPairing
)
message_type, message_data = self.mapping.encode(message)
self._encrypt_and_write(MANAGEMENT_SESSION_ID, message_type, message_data)
# Read ACK
self._read_ack()
# Read ThpEndResponse
_, msg_type, msg_data = self.read_and_decrypt()
maaa = self.mapping.decode(msg_type, msg_data)
assert isinstance(maaa, messages.ThpEndResponse)
self._has_valid_channel = True
def _read_ack(self):

View File

@ -1494,8 +1494,8 @@ pub struct DebugLinkState {
pub thp_pairing_code_entry_code: ::std::option::Option<u32>,
// @@protoc_insertion_point(field:hw.trezor.messages.debug.DebugLinkState.thp_pairing_code_qr_code)
pub thp_pairing_code_qr_code: ::std::option::Option<::std::vec::Vec<u8>>,
// @@protoc_insertion_point(field:hw.trezor.messages.debug.DebugLinkState.thp_pairing_code_nfc_unidirectional)
pub thp_pairing_code_nfc_unidirectional: ::std::option::Option<::std::vec::Vec<u8>>,
// @@protoc_insertion_point(field:hw.trezor.messages.debug.DebugLinkState.thp_pairing_code_nfc)
pub thp_pairing_code_nfc: ::std::option::Option<::std::vec::Vec<u8>>,
// special fields
// @@protoc_insertion_point(special_field:hw.trezor.messages.debug.DebugLinkState.special_fields)
pub special_fields: ::protobuf::SpecialFields,
@ -1898,40 +1898,40 @@ impl DebugLinkState {
self.thp_pairing_code_qr_code.take().unwrap_or_else(|| ::std::vec::Vec::new())
}
// optional bytes thp_pairing_code_nfc_unidirectional = 16;
// optional bytes thp_pairing_code_nfc = 16;
pub fn thp_pairing_code_nfc_unidirectional(&self) -> &[u8] {
match self.thp_pairing_code_nfc_unidirectional.as_ref() {
pub fn thp_pairing_code_nfc(&self) -> &[u8] {
match self.thp_pairing_code_nfc.as_ref() {
Some(v) => v,
None => &[],
}
}
pub fn clear_thp_pairing_code_nfc_unidirectional(&mut self) {
self.thp_pairing_code_nfc_unidirectional = ::std::option::Option::None;
pub fn clear_thp_pairing_code_nfc(&mut self) {
self.thp_pairing_code_nfc = ::std::option::Option::None;
}
pub fn has_thp_pairing_code_nfc_unidirectional(&self) -> bool {
self.thp_pairing_code_nfc_unidirectional.is_some()
pub fn has_thp_pairing_code_nfc(&self) -> bool {
self.thp_pairing_code_nfc.is_some()
}
// Param is passed by value, moved
pub fn set_thp_pairing_code_nfc_unidirectional(&mut self, v: ::std::vec::Vec<u8>) {
self.thp_pairing_code_nfc_unidirectional = ::std::option::Option::Some(v);
pub fn set_thp_pairing_code_nfc(&mut self, v: ::std::vec::Vec<u8>) {
self.thp_pairing_code_nfc = ::std::option::Option::Some(v);
}
// Mutable pointer to the field.
// If field is not initialized, it is initialized with default value first.
pub fn mut_thp_pairing_code_nfc_unidirectional(&mut self) -> &mut ::std::vec::Vec<u8> {
if self.thp_pairing_code_nfc_unidirectional.is_none() {
self.thp_pairing_code_nfc_unidirectional = ::std::option::Option::Some(::std::vec::Vec::new());
pub fn mut_thp_pairing_code_nfc(&mut self) -> &mut ::std::vec::Vec<u8> {
if self.thp_pairing_code_nfc.is_none() {
self.thp_pairing_code_nfc = ::std::option::Option::Some(::std::vec::Vec::new());
}
self.thp_pairing_code_nfc_unidirectional.as_mut().unwrap()
self.thp_pairing_code_nfc.as_mut().unwrap()
}
// Take field
pub fn take_thp_pairing_code_nfc_unidirectional(&mut self) -> ::std::vec::Vec<u8> {
self.thp_pairing_code_nfc_unidirectional.take().unwrap_or_else(|| ::std::vec::Vec::new())
pub fn take_thp_pairing_code_nfc(&mut self) -> ::std::vec::Vec<u8> {
self.thp_pairing_code_nfc.take().unwrap_or_else(|| ::std::vec::Vec::new())
}
fn generated_message_descriptor_data() -> ::protobuf::reflect::GeneratedMessageDescriptorData {
@ -2013,9 +2013,9 @@ impl DebugLinkState {
|m: &mut DebugLinkState| { &mut m.thp_pairing_code_qr_code },
));
fields.push(::protobuf::reflect::rt::v2::make_option_accessor::<_, _>(
"thp_pairing_code_nfc_unidirectional",
|m: &DebugLinkState| { &m.thp_pairing_code_nfc_unidirectional },
|m: &mut DebugLinkState| { &mut m.thp_pairing_code_nfc_unidirectional },
"thp_pairing_code_nfc",
|m: &DebugLinkState| { &m.thp_pairing_code_nfc },
|m: &mut DebugLinkState| { &mut m.thp_pairing_code_nfc },
));
::protobuf::reflect::GeneratedMessageDescriptorData::new_2::<DebugLinkState>(
"DebugLinkState",
@ -2086,7 +2086,7 @@ impl ::protobuf::Message for DebugLinkState {
self.thp_pairing_code_qr_code = ::std::option::Option::Some(is.read_bytes()?);
},
130 => {
self.thp_pairing_code_nfc_unidirectional = ::std::option::Option::Some(is.read_bytes()?);
self.thp_pairing_code_nfc = ::std::option::Option::Some(is.read_bytes()?);
},
tag => {
::protobuf::rt::read_unknown_or_skip_group(tag, is, self.special_fields.mut_unknown_fields())?;
@ -2146,7 +2146,7 @@ impl ::protobuf::Message for DebugLinkState {
if let Some(v) = self.thp_pairing_code_qr_code.as_ref() {
my_size += ::protobuf::rt::bytes_size(15, &v);
}
if let Some(v) = self.thp_pairing_code_nfc_unidirectional.as_ref() {
if let Some(v) = self.thp_pairing_code_nfc.as_ref() {
my_size += ::protobuf::rt::bytes_size(16, &v);
}
my_size += ::protobuf::rt::unknown_fields_size(self.special_fields.unknown_fields());
@ -2200,7 +2200,7 @@ impl ::protobuf::Message for DebugLinkState {
if let Some(v) = self.thp_pairing_code_qr_code.as_ref() {
os.write_bytes(15, v)?;
}
if let Some(v) = self.thp_pairing_code_nfc_unidirectional.as_ref() {
if let Some(v) = self.thp_pairing_code_nfc.as_ref() {
os.write_bytes(16, v)?;
}
os.write_unknown_fields(self.special_fields.unknown_fields())?;
@ -2235,7 +2235,7 @@ impl ::protobuf::Message for DebugLinkState {
self.tokens.clear();
self.thp_pairing_code_entry_code = ::std::option::Option::None;
self.thp_pairing_code_qr_code = ::std::option::Option::None;
self.thp_pairing_code_nfc_unidirectional = ::std::option::Option::None;
self.thp_pairing_code_nfc = ::std::option::Option::None;
self.special_fields.clear();
}
@ -2256,7 +2256,7 @@ impl ::protobuf::Message for DebugLinkState {
tokens: ::std::vec::Vec::new(),
thp_pairing_code_entry_code: ::std::option::Option::None,
thp_pairing_code_qr_code: ::std::option::Option::None,
thp_pairing_code_nfc_unidirectional: ::std::option::Option::None,
thp_pairing_code_nfc: ::std::option::Option::None,
special_fields: ::protobuf::SpecialFields::new(),
};
&instance
@ -3856,7 +3856,7 @@ static file_descriptor_proto_data: &'static [u8] = b"\
(\x0e29.hw.trezor.messages.debug.DebugLinkGetState.DebugWaitType:\tIMMED\
IATER\nwaitLayout\x12$\n\x0ethp_channel_id\x18\x04\x20\x01(\x0cR\x0cthpC\
hannelId\"C\n\rDebugWaitType\x12\r\n\tIMMEDIATE\x10\0\x12\x0f\n\x0bNEXT_\
LAYOUT\x10\x01\x12\x12\n\x0eCURRENT_LAYOUT\x10\x02\"\xdb\x05\n\x0eDebugL\
LAYOUT\x10\x01\x12\x12\n\x0eCURRENT_LAYOUT\x10\x02\"\xbe\x05\n\x0eDebugL\
inkState\x12\x16\n\x06layout\x18\x01\x20\x01(\x0cR\x06layout\x12\x10\n\
\x03pin\x18\x02\x20\x01(\tR\x03pin\x12\x16\n\x06matrix\x18\x03\x20\x01(\
\tR\x06matrix\x12'\n\x0fmnemonic_secret\x18\x04\x20\x01(\x0cR\x0emnemoni\
@ -3871,22 +3871,22 @@ static file_descriptor_proto_data: &'static [u8] = b"\
monicType\x12\x16\n\x06tokens\x18\r\x20\x03(\tR\x06tokens\x12<\n\x1bthp_\
pairing_code_entry_code\x18\x0e\x20\x01(\rR\x17thpPairingCodeEntryCode\
\x126\n\x18thp_pairing_code_qr_code\x18\x0f\x20\x01(\x0cR\x14thpPairingC\
odeQrCode\x12L\n#thp_pairing_code_nfc_unidirectional\x18\x10\x20\x01(\
\x0cR\x1fthpPairingCodeNfcUnidirectional\"\x0f\n\rDebugLinkStop\"P\n\x0c\
DebugLinkLog\x12\x14\n\x05level\x18\x01\x20\x01(\rR\x05level\x12\x16\n\
\x06bucket\x18\x02\x20\x01(\tR\x06bucket\x12\x12\n\x04text\x18\x03\x20\
\x01(\tR\x04text\"G\n\x13DebugLinkMemoryRead\x12\x18\n\x07address\x18\
\x01\x20\x01(\rR\x07address\x12\x16\n\x06length\x18\x02\x20\x01(\rR\x06l\
ength\")\n\x0fDebugLinkMemory\x12\x16\n\x06memory\x18\x01\x20\x01(\x0cR\
\x06memory\"^\n\x14DebugLinkMemoryWrite\x12\x18\n\x07address\x18\x01\x20\
\x01(\rR\x07address\x12\x16\n\x06memory\x18\x02\x20\x01(\x0cR\x06memory\
\x12\x14\n\x05flash\x18\x03\x20\x01(\x08R\x05flash\"-\n\x13DebugLinkFlas\
hErase\x12\x16\n\x06sector\x18\x01\x20\x01(\rR\x06sector\".\n\x14DebugLi\
nkEraseSdCard\x12\x16\n\x06format\x18\x01\x20\x01(\x08R\x06format\"0\n\
\x14DebugLinkWatchLayout\x12\x14\n\x05watch\x18\x01\x20\x01(\x08R\x05wat\
ch:\x02\x18\x01\"\x1f\n\x19DebugLinkResetDebugEvents:\x02\x18\x01\"\x1a\
\n\x18DebugLinkOptigaSetSecMaxB=\n#com.satoshilabs.trezor.lib.protobufB\
\x12TrezorMessageDebug\x80\xa6\x1d\x01\
odeQrCode\x12/\n\x14thp_pairing_code_nfc\x18\x10\x20\x01(\x0cR\x11thpPai\
ringCodeNfc\"\x0f\n\rDebugLinkStop\"P\n\x0cDebugLinkLog\x12\x14\n\x05lev\
el\x18\x01\x20\x01(\rR\x05level\x12\x16\n\x06bucket\x18\x02\x20\x01(\tR\
\x06bucket\x12\x12\n\x04text\x18\x03\x20\x01(\tR\x04text\"G\n\x13DebugLi\
nkMemoryRead\x12\x18\n\x07address\x18\x01\x20\x01(\rR\x07address\x12\x16\
\n\x06length\x18\x02\x20\x01(\rR\x06length\")\n\x0fDebugLinkMemory\x12\
\x16\n\x06memory\x18\x01\x20\x01(\x0cR\x06memory\"^\n\x14DebugLinkMemory\
Write\x12\x18\n\x07address\x18\x01\x20\x01(\rR\x07address\x12\x16\n\x06m\
emory\x18\x02\x20\x01(\x0cR\x06memory\x12\x14\n\x05flash\x18\x03\x20\x01\
(\x08R\x05flash\"-\n\x13DebugLinkFlashErase\x12\x16\n\x06sector\x18\x01\
\x20\x01(\rR\x06sector\".\n\x14DebugLinkEraseSdCard\x12\x16\n\x06format\
\x18\x01\x20\x01(\x08R\x06format\"0\n\x14DebugLinkWatchLayout\x12\x14\n\
\x05watch\x18\x01\x20\x01(\x08R\x05watch:\x02\x18\x01\"\x1f\n\x19DebugLi\
nkResetDebugEvents:\x02\x18\x01\"\x1a\n\x18DebugLinkOptigaSetSecMaxB=\n#\
com.satoshilabs.trezor.lib.protobufB\x12TrezorMessageDebug\x80\xa6\x1d\
\x01\
";
/// `FileDescriptorProto` object which was a source for this generated file

File diff suppressed because it is too large Load Diff

View File

@ -174,10 +174,7 @@ def _client_from_path(
def _find_client(request: pytest.FixtureRequest, interact: bool) -> Client:
devices = enumerate_devices()
for device in devices:
try:
return Client(device, auto_interact=not interact)
except Exception:
pass
return Client(device, auto_interact=not interact)
request.session.shouldstop = "Failed to communicate with Trezor"
raise RuntimeError("No debuggable device found")

View File

@ -1,3 +1,4 @@
import hashlib
import os
import typing as t
@ -6,8 +7,24 @@ import typing_extensions as tx
from trezorlib.client import ProtocolV2
from trezorlib.debuglink import TrezorClientDebugLink as Client
from trezorlib.messages import (
ButtonAck,
ButtonRequest,
ThpCodeEntryChallenge,
ThpCodeEntryCommitment,
ThpCodeEntryCpaceTrezor,
ThpEndRequest,
ThpEndResponse,
ThpPairingMethod,
ThpPairingPreparationsFinished,
ThpPairingRequest,
ThpPairingRequestApproved,
ThpQrCodeSecret,
ThpQrCodeTag,
ThpSelectMethod,
)
from trezorlib.transport.thp import curve25519
from trezorlib.transport.thp.protocol_v2 import _hkdf
from trezorlib.transport.thp.protocol_v2 import MANAGEMENT_SESSION_ID, _hkdf
if t.TYPE_CHECKING:
P = tx.ParamSpec("P")
@ -65,7 +82,219 @@ def test_handshake(client: Client) -> None:
# TODO - without pairing, the client is damaged and results in fail of the following test
# so far no luck in solving it - it should be also tackled in FW, as it causes unexpected FW error
protocol._do_pairing()
protocol._do_pairing(client.debug)
# TODO the following is just to make style checker happy
assert noise_tag is not None
def test_pairing_qr_code(client: Client) -> None:
protocol: ProtocolV2 = client.protocol
protocol.sync_bit_send = 0
protocol.sync_bit_receive = 0
# Generate ephemeral keys
host_ephemeral_privkey = curve25519.get_private_key(os.urandom(32))
host_ephemeral_pubkey = curve25519.get_public_key(host_ephemeral_privkey)
protocol._do_channel_allocation()
protocol._do_handshake(host_ephemeral_privkey, host_ephemeral_pubkey)
# Send StartPairingReqest message
message = ThpPairingRequest()
message_type, message_data = protocol.mapping.encode(message)
protocol._encrypt_and_write(MANAGEMENT_SESSION_ID, message_type, message_data)
# Read ACK
protocol._read_ack()
# Read button request
_, msg_type, msg_data = protocol.read_and_decrypt()
maaa = protocol.mapping.decode(msg_type, msg_data)
assert isinstance(maaa, ButtonRequest)
# Send button ACK
message = ButtonAck()
message_type, message_data = protocol.mapping.encode(message)
protocol._encrypt_and_write(MANAGEMENT_SESSION_ID, message_type, message_data)
protocol._read_ack()
client.debug.press_yes()
# Read PairingRequestApproved
_, msg_type, msg_data = protocol.read_and_decrypt()
maaa = protocol.mapping.decode(msg_type, msg_data)
assert isinstance(maaa, ThpPairingRequestApproved)
message = ThpSelectMethod(selected_pairing_method=ThpPairingMethod.QrCode)
message_type, message_data = protocol.mapping.encode(message)
protocol._encrypt_and_write(MANAGEMENT_SESSION_ID, message_type, message_data)
# Read ACK
protocol._read_ack()
# Read ThpPairingPreparationsFinished
_, msg_type, msg_data = protocol.read_and_decrypt()
maaa = protocol.mapping.decode(msg_type, msg_data)
assert isinstance(maaa, ThpPairingPreparationsFinished)
# QR Code shown
# Read button request
_, msg_type, msg_data = protocol.read_and_decrypt()
maaa = protocol.mapping.decode(msg_type, msg_data)
assert isinstance(maaa, ButtonRequest)
# Send button ACK
message = ButtonAck()
message_type, message_data = protocol.mapping.encode(message)
protocol._encrypt_and_write(MANAGEMENT_SESSION_ID, message_type, message_data)
protocol._read_ack()
state = client.debug.state(thp_channel_id=protocol.channel_id.to_bytes(2, "big"))
sha_ctx = hashlib.sha256(protocol.handshake_hash)
sha_ctx.update(state.thp_pairing_code_qr_code)
tag = sha_ctx.digest()
message_type, message_data = protocol.mapping.encode(ThpQrCodeTag(tag=tag))
protocol._encrypt_and_write(MANAGEMENT_SESSION_ID, message_type, message_data)
protocol._read_ack()
# Read ThpQrCodeSecret
_, msg_type, msg_data = protocol.read_and_decrypt()
maaa = protocol.mapping.decode(msg_type, msg_data)
assert isinstance(maaa, ThpQrCodeSecret)
message = ThpEndRequest()
message_type, message_data = protocol.mapping.encode(message)
protocol._encrypt_and_write(MANAGEMENT_SESSION_ID, message_type, message_data)
# Read ACK
protocol._read_ack()
# Read ThpEndResponse
_, msg_type, msg_data = protocol.read_and_decrypt()
maaa = protocol.mapping.decode(msg_type, msg_data)
assert isinstance(maaa, ThpEndResponse)
protocol._has_valid_channel = True
@pytest.mark.skip("Cpace is not implemented yet")
def test_pairing_code_entry(client: Client) -> None:
protocol: ProtocolV2 = client.protocol
protocol.sync_bit_send = 0
protocol.sync_bit_receive = 0
# Generate ephemeral keys
host_ephemeral_privkey = curve25519.get_private_key(os.urandom(32))
host_ephemeral_pubkey = curve25519.get_public_key(host_ephemeral_privkey)
protocol._do_channel_allocation()
protocol._do_handshake(host_ephemeral_privkey, host_ephemeral_pubkey)
# Send StartPairingReqest message
message = ThpPairingRequest()
message_type, message_data = protocol.mapping.encode(message)
protocol._encrypt_and_write(MANAGEMENT_SESSION_ID, message_type, message_data)
# Read ACK
protocol._read_ack()
# Read button request
_, msg_type, msg_data = protocol.read_and_decrypt()
maaa = protocol.mapping.decode(msg_type, msg_data)
assert isinstance(maaa, ButtonRequest)
# Send button ACK
message = ButtonAck()
message_type, message_data = protocol.mapping.encode(message)
protocol._encrypt_and_write(MANAGEMENT_SESSION_ID, message_type, message_data)
protocol._read_ack()
client.debug.press_yes()
# Read PairingRequestApproved
_, msg_type, msg_data = protocol.read_and_decrypt()
maaa = protocol.mapping.decode(msg_type, msg_data)
assert isinstance(maaa, ThpPairingRequestApproved)
message = ThpSelectMethod(selected_pairing_method=ThpPairingMethod.CodeEntry)
message_type, message_data = protocol.mapping.encode(message)
protocol._encrypt_and_write(MANAGEMENT_SESSION_ID, message_type, message_data)
# Read ACK
protocol._read_ack()
# Read ThpCodeEntryCommitment
_, msg_type, msg_data = protocol.read_and_decrypt()
maaa = protocol.mapping.decode(msg_type, msg_data)
assert isinstance(maaa, ThpCodeEntryCommitment)
challenge = b"\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99\xAA\xBB\xCC\xDD\xEE\xFF"
message = ThpCodeEntryChallenge(challenge=challenge)
message_type, message_data = protocol.mapping.encode(message)
protocol._encrypt_and_write(MANAGEMENT_SESSION_ID, message_type, message_data)
# Read ACK
protocol._read_ack()
# Read ThpCodeEntryCpaceTrezor
_, msg_type, msg_data = protocol.read_and_decrypt()
maaa = protocol.mapping.decode(msg_type, msg_data)
assert isinstance(maaa, ThpCodeEntryCpaceTrezor)
_ = maaa.cpace_trezor_public_key
# Code Entry code shown
# Read button request
_, msg_type, msg_data = protocol.read_and_decrypt()
maaa = protocol.mapping.decode(msg_type, msg_data)
assert isinstance(maaa, ButtonRequest)
# Send button ACK
message = ButtonAck()
message_type, message_data = protocol.mapping.encode(message)
protocol._encrypt_and_write(MANAGEMENT_SESSION_ID, message_type, message_data)
protocol._read_ack()
state = client.debug.state(thp_channel_id=protocol.channel_id.to_bytes(2, "big"))
sha_ctx = hashlib.sha256(protocol.handshake_hash)
sha_ctx.update(state.thp_pairing_code_entry_code)
tag = sha_ctx.digest()
message_type, message_data = protocol.mapping.encode(ThpQrCodeTag(tag=tag))
protocol._encrypt_and_write(MANAGEMENT_SESSION_ID, message_type, message_data)
protocol._read_ack()
# Read ThpQrCodeSecret
_, msg_type, msg_data = protocol.read_and_decrypt()
maaa = protocol.mapping.decode(msg_type, msg_data)
assert isinstance(maaa, ThpQrCodeSecret)
message = ThpEndRequest()
message_type, message_data = protocol.mapping.encode(message)
protocol._encrypt_and_write(MANAGEMENT_SESSION_ID, message_type, message_data)
# Read ACK
protocol._read_ack()
# Read ThpEndResponse
_, msg_type, msg_data = protocol.read_and_decrypt()
maaa = protocol.mapping.decode(msg_type, msg_data)
assert isinstance(maaa, ThpEndResponse)
protocol._has_valid_channel = True