diff --git a/core/src/all_modules.py b/core/src/all_modules.py index 2ea5f6978..9fcdeff06 100644 --- a/core/src/all_modules.py +++ b/core/src/all_modules.py @@ -448,6 +448,8 @@ if utils.BITCOIN_ONLY: import apps.cardano.helpers apps.cardano.helpers.bech32 import apps.cardano.helpers.bech32 + apps.cardano.helpers.hash_builder_collection + import apps.cardano.helpers.hash_builder_collection apps.cardano.helpers.network_ids import apps.cardano.helpers.network_ids apps.cardano.helpers.paths diff --git a/core/src/apps/cardano/auxiliary_data.py b/core/src/apps/cardano/auxiliary_data.py index 3c12a8bb6..c8638edc2 100644 --- a/core/src/apps/cardano/auxiliary_data.py +++ b/core/src/apps/cardano/auxiliary_data.py @@ -1,6 +1,7 @@ from trezor.crypto import hashlib from trezor.crypto.curve import ed25519 -from trezor.enums import CardanoAddressType +from trezor.enums import CardanoAddressType, CardanoTxAuxiliaryDataSupplementType +from trezor.messages import CardanoTxAuxiliaryDataSupplement from apps.common import cbor @@ -21,10 +22,11 @@ if False: from trezor.messages import ( CardanoCatalystRegistrationParametersType, - CardanoTxAuxiliaryDataType, + CardanoTxAuxiliaryData, ) CatalystRegistrationPayload = dict[int, Union[bytes, int]] + SignedCatalystRegistrationPayload = tuple[CatalystRegistrationPayload, bytes] CatalystRegistrationSignature = dict[int, bytes] CatalystRegistration = dict[ int, Union[CatalystRegistrationPayload, CatalystRegistrationSignature] @@ -40,14 +42,11 @@ METADATA_KEY_CATALYST_REGISTRATION = 61284 METADATA_KEY_CATALYST_REGISTRATION_SIGNATURE = 61285 -def validate_auxiliary_data(auxiliary_data: CardanoTxAuxiliaryDataType | None) -> None: - if not auxiliary_data: - return - +def validate_auxiliary_data(auxiliary_data: CardanoTxAuxiliaryData) -> None: fields_provided = 0 - if auxiliary_data.blob: + if auxiliary_data.hash: fields_provided += 1 - _validate_auxiliary_data_blob(auxiliary_data.blob) + _validate_auxiliary_data_hash(auxiliary_data.hash) if auxiliary_data.catalyst_registration_parameters: fields_provided += 1 _validate_catalyst_registration_parameters( @@ -58,12 +57,8 @@ def validate_auxiliary_data(auxiliary_data: CardanoTxAuxiliaryDataType | None) - raise INVALID_AUXILIARY_DATA -def _validate_auxiliary_data_blob(auxiliary_data_blob: bytes) -> None: - try: - # validation to prevent CBOR injection and invalid CBOR - # we don't validate data format, just that it's a valid CBOR - cbor.decode(auxiliary_data_blob) - except Exception: +def _validate_auxiliary_data_hash(auxiliary_data_hash: bytes) -> None: + if len(auxiliary_data_hash) != AUXILIARY_DATA_HASH_SIZE: raise INVALID_AUXILIARY_DATA @@ -91,27 +86,20 @@ def _validate_catalyst_registration_parameters( async def show_auxiliary_data( ctx: wire.Context, keychain: seed.Keychain, - auxiliary_data: CardanoTxAuxiliaryDataType | None, + auxiliary_data_hash: bytes, + catalyst_registration_parameters: CardanoCatalystRegistrationParametersType | None, protocol_magic: int, network_id: int, ) -> None: - if not auxiliary_data: - return - - if auxiliary_data.catalyst_registration_parameters: + if catalyst_registration_parameters: await _show_catalyst_registration( ctx, keychain, - auxiliary_data.catalyst_registration_parameters, + catalyst_registration_parameters, protocol_magic, network_id, ) - auxiliary_data_bytes = get_auxiliary_data_cbor( - keychain, auxiliary_data, protocol_magic, network_id - ) - - auxiliary_data_hash = hash_auxiliary_data(bytes(auxiliary_data_bytes)) await show_auxiliary_data_hash(ctx, auxiliary_data_hash) @@ -138,37 +126,71 @@ async def _show_catalyst_registration( ) -def get_auxiliary_data_cbor( +def get_auxiliary_data_hash_and_supplement( keychain: seed.Keychain, - auxiliary_data: CardanoTxAuxiliaryDataType, + auxiliary_data: CardanoTxAuxiliaryData, protocol_magic: int, network_id: int, -) -> bytes: - if auxiliary_data.blob: - return auxiliary_data.blob - elif auxiliary_data.catalyst_registration_parameters: - cborized_catalyst_registration = _cborize_catalyst_registration( - keychain, - auxiliary_data.catalyst_registration_parameters, - protocol_magic, - network_id, +) -> tuple[bytes, CardanoTxAuxiliaryDataSupplement]: + if parameters := auxiliary_data.catalyst_registration_parameters: + ( + catalyst_registration_payload, + catalyst_signature, + ) = _get_signed_catalyst_registration_payload( + keychain, parameters, protocol_magic, network_id ) - return cbor.encode(_wrap_metadata(cborized_catalyst_registration)) + auxiliary_data_hash = _get_catalyst_registration_auxiliary_data_hash( + catalyst_registration_payload, catalyst_signature + ) + auxiliary_data_supplement = CardanoTxAuxiliaryDataSupplement( + type=CardanoTxAuxiliaryDataSupplementType.CATALYST_REGISTRATION_SIGNATURE, + auxiliary_data_hash=auxiliary_data_hash, + catalyst_signature=catalyst_signature, + ) + return auxiliary_data_hash, auxiliary_data_supplement else: - raise INVALID_AUXILIARY_DATA + assert auxiliary_data.hash is not None # validate_auxiliary_data + return auxiliary_data.hash, CardanoTxAuxiliaryDataSupplement( + type=CardanoTxAuxiliaryDataSupplementType.NONE + ) + + +def _get_catalyst_registration_auxiliary_data_hash( + catalyst_registration_payload: CatalystRegistrationPayload, + catalyst_registration_payload_signature: bytes, +) -> bytes: + cborized_catalyst_registration = _cborize_catalyst_registration( + catalyst_registration_payload, + catalyst_registration_payload_signature, + ) + return _hash_auxiliary_data( + cbor.encode(_wrap_metadata(cborized_catalyst_registration)) + ) def _cborize_catalyst_registration( + catalyst_registration_payload: CatalystRegistrationPayload, + catalyst_registration_payload_signature: bytes, +) -> CatalystRegistration: + catalyst_registration_signature = {1: catalyst_registration_payload_signature} + + return { + METADATA_KEY_CATALYST_REGISTRATION: catalyst_registration_payload, + METADATA_KEY_CATALYST_REGISTRATION_SIGNATURE: catalyst_registration_signature, + } + + +def _get_signed_catalyst_registration_payload( keychain: seed.Keychain, catalyst_registration_parameters: CardanoCatalystRegistrationParametersType, protocol_magic: int, network_id: int, -) -> CatalystRegistration: +) -> SignedCatalystRegistrationPayload: staking_key = derive_public_key( keychain, catalyst_registration_parameters.staking_path ) - catalyst_registration_payload: CatalystRegistrationPayload = { + payload: CatalystRegistrationPayload = { 1: catalyst_registration_parameters.voting_public_key, 2: staking_key, 3: derive_address_bytes( @@ -180,19 +202,13 @@ def _cborize_catalyst_registration( 4: catalyst_registration_parameters.nonce, } - catalyst_registration_payload_signature = ( - _create_catalyst_registration_payload_signature( - keychain, - catalyst_registration_payload, - catalyst_registration_parameters.staking_path, - ) + signature = _create_catalyst_registration_payload_signature( + keychain, + payload, + catalyst_registration_parameters.staking_path, ) - catalyst_registration_signature = {1: catalyst_registration_payload_signature} - return { - METADATA_KEY_CATALYST_REGISTRATION: catalyst_registration_payload, - METADATA_KEY_CATALYST_REGISTRATION_SIGNATURE: catalyst_registration_signature, - } + return payload, signature def _create_catalyst_registration_payload_signature( @@ -228,7 +244,7 @@ def _wrap_metadata(metadata: dict) -> tuple[dict, tuple]: return metadata, () -def hash_auxiliary_data(auxiliary_data: bytes) -> bytes: +def _hash_auxiliary_data(auxiliary_data: bytes) -> bytes: return hashlib.blake2b( data=auxiliary_data, outlen=AUXILIARY_DATA_HASH_SIZE ).digest() diff --git a/core/src/apps/cardano/certificates.py b/core/src/apps/cardano/certificates.py index 8860481a7..9cb0334d8 100644 --- a/core/src/apps/cardano/certificates.py +++ b/core/src/apps/cardano/certificates.py @@ -1,4 +1,8 @@ -from trezor.enums import CardanoCertificateType, CardanoPoolRelayType +from trezor.enums import ( + CardanoCertificateType, + CardanoPoolRelayType, + CardanoTxSigningMode, +) from apps.common import cbor @@ -13,10 +17,10 @@ from .helpers.paths import SCHEMA_STAKING_ANY_ACCOUNT if False: from trezor.messages import ( CardanoPoolMetadataType, - CardanoPoolOwnerType, + CardanoPoolOwner, CardanoPoolParametersType, - CardanoPoolRelayParametersType, - CardanoTxCertificateType, + CardanoPoolRelayParameters, + CardanoTxCertificate, ) from apps.common.cbor import CborSequence @@ -34,8 +38,22 @@ MAX_PORT_NUMBER = 65535 def validate_certificate( - certificate: CardanoTxCertificateType, protocol_magic: int, network_id: int + certificate: CardanoTxCertificate, + signing_mode: CardanoTxSigningMode, + protocol_magic: int, + network_id: int, ) -> None: + if ( + signing_mode == CardanoTxSigningMode.ORDINARY_TRANSACTION + and certificate.type == CardanoCertificateType.STAKE_POOL_REGISTRATION + ): + raise INVALID_CERTIFICATE + elif ( + signing_mode == CardanoTxSigningMode.POOL_REGISTRATION_AS_OWNER + and certificate.type != CardanoCertificateType.STAKE_POOL_REGISTRATION + ): + raise INVALID_CERTIFICATE + if certificate.type in ( CardanoCertificateType.STAKE_DELEGATION, CardanoCertificateType.STAKE_REGISTRATION, @@ -57,7 +75,7 @@ def validate_certificate( def cborize_certificate( - keychain: seed.Keychain, certificate: CardanoTxCertificateType + keychain: seed.Keychain, certificate: CardanoTxCertificate ) -> CborSequence: if certificate.type in ( CardanoCertificateType.STAKE_REGISTRATION, @@ -73,33 +91,35 @@ def cborize_certificate( (0, get_public_key_hash(keychain, certificate.path)), certificate.pool, ) - elif certificate.type == CardanoCertificateType.STAKE_POOL_REGISTRATION: - pool_parameters = certificate.pool_parameters + else: + raise INVALID_CERTIFICATE - assert pool_parameters is not None - return ( - certificate.type, - pool_parameters.pool_id, - pool_parameters.vrf_key_hash, - pool_parameters.pledge, - pool_parameters.cost, - cbor.Tagged( - 30, - ( - pool_parameters.margin_numerator, - pool_parameters.margin_denominator, - ), +def cborize_initial_pool_registration_certificate_fields( + certificate: CardanoTxCertificate, +) -> CborSequence: + assert certificate.type == CardanoCertificateType.STAKE_POOL_REGISTRATION + + pool_parameters = certificate.pool_parameters + assert pool_parameters is not None + + return ( + certificate.type, + pool_parameters.pool_id, + pool_parameters.vrf_key_hash, + pool_parameters.pledge, + pool_parameters.cost, + cbor.Tagged( + 30, + ( + pool_parameters.margin_numerator, + pool_parameters.margin_denominator, ), - # this relies on pool_parameters.reward_account being validated beforehand - # in _validate_pool_parameters - get_address_bytes_unsafe(pool_parameters.reward_account), - _cborize_pool_owners(keychain, pool_parameters.owners), - _cborize_pool_relays(pool_parameters.relays), - _cborize_pool_metadata(pool_parameters.metadata), - ) - else: - raise INVALID_CERTIFICATE + ), + # this relies on pool_parameters.reward_account being validated beforehand + # in _validate_pool_parameters + get_address_bytes_unsafe(pool_parameters.reward_account), + ) def assert_certificate_cond(condition: bool) -> None: @@ -119,41 +139,27 @@ def _validate_pool_parameters( assert_certificate_cond( pool_parameters.margin_numerator <= pool_parameters.margin_denominator ) - assert_certificate_cond(len(pool_parameters.owners) > 0) + assert_certificate_cond(pool_parameters.owners_count > 0) validate_reward_address(pool_parameters.reward_account, protocol_magic, network_id) - for pool_relay in pool_parameters.relays: - _validate_pool_relay(pool_relay) - - _validate_pool_owners(pool_parameters.owners) - if pool_parameters.metadata: _validate_pool_metadata(pool_parameters.metadata) -def _validate_pool_owners(owners: list[CardanoPoolOwnerType]) -> None: - owners_as_path_count = 0 - for owner in owners: +def validate_pool_owner(owner: CardanoPoolOwner) -> None: + assert_certificate_cond( + owner.staking_key_hash is not None or owner.staking_key_path is not None + ) + if owner.staking_key_hash is not None: + assert_certificate_cond(len(owner.staking_key_hash) == ADDRESS_KEY_HASH_SIZE) + if owner.staking_key_path: assert_certificate_cond( - owner.staking_key_hash is not None or owner.staking_key_path is not None + SCHEMA_STAKING_ANY_ACCOUNT.match(owner.staking_key_path) ) - if owner.staking_key_hash is not None: - assert_certificate_cond( - len(owner.staking_key_hash) == ADDRESS_KEY_HASH_SIZE - ) - if owner.staking_key_path: - assert_certificate_cond( - SCHEMA_STAKING_ANY_ACCOUNT.match(owner.staking_key_path) - ) - - if owner.staking_key_path: - owners_as_path_count += 1 - assert_certificate_cond(owners_as_path_count == 1) - -def _validate_pool_relay(pool_relay: CardanoPoolRelayParametersType) -> None: +def validate_pool_relay(pool_relay: CardanoPoolRelayParameters) -> None: if pool_relay.type == CardanoPoolRelayType.SINGLE_HOST_IP: assert_certificate_cond( pool_relay.ipv4_address is not None or pool_relay.ipv6_address is not None @@ -188,20 +194,13 @@ def _validate_pool_metadata(pool_metadata: CardanoPoolMetadataType) -> None: assert_certificate_cond(all((32 <= ord(c) < 127) for c in pool_metadata.url)) -def _cborize_pool_owners( - keychain: seed.Keychain, pool_owners: list[CardanoPoolOwnerType] -) -> list[bytes]: - result = [] - - for pool_owner in pool_owners: - if pool_owner.staking_key_path: - result.append(get_public_key_hash(keychain, pool_owner.staking_key_path)) - elif pool_owner.staking_key_hash: - result.append(pool_owner.staking_key_hash) - else: - raise ValueError - - return result +def cborize_pool_owner(keychain: seed.Keychain, pool_owner: CardanoPoolOwner) -> bytes: + if pool_owner.staking_key_path: + return get_public_key_hash(keychain, pool_owner.staking_key_path) + elif pool_owner.staking_key_hash: + return pool_owner.staking_key_hash + else: + raise ValueError def _cborize_ipv6_address(ipv6_address: bytes | None) -> bytes | None: @@ -218,41 +217,32 @@ def _cborize_ipv6_address(ipv6_address: bytes | None) -> bytes | None: return result -def _cborize_pool_relays( - pool_relays: list[CardanoPoolRelayParametersType], -) -> list[CborSequence]: - result: list[CborSequence] = [] - - for pool_relay in pool_relays: - if pool_relay.type == CardanoPoolRelayType.SINGLE_HOST_IP: - result.append( - ( - pool_relay.type, - pool_relay.port, - pool_relay.ipv4_address, - _cborize_ipv6_address(pool_relay.ipv6_address), - ) - ) - elif pool_relay.type == CardanoPoolRelayType.SINGLE_HOST_NAME: - result.append( - ( - pool_relay.type, - pool_relay.port, - pool_relay.host_name, - ) - ) - elif pool_relay.type == CardanoPoolRelayType.MULTIPLE_HOST_NAME: - result.append( - ( - pool_relay.type, - pool_relay.host_name, - ) - ) - - return result +def cborize_pool_relay( + pool_relay: CardanoPoolRelayParameters, +) -> CborSequence: + if pool_relay.type == CardanoPoolRelayType.SINGLE_HOST_IP: + return ( + pool_relay.type, + pool_relay.port, + pool_relay.ipv4_address, + _cborize_ipv6_address(pool_relay.ipv6_address), + ) + elif pool_relay.type == CardanoPoolRelayType.SINGLE_HOST_NAME: + return ( + pool_relay.type, + pool_relay.port, + pool_relay.host_name, + ) + elif pool_relay.type == CardanoPoolRelayType.MULTIPLE_HOST_NAME: + return ( + pool_relay.type, + pool_relay.host_name, + ) + else: + raise INVALID_CERTIFICATE -def _cborize_pool_metadata( +def cborize_pool_metadata( pool_metadata: CardanoPoolMetadataType | None, ) -> CborSequence | None: if not pool_metadata: diff --git a/core/src/apps/cardano/helpers/__init__.py b/core/src/apps/cardano/helpers/__init__.py index ef27610cc..16a05e5e4 100644 --- a/core/src/apps/cardano/helpers/__init__.py +++ b/core/src/apps/cardano/helpers/__init__.py @@ -10,8 +10,8 @@ INVALID_AUXILIARY_DATA = wire.ProcessError("Invalid auxiliary data") INVALID_STAKE_POOL_REGISTRATION_TX_STRUCTURE = wire.ProcessError( "Stakepool registration transaction cannot contain other certificates nor withdrawals" ) -INVALID_STAKEPOOL_REGISTRATION_TX_INPUTS = wire.ProcessError( - "Stakepool registration transaction can contain only external inputs" +INVALID_STAKEPOOL_REGISTRATION_TX_WITNESSES = wire.ProcessError( + "Stakepool registration transaction can only contain staking witnesses" ) LOVELACE_MAX_SUPPLY = 45_000_000_000 * 1_000_000 diff --git a/core/src/apps/cardano/helpers/hash_builder_collection.py b/core/src/apps/cardano/helpers/hash_builder_collection.py new file mode 100644 index 000000000..6a582468c --- /dev/null +++ b/core/src/apps/cardano/helpers/hash_builder_collection.py @@ -0,0 +1,98 @@ +from apps.common import cbor + +if False: + from typing import Any, Generic, TypeVar + from trezor.utils import HashContext + + T = TypeVar("T") + K = TypeVar("K") + V = TypeVar("V") +else: + T = 0 # type: ignore + K = 0 # type: ignore + V = 0 # type: ignore + Generic = {T: object, (K, V): object} # type: ignore + + +class HashBuilderCollection: + def __init__(self, size: int) -> None: + self.size = size + self.remaining = size + self.hash_fn: HashContext | None = None + self.parent: "HashBuilderCollection" | None = None + self.has_unfinished_child = False + + def start(self, hash_fn: HashContext) -> "HashBuilderCollection": + self.hash_fn = hash_fn + self.hash_fn.update(self._header_bytes()) + return self + + def _insert_child(self, child: "HashBuilderCollection") -> None: + child.parent = self + assert self.hash_fn is not None + child.start(self.hash_fn) + self.has_unfinished_child = True + + def _do_enter_item(self) -> None: + assert self.hash_fn is not None + assert self.remaining > 0 + if self.has_unfinished_child: + raise RuntimeError # can't add item until child is finished + + self.remaining -= 1 + + def _hash_item(self, item: Any) -> None: + assert self.hash_fn is not None + for chunk in cbor.encode_streamed(item): + self.hash_fn.update(chunk) + + def _header_bytes(self) -> bytes: + raise NotImplementedError + + def finish(self) -> None: + if self.remaining != 0: + raise RuntimeError # not all items were added + if self.parent is not None: + self.parent.has_unfinished_child = False + self.hash_fn = None + self.parent = None + + def __enter__(self) -> "HashBuilderCollection": + assert self.hash_fn is not None + return self + + def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: + if exc_type is None: + self.finish() + + +class HashBuilderList(HashBuilderCollection, Generic[T]): + def append(self, item: T) -> T: + self._do_enter_item() + if isinstance(item, HashBuilderCollection): + self._insert_child(item) + else: + self._hash_item(item) + + return item + + def _header_bytes(self) -> bytes: + return cbor.create_array_header(self.size) + + +class HashBuilderDict(HashBuilderCollection, Generic[K, V]): + def add(self, key: K, value: V) -> V: + self._do_enter_item() + # enter key, this must not nest + assert not isinstance(key, HashBuilderCollection) + self._hash_item(key) + # enter value, this can nest + if isinstance(value, HashBuilderCollection): + self._insert_child(value) + else: + self._hash_item(value) + + return value + + def _header_bytes(self) -> bytes: + return cbor.create_map_header(self.size) diff --git a/core/src/apps/cardano/helpers/paths.py b/core/src/apps/cardano/helpers/paths.py index 5f69854e4..a7141a017 100644 --- a/core/src/apps/cardano/helpers/paths.py +++ b/core/src/apps/cardano/helpers/paths.py @@ -26,6 +26,7 @@ CHANGE_OUTPUT_PATH_NAME = "Change output path" CHANGE_OUTPUT_STAKING_PATH_NAME = "Change output staking path" CERTIFICATE_PATH_NAME = "Certificate path" POOL_OWNER_STAKING_PATH_NAME = "Pool owner staking path" +WITNESS_PATH_NAME = "Witness path" def unharden(item: int) -> int: diff --git a/core/src/apps/cardano/layout.py b/core/src/apps/cardano/layout.py index 94f7534a5..52c59696d 100644 --- a/core/src/apps/cardano/layout.py +++ b/core/src/apps/cardano/layout.py @@ -29,12 +29,12 @@ if False: from trezor import wire from trezor.messages import ( CardanoBlockchainPointerType, - CardanoTxCertificateType, - CardanoTxWithdrawalType, + CardanoTxCertificate, + CardanoTxWithdrawal, CardanoPoolParametersType, - CardanoPoolOwnerType, + CardanoPoolOwner, CardanoPoolMetadataType, - CardanoAssetGroupType, + CardanoToken, ) from trezor.ui.layouts import PropertyType @@ -67,11 +67,8 @@ def is_printable_ascii_bytestring(bytestr: bytes) -> bool: async def confirm_sending( ctx: wire.Context, ada_amount: int, - token_bundle: list[CardanoAssetGroupType], to: str, ) -> None: - await confirm_sending_token_bundle(ctx, token_bundle) - await confirm_output( ctx, to, @@ -86,27 +83,25 @@ async def confirm_sending( ) -async def confirm_sending_token_bundle( - ctx: wire.Context, token_bundle: list[CardanoAssetGroupType] +async def confirm_sending_token( + ctx: wire.Context, policy_id: bytes, token: CardanoToken ) -> None: - for token_group in token_bundle: - for token in token_group.tokens: - await confirm_properties( - ctx, - "confirm_token", - title="Confirm transaction", - props=[ - ( - "Asset fingerprint:", - format_asset_fingerprint( - policy_id=token_group.policy_id, - asset_name_bytes=token.asset_name_bytes, - ), - ), - ("Amount sent:", format_amount(token.amount, 0)), - ], - br_code=ButtonRequestType.Other, - ) + await confirm_properties( + ctx, + "confirm_token", + title="Confirm transaction", + props=[ + ( + "Asset fingerprint:", + format_asset_fingerprint( + policy_id=policy_id, + asset_name_bytes=token.asset_name_bytes, + ), + ), + ("Amount sent:", format_amount(token.amount, 0)), + ], + br_code=ButtonRequestType.Other, + ) async def show_warning_tx_output_contains_tokens(ctx: wire.Context) -> None: @@ -212,7 +207,6 @@ async def show_warning_tx_staking_key_hash( async def confirm_transaction( ctx: wire.Context, - amount: int, fee: int, protocol_magic: int, ttl: int | None, @@ -220,12 +214,13 @@ async def confirm_transaction( is_network_id_verifiable: bool, ) -> None: props: list[PropertyType] = [ - ("Transaction amount:", format_coin_amount(amount)), ("Transaction fee:", format_coin_amount(fee)), ] if is_network_id_verifiable: - props.append(("Network:", protocol_magics.to_ui_string(protocol_magic))) + props.append( + ("Network: %s" % protocol_magics.to_ui_string(protocol_magic), None) + ) props.append( ("Valid since: %s" % format_optional_int(validity_interval_start), None) @@ -243,7 +238,7 @@ async def confirm_transaction( async def confirm_certificate( - ctx: wire.Context, certificate: CardanoTxCertificateType + ctx: wire.Context, certificate: CardanoTxCertificate ) -> None: # stake pool registration requires custom confirmation logic not covered # in this call @@ -270,10 +265,7 @@ async def confirm_certificate( async def confirm_stake_pool_parameters( - ctx: wire.Context, - pool_parameters: CardanoPoolParametersType, - network_id: int, - protocol_magic: int, + ctx: wire.Context, pool_parameters: CardanoPoolParametersType ) -> None: margin_percentage = ( 100.0 * pool_parameters.margin_numerator / pool_parameters.margin_denominator @@ -302,39 +294,36 @@ async def confirm_stake_pool_parameters( ) -async def confirm_stake_pool_owners( +async def confirm_stake_pool_owner( ctx: wire.Context, keychain: seed.Keychain, - owners: list[CardanoPoolOwnerType], + owner: CardanoPoolOwner, network_id: int, ) -> None: props: list[tuple[str, str | None]] = [] - for index, owner in enumerate(owners, 1): - if owner.staking_key_path: - props.append( - ("Pool owner #%d:" % index, address_n_to_str(owner.staking_key_path)) - ) - props.append( - ( - encode_human_readable_address( - pack_reward_address_bytes( - get_public_key_hash(keychain, owner.staking_key_path), - network_id, - ) - ), - None, - ) + if owner.staking_key_path: + props.append(("Pool owner:", address_n_to_str(owner.staking_key_path))) + props.append( + ( + encode_human_readable_address( + pack_reward_address_bytes( + get_public_key_hash(keychain, owner.staking_key_path), + network_id, + ) + ), + None, ) - else: - assert owner.staking_key_hash is not None # validate_pool_owners - props.append( - ( - "Pool owner #%d:" % index, - encode_human_readable_address( - pack_reward_address_bytes(owner.staking_key_hash, network_id) - ), - ) + ) + else: + assert owner.staking_key_hash is not None # validate_pool_owners + props.append( + ( + "Pool owner:", + encode_human_readable_address( + pack_reward_address_bytes(owner.staking_key_hash, network_id) + ), ) + ) await confirm_properties( ctx, @@ -371,7 +360,7 @@ async def confirm_stake_pool_metadata( ) -async def confirm_transaction_network_ttl( +async def confirm_stake_pool_registration_final( ctx: wire.Context, protocol_magic: int, ttl: int | None, @@ -379,36 +368,21 @@ async def confirm_transaction_network_ttl( ) -> None: await confirm_properties( ctx, - "confirm_pool_network", + "confirm_pool_final", title="Confirm transaction", props=[ + ("Confirm signing the stake pool registration as an owner.", None), ("Network:", protocol_magics.to_ui_string(protocol_magic)), - ( - "Valid since: %s" % format_optional_int(validity_interval_start), - None, - ), - ("TTL: %s" % format_optional_int(ttl), None), + ("Valid since:", format_optional_int(validity_interval_start)), + ("TTL:", format_optional_int(ttl)), ], - br_code=ButtonRequestType.Other, - ) - - -async def confirm_stake_pool_registration_final( - ctx: wire.Context, -) -> None: - await confirm_metadata( - ctx, - "confirm_pool_final", - title="Confirm transaction", - content="Confirm signing the stake pool registration as an owner", - hide_continue=True, hold=True, br_code=ButtonRequestType.Other, ) async def confirm_withdrawal( - ctx: wire.Context, withdrawal: CardanoTxWithdrawalType + ctx: wire.Context, withdrawal: CardanoTxWithdrawal ) -> None: await confirm_properties( ctx, diff --git a/core/src/apps/cardano/sign_tx.py b/core/src/apps/cardano/sign_tx.py index 12ef559c5..0bb042c75 100644 --- a/core/src/apps/cardano/sign_tx.py +++ b/core/src/apps/cardano/sign_tx.py @@ -1,16 +1,35 @@ +from micropython import const + from trezor import log, wire from trezor.crypto import hashlib from trezor.crypto.curve import ed25519 -from trezor.enums import CardanoAddressType, CardanoCertificateType +from trezor.enums import ( + CardanoAddressType, + CardanoCertificateType, + CardanoTxSigningMode, + CardanoTxWitnessType, +) from trezor.messages import ( CardanoAddressParametersType, - CardanoSignedTx, - CardanoSignedTxChunk, - CardanoSignedTxChunkAck, + CardanoAssetGroup, + CardanoPoolOwner, + CardanoPoolRelayParameters, + CardanoSignTxFinished, + CardanoSignTxInit, + CardanoToken, + CardanoTxAuxiliaryData, + CardanoTxBodyHash, + CardanoTxCertificate, + CardanoTxHostAck, + CardanoTxInput, + CardanoTxItemAck, + CardanoTxOutput, + CardanoTxWithdrawal, + CardanoTxWitnessRequest, + CardanoTxWitnessResponse, ) from apps.common import cbor, safety_checks -from apps.common.paths import validate_path from . import seed from .address import ( @@ -21,16 +40,24 @@ from .address import ( validate_output_address, ) from .auxiliary_data import ( - get_auxiliary_data_cbor, - hash_auxiliary_data, + get_auxiliary_data_hash_and_supplement, show_auxiliary_data, validate_auxiliary_data, ) -from .byron_address import get_address_attributes -from .certificates import cborize_certificate, validate_certificate +from .certificates import ( + assert_certificate_cond, + cborize_certificate, + cborize_initial_pool_registration_certificate_fields, + cborize_pool_metadata, + cborize_pool_owner, + cborize_pool_relay, + validate_certificate, + validate_pool_owner, + validate_pool_relay, +) from .helpers import ( INVALID_STAKE_POOL_REGISTRATION_TX_STRUCTURE, - INVALID_STAKEPOOL_REGISTRATION_TX_INPUTS, + INVALID_STAKEPOOL_REGISTRATION_TX_WITNESSES, INVALID_TOKEN_BUNDLE_OUTPUT, INVALID_WITHDRAWAL, LOVELACE_MAX_SUPPLY, @@ -38,6 +65,7 @@ from .helpers import ( protocol_magics, staking_use_cases, ) +from .helpers.hash_builder_collection import HashBuilderDict, HashBuilderList from .helpers.paths import ( ACCOUNT_PATH_INDEX, BIP_PATH_LENGTH, @@ -50,17 +78,18 @@ from .helpers.paths import ( SCHEMA_ADDRESS, SCHEMA_STAKING, SCHEMA_STAKING_ANY_ACCOUNT, + WITNESS_PATH_NAME, ) from .helpers.utils import derive_public_key, to_account_path from .layout import ( confirm_certificate, confirm_sending, + confirm_sending_token, confirm_stake_pool_metadata, - confirm_stake_pool_owners, + confirm_stake_pool_owner, confirm_stake_pool_parameters, confirm_stake_pool_registration_final, confirm_transaction, - confirm_transaction_network_ttl, confirm_withdrawal, show_warning_path, show_warning_tx_different_staking_account, @@ -70,58 +99,63 @@ from .layout import ( show_warning_tx_pointer_address, show_warning_tx_staking_key_hash, ) -from .seed import is_byron_path, is_shelley_path +from .seed import is_byron_path if False: - from typing import Any, Optional, Union - - from trezor.messages import ( - CardanoAssetGroupType, - CardanoSignTx, - CardanoTxCertificateType, - CardanoTxInputType, - CardanoTxOutputType, - CardanoTxWithdrawalType, - ) - - from apps.common.cbor import CborSequence + from typing import Any, Union from apps.common.paths import PathSchema - CborizedTokenBundle = cbor.OrderedMap[bytes, cbor.OrderedMap[bytes, int]] - CborizedTxOutput = tuple[bytes, Union[int, tuple[int, CborizedTokenBundle]]] - CborizedSignedTx = tuple[dict, dict, Optional[cbor.Raw]] - TxHash = bytes + CardanoTxResponseType = Union[CardanoTxItemAck, CardanoTxWitnessResponse] MINTING_POLICY_ID_LENGTH = 28 MAX_ASSET_NAME_LENGTH = 32 -MAX_TX_CHUNK_SIZE = 256 -MAX_TX_OUTPUT_SIZE = 4000 + +TX_BODY_KEY_INPUTS = const(0) +TX_BODY_KEY_OUTPUTS = const(1) +TX_BODY_KEY_FEE = const(2) +TX_BODY_KEY_TTL = const(3) +TX_BODY_KEY_CERTIFICATES = const(4) +TX_BODY_KEY_WITHDRAWALS = const(5) +TX_BODY_KEY_AUXILIARY_DATA = const(7) +TX_BODY_KEY_VALIDITY_INTERVAL_START = const(8) + +POOL_REGISTRATION_CERTIFICATE_ITEMS_COUNT = 10 @seed.with_keychain async def sign_tx( - ctx: wire.Context, msg: CardanoSignTx, keychain: seed.Keychain -) -> CardanoSignedTx: - if msg.fee > LOVELACE_MAX_SUPPLY: - raise wire.ProcessError("Fee is out of range!") + ctx: wire.Context, msg: CardanoSignTxInit, keychain: seed.Keychain +) -> CardanoSignTxFinished: + is_network_id_verifiable = await _validate_tx_signing_request(ctx, msg) + + # inputs, outputs and fee are mandatory fields, count the number of optional fields present + tx_body_map_item_count = 3 + sum( + ( + msg.ttl is not None, + msg.certificates_count > 0, + msg.withdrawals_count > 0, + msg.has_auxiliary_data, + msg.validity_interval_start is not None, + ) + ) - validate_network_info(msg.network_id, msg.protocol_magic) + hash_fn = hashlib.blake2b(outlen=32) + tx_dict: HashBuilderDict[int, Any] = HashBuilderDict(tx_body_map_item_count) + tx_dict.start(hash_fn) + with tx_dict: + await _process_transaction(ctx, msg, keychain, tx_dict) - try: - if _has_stake_pool_registration(msg): - cborized_tx, tx_hash = await _sign_stake_pool_registration_tx( - ctx, msg, keychain - ) - else: - cborized_tx, tx_hash = await _sign_ordinary_tx(ctx, msg, keychain) - - signed_tx_chunks = cbor.encode_chunked(cborized_tx, MAX_TX_CHUNK_SIZE) + await _confirm_transaction(ctx, msg, is_network_id_verifiable) - for signed_tx_chunk in signed_tx_chunks: - response = CardanoSignedTxChunk(signed_tx_chunk=signed_tx_chunk) - await ctx.call(response, CardanoSignedTxChunkAck) + try: + tx_hash = hash_fn.digest() + response_after_witness_requests = await _process_witness_requests( + ctx, keychain, tx_hash, msg.witness_requests_count, msg.signing_mode + ) + await ctx.call(response_after_witness_requests, CardanoTxHostAck) - return CardanoSignedTx(tx_hash=tx_hash, serialized_tx=None) + await ctx.call(CardanoTxBodyHash(tx_hash=tx_hash), CardanoTxHostAck) + return CardanoSignTxFinished() except ValueError as e: if __debug__: @@ -129,333 +163,341 @@ async def sign_tx( raise wire.ProcessError("Signing failed") -async def _sign_ordinary_tx( - ctx: wire.Context, msg: CardanoSignTx, keychain: seed.Keychain -) -> tuple[CborizedSignedTx, TxHash]: - for i in msg.inputs: - await validate_path( - ctx, keychain, i.address_n, SCHEMA_ADDRESS.match(i.address_n) - ) +async def _validate_tx_signing_request( + ctx: wire.Context, msg: CardanoSignTxInit +) -> bool: + """Validate the data in the signing request and return whether the provided network id is verifiable.""" + if msg.fee > LOVELACE_MAX_SUPPLY: + raise wire.ProcessError("Fee is out of range!") + validate_network_info(msg.network_id, msg.protocol_magic) - _validate_outputs(keychain, msg.outputs, msg.protocol_magic, msg.network_id) - _validate_certificates(msg.certificates, msg.protocol_magic, msg.network_id) - _validate_withdrawals(msg.withdrawals) - validate_auxiliary_data(msg.auxiliary_data) + is_network_id_verifiable = _is_network_id_verifiable(msg) + if msg.signing_mode == CardanoTxSigningMode.ORDINARY_TRANSACTION: + if not is_network_id_verifiable: + await show_warning_tx_network_unverifiable(ctx) + elif msg.signing_mode == CardanoTxSigningMode.POOL_REGISTRATION_AS_OWNER: + _validate_stake_pool_registration_tx_structure(msg) - # display the transaction in UI - await _show_standard_tx(ctx, keychain, msg) + return is_network_id_verifiable - return _cborize_signed_tx(keychain, msg) +async def _process_transaction( + ctx: wire.Context, + msg: CardanoSignTxInit, + keychain: seed.Keychain, + tx_dict: HashBuilderDict, +) -> None: + inputs_list: HashBuilderList[tuple[bytes, int]] = HashBuilderList(msg.inputs_count) + with tx_dict.add(TX_BODY_KEY_INPUTS, inputs_list): + await _process_inputs(ctx, inputs_list, msg.inputs_count) -async def _sign_stake_pool_registration_tx( - ctx: wire.Context, msg: CardanoSignTx, keychain: seed.Keychain -) -> tuple[CborizedSignedTx, TxHash]: - """ - We have a separate tx signing flow for stake pool registration because it's a - transaction where the witnessable entries (i.e. inputs, withdrawals, etc.) - in the transaction are not supposed to be controlled by the HW wallet, which - means the user is vulnerable to unknowingly supplying a witness for an UTXO - or other tx entry they think is external, resulting in the co-signers - gaining control over their funds (Something SLIP-0019 is dealing with for - BTC but no similar standard is currently available for Cardano). Hence we - completely forbid witnessing inputs and other entries of the transaction - except the stake pool certificate itself and we provide a witness only to the - user's staking key in the list of pool owners. - """ - _validate_stake_pool_registration_tx_structure(msg) + outputs_list: HashBuilderList = HashBuilderList(msg.outputs_count) + with tx_dict.add(TX_BODY_KEY_OUTPUTS, outputs_list): + await _process_outputs( + ctx, + keychain, + outputs_list, + msg.outputs_count, + msg.signing_mode, + msg.protocol_magic, + msg.network_id, + ) - _ensure_no_signing_inputs(msg.inputs) - _validate_outputs(keychain, msg.outputs, msg.protocol_magic, msg.network_id) - _validate_certificates(msg.certificates, msg.protocol_magic, msg.network_id) - validate_auxiliary_data(msg.auxiliary_data) + tx_dict.add(TX_BODY_KEY_FEE, msg.fee) - await _show_stake_pool_registration_tx(ctx, keychain, msg) + if msg.ttl is not None: + tx_dict.add(TX_BODY_KEY_TTL, msg.ttl) - return _cborize_signed_tx(keychain, msg) + if msg.certificates_count > 0: + certificates_list: HashBuilderList = HashBuilderList(msg.certificates_count) + with tx_dict.add(TX_BODY_KEY_CERTIFICATES, certificates_list): + await _process_certificates( + ctx, + keychain, + certificates_list, + msg.certificates_count, + msg.signing_mode, + msg.protocol_magic, + msg.network_id, + ) + if msg.withdrawals_count > 0: + withdrawals_dict: HashBuilderDict[bytes, int] = HashBuilderDict( + msg.withdrawals_count + ) + with tx_dict.add(TX_BODY_KEY_WITHDRAWALS, withdrawals_dict): + await _process_withdrawals( + ctx, + keychain, + withdrawals_dict, + msg.withdrawals_count, + msg.protocol_magic, + msg.network_id, + ) -def _has_stake_pool_registration(msg: CardanoSignTx) -> bool: - return any( - cert.type == CardanoCertificateType.STAKE_POOL_REGISTRATION - for cert in msg.certificates - ) + if msg.has_auxiliary_data: + await _process_auxiliary_data( + ctx, + keychain, + tx_dict, + msg.protocol_magic, + msg.network_id, + ) + if msg.validity_interval_start is not None: + tx_dict.add(TX_BODY_KEY_VALIDITY_INTERVAL_START, msg.validity_interval_start) -def validate_network_info(network_id: int, protocol_magic: int) -> None: - """ - We are only concerned about checking that both network_id and protocol_magic - belong to the mainnet or that both belong to a testnet. We don't need to check for - consistency between various testnets (at least for now). - """ - is_mainnet_network_id = network_ids.is_mainnet(network_id) - is_mainnet_protocol_magic = protocol_magics.is_mainnet(protocol_magic) - if is_mainnet_network_id != is_mainnet_protocol_magic: - raise wire.ProcessError("Invalid network id/protocol magic combination!") +async def _confirm_transaction( + ctx: wire.Context, + msg: CardanoSignTxInit, + is_network_id_verifiable: bool, +) -> None: + if msg.signing_mode == CardanoTxSigningMode.ORDINARY_TRANSACTION: + await confirm_transaction( + ctx, + msg.fee, + msg.protocol_magic, + msg.ttl, + msg.validity_interval_start, + is_network_id_verifiable, + ) + elif msg.signing_mode == CardanoTxSigningMode.POOL_REGISTRATION_AS_OWNER: + await confirm_stake_pool_registration_final( + ctx, msg.protocol_magic, msg.ttl, msg.validity_interval_start + ) -def _validate_stake_pool_registration_tx_structure(msg: CardanoSignTx) -> None: - # ensures that there is exactly one certificate, which is stake pool registration, - # and no withdrawals - if ( - len(msg.certificates) != 1 - or not _has_stake_pool_registration(msg) - or len(msg.withdrawals) != 0 - ): - raise INVALID_STAKE_POOL_REGISTRATION_TX_STRUCTURE +async def _process_inputs( + ctx: wire.Context, + inputs_list: HashBuilderList[tuple[bytes, int]], + inputs_count: int, +) -> None: + """Read, validate and serialize the inputs.""" + for index in range(inputs_count): + input: CardanoTxInput = await ctx.call(CardanoTxItemAck(), CardanoTxInput) + inputs_list.append((input.prev_hash, input.prev_index)) -def _validate_outputs( +async def _process_outputs( + ctx: wire.Context, keychain: seed.Keychain, - outputs: list[CardanoTxOutputType], + outputs_list: HashBuilderList, + outputs_count: int, + signing_mode: CardanoTxSigningMode, protocol_magic: int, network_id: int, ) -> None: + """Read, validate, confirm and serialize the outputs, return the total non-change output amount.""" total_amount = 0 - for output in outputs: - total_amount += output.amount - - if output.address_parameters and output.address is not None: - raise wire.ProcessError( - "Outputs can not contain both address and address_parameters fields!" + for _ in range(outputs_count): + output: CardanoTxOutput = await ctx.call(CardanoTxItemAck(), CardanoTxOutput) + _validate_output(output, protocol_magic, network_id) + if signing_mode == CardanoTxSigningMode.ORDINARY_TRANSACTION: + await _show_output( + ctx, + keychain, + output, + signing_mode, + protocol_magic, + network_id, ) - if output.address_parameters: - validate_address_parameters(output.address_parameters) - elif output.address is not None: - validate_output_address(output.address, protocol_magic, network_id) + output_address = _get_output_address( + keychain, protocol_magic, network_id, output + ) + + if output.asset_groups_count == 0: + outputs_list.append((output_address, output.amount)) else: - raise wire.ProcessError( - "Each output must have an address field or address_parameters!" - ) + # output structure is: [address, [amount, asset_groups]] + output_list: HashBuilderList = HashBuilderList(2) + with outputs_list.append(output_list): + output_list.append(output_address) + output_value_list: HashBuilderList = HashBuilderList(2) + with output_list.append(output_value_list): + output_value_list.append(output.amount) + asset_groups_dict: HashBuilderDict[ + bytes, HashBuilderDict[bytes, int] + ] = HashBuilderDict(output.asset_groups_count) + with output_value_list.append(asset_groups_dict): + await _process_asset_groups( + ctx, + asset_groups_dict, + output.asset_groups_count, + _should_show_tokens(output, signing_mode), + ) - _validate_token_bundle(output.token_bundle) - _validate_max_tx_output_size(keychain, output, protocol_magic, network_id) + total_amount += output.amount if total_amount > LOVELACE_MAX_SUPPLY: raise wire.ProcessError("Total transaction amount is out of range!") -def _validate_max_tx_output_size( - keychain: seed.Keychain, - output: CardanoTxOutputType, - protocol_magic: int, - network_id: int, +async def _process_asset_groups( + ctx: wire.Context, + asset_groups_dict: HashBuilderDict[bytes, HashBuilderDict[bytes, int]], + asset_groups_count: int, + should_show_tokens: bool, ) -> None: - """ - Output value size is currently limited to 4000 bytes at protocol level. Given - the maximum transaction size Trezor can handle (~9kB), we also want to enforce - this size limit here so that when the limit is raised at protocol level again, - Trezor would still not be able to produce larger outputs than it could - reliably spend. Once Cardano-transaction signing is refactored to be completely - streamed and maximum supported transaction size is thus raised, this limit can be lifted. - """ - cborized_output = _cborize_output(keychain, output, protocol_magic, network_id) - - assert len(cborized_output) == 2 - # only the output value is used for counting the size in cardano-ledger-specs - cborized_output_value = cborized_output[1] - serialized_output_chunks = cbor.encode_streamed(cborized_output_value) - - serialized_output_size = 0 - for chunk in serialized_output_chunks: - serialized_output_size += len(chunk) - - if serialized_output_size > MAX_TX_OUTPUT_SIZE: - raise wire.ProcessError( - "Maximum tx output value size (%s bytes) exceeded!" % MAX_TX_OUTPUT_SIZE + """Read, validate and serialize the asset groups of an output.""" + # until the CIP with canonical CBOR is finalized storing the seen_policy_ids is the only way we can check for + # duplicate policy_ids + seen_policy_ids: set[bytes] = set() + for _ in range(asset_groups_count): + asset_group: CardanoAssetGroup = await ctx.call( + CardanoTxItemAck(), CardanoAssetGroup ) + asset_group.policy_id = bytes(asset_group.policy_id) + _validate_asset_group(asset_group, seen_policy_ids) + seen_policy_ids.add(asset_group.policy_id) - -def _validate_token_bundle(token_bundle: list[CardanoAssetGroupType]) -> None: - seen_policy_ids = set() - for token_group in token_bundle: - policy_id = bytes(token_group.policy_id) - - if len(policy_id) != MINTING_POLICY_ID_LENGTH: - raise INVALID_TOKEN_BUNDLE_OUTPUT - - if policy_id in seen_policy_ids: - raise INVALID_TOKEN_BUNDLE_OUTPUT - else: - seen_policy_ids.add(policy_id) - - if not token_group.tokens: - raise INVALID_TOKEN_BUNDLE_OUTPUT - - seen_asset_name_bytes = set() - for token in token_group.tokens: - asset_name_bytes = bytes(token.asset_name_bytes) - if len(asset_name_bytes) > MAX_ASSET_NAME_LENGTH: - raise INVALID_TOKEN_BUNDLE_OUTPUT - - if asset_name_bytes in seen_asset_name_bytes: - raise INVALID_TOKEN_BUNDLE_OUTPUT - else: - seen_asset_name_bytes.add(asset_name_bytes) - - -def _ensure_no_signing_inputs(inputs: list[CardanoTxInputType]) -> None: - if any(i.address_n for i in inputs): - raise INVALID_STAKEPOOL_REGISTRATION_TX_INPUTS + tokens: HashBuilderDict[bytes, int] = HashBuilderDict(asset_group.tokens_count) + with asset_groups_dict.add(asset_group.policy_id, tokens): + await _process_tokens( + ctx, + tokens, + asset_group.policy_id, + asset_group.tokens_count, + should_show_tokens, + ) -def _validate_certificates( - certificates: list[CardanoTxCertificateType], protocol_magic: int, network_id: int +async def _process_tokens( + ctx: wire.Context, + tokens_dict: HashBuilderDict[bytes, int], + policy_id: bytes, + tokens_count: int, + should_show_tokens: bool, ) -> None: - for certificate in certificates: - validate_certificate(certificate, protocol_magic, network_id) - - -def _validate_withdrawals(withdrawals: list[CardanoTxWithdrawalType]) -> None: - seen_withdrawals = set() - for withdrawal in withdrawals: - if not SCHEMA_STAKING_ANY_ACCOUNT.match(withdrawal.path): - raise INVALID_WITHDRAWAL - - if not 0 <= withdrawal.amount < LOVELACE_MAX_SUPPLY: - raise INVALID_WITHDRAWAL - - path_tuple = tuple(withdrawal.path) - if path_tuple in seen_withdrawals: - raise wire.ProcessError("Duplicate withdrawals") - else: - seen_withdrawals.add(path_tuple) - - -def _cborize_signed_tx( - keychain: seed.Keychain, msg: CardanoSignTx -) -> tuple[CborizedSignedTx, TxHash]: - tx_body = _cborize_tx_body(keychain, msg) - tx_hash = _hash_tx_body(tx_body) - - witnesses = _cborize_witnesses( - keychain, - msg.inputs, - msg.certificates, - msg.withdrawals, - tx_hash, - msg.protocol_magic, - ) - - auxiliary_data = None - if msg.auxiliary_data: - auxiliary_data_cbor = get_auxiliary_data_cbor( - keychain, msg.auxiliary_data, msg.protocol_magic, msg.network_id - ) - auxiliary_data = cbor.Raw(auxiliary_data_cbor) - - return (tx_body, witnesses, auxiliary_data), tx_hash - - -def _cborize_tx_body(keychain: seed.Keychain, msg: CardanoSignTx) -> dict: - inputs_for_cbor = _cborize_inputs(msg.inputs) - outputs_for_cbor = _cborize_outputs( - keychain, msg.outputs, msg.protocol_magic, msg.network_id - ) - - tx_body = { - 0: inputs_for_cbor, - 1: outputs_for_cbor, - 2: msg.fee, - } - - if msg.ttl: - tx_body[3] = msg.ttl - - if msg.certificates: - certificates_for_cbor = _cborize_certificates(keychain, msg.certificates) - tx_body[4] = certificates_for_cbor - - if msg.withdrawals: - withdrawals_for_cbor = _cborize_withdrawals( - keychain, msg.withdrawals, msg.protocol_magic, msg.network_id - ) - tx_body[5] = withdrawals_for_cbor - - # tx_body[6] is for protocol updates, which we don't support - - if msg.auxiliary_data: - auxiliary_data_cbor = get_auxiliary_data_cbor( - keychain, msg.auxiliary_data, msg.protocol_magic, msg.network_id - ) - tx_body[7] = hash_auxiliary_data(bytes(auxiliary_data_cbor)) - - if msg.validity_interval_start: - tx_body[8] = msg.validity_interval_start - - return tx_body - - -def _cborize_inputs(inputs: list[CardanoTxInputType]) -> list[tuple[bytes, int]]: - return [(tx_input.prev_hash, tx_input.prev_index) for tx_input in inputs] - - -def _cborize_outputs( + """Read, validate, confirm and serialize the tokens of an asset group.""" + # until the CIP with canonical CBOR is finalized storing the seen_asset_name_bytes is the only way we can check for + # duplicate tokens + seen_asset_name_bytes: set[bytes] = set() + for _ in range(tokens_count): + token: CardanoToken = await ctx.call(CardanoTxItemAck(), CardanoToken) + token.asset_name_bytes = bytes(token.asset_name_bytes) + _validate_token(token, seen_asset_name_bytes) + seen_asset_name_bytes.add(token.asset_name_bytes) + if should_show_tokens: + await confirm_sending_token(ctx, policy_id, token) + + tokens_dict.add(token.asset_name_bytes, token.amount) + + +async def _process_certificates( + ctx: wire.Context, keychain: seed.Keychain, - outputs: list[CardanoTxOutputType], + certificates_list: HashBuilderList, + certificates_count: int, + signing_mode: CardanoTxSigningMode, protocol_magic: int, network_id: int, -) -> list[CborizedTxOutput]: - return [ - _cborize_output(keychain, output, protocol_magic, network_id) - for output in outputs - ] - +) -> None: + """Read, validate, confirm and serialize the certificates.""" + if certificates_count == 0: + return -def _cborize_output( - keychain: seed.Keychain, - output: CardanoTxOutputType, - protocol_magic: int, - network_id: int, -) -> CborizedTxOutput: - amount = output.amount - if output.address_parameters: - address = derive_address_bytes( - keychain, output.address_parameters, protocol_magic, network_id + for _ in range(certificates_count): + certificate: CardanoTxCertificate = await ctx.call( + CardanoTxItemAck(), CardanoTxCertificate ) - else: - assert output.address is not None # _validate_outputs - address = get_address_bytes_unsafe(output.address) + validate_certificate(certificate, signing_mode, protocol_magic, network_id) + await _show_certificate(ctx, certificate, signing_mode) - if not output.token_bundle: - return (address, amount) - else: - return (address, (amount, _cborize_token_bundle(output.token_bundle))) + if certificate.type == CardanoCertificateType.STAKE_POOL_REGISTRATION: + pool_parameters = certificate.pool_parameters + assert pool_parameters is not None # validate_certificate + pool_items_list: HashBuilderList = HashBuilderList( + POOL_REGISTRATION_CERTIFICATE_ITEMS_COUNT + ) + with certificates_list.append(pool_items_list): + for item in cborize_initial_pool_registration_certificate_fields( + certificate + ): + pool_items_list.append(item) + + pool_owners_list: HashBuilderList[bytes] = HashBuilderList( + pool_parameters.owners_count + ) + with pool_items_list.append(pool_owners_list): + await _process_pool_owners( + ctx, + keychain, + pool_owners_list, + pool_parameters.owners_count, + network_id, + ) + + relays_list: HashBuilderList[cbor.CborSequence] = HashBuilderList( + pool_parameters.relays_count + ) + with pool_items_list.append(relays_list): + await _process_pool_relays( + ctx, relays_list, pool_parameters.relays_count + ) + + pool_items_list.append(cborize_pool_metadata(pool_parameters.metadata)) + else: + certificates_list.append(cborize_certificate(keychain, certificate)) -def _cborize_token_bundle( - token_bundle: list[CardanoAssetGroupType], -) -> CborizedTokenBundle: - result: CborizedTokenBundle = cbor.OrderedMap() - for token_group in token_bundle: - cborized_policy_id = bytes(token_group.policy_id) - cborized_token_group = result[cborized_policy_id] = cbor.OrderedMap() +async def _process_pool_owners( + ctx: wire.Context, + keychain: seed.Keychain, + pool_owners_list: HashBuilderList[bytes], + owners_count: int, + network_id: int, +) -> None: + owners_as_path_count = 0 + for _ in range(owners_count): + owner: CardanoPoolOwner = await ctx.call(CardanoTxItemAck(), CardanoPoolOwner) + validate_pool_owner(owner) + await _show_pool_owner(ctx, keychain, owner, network_id) - for token in token_group.tokens: - cborized_asset_name = bytes(token.asset_name_bytes) - cborized_token_group[cborized_asset_name] = token.amount + pool_owners_list.append(cborize_pool_owner(keychain, owner)) - return result + if owner.staking_key_path: + owners_as_path_count += 1 + assert_certificate_cond(owners_as_path_count == 1) -def _cborize_certificates( - keychain: seed.Keychain, - certificates: list[CardanoTxCertificateType], -) -> list[CborSequence]: - return [cborize_certificate(keychain, cert) for cert in certificates] + +async def _process_pool_relays( + ctx: wire.Context, + relays_list: HashBuilderList[cbor.CborSequence], + relays_count: int, +) -> None: + for _ in range(relays_count): + relay: CardanoPoolRelayParameters = await ctx.call( + CardanoTxItemAck(), CardanoPoolRelayParameters + ) + validate_pool_relay(relay) + relays_list.append(cborize_pool_relay(relay)) -def _cborize_withdrawals( +async def _process_withdrawals( + ctx: wire.Context, keychain: seed.Keychain, - withdrawals: list[CardanoTxWithdrawalType], + withdrawals_dict: HashBuilderDict[bytes, int], + withdrawals_count: int, protocol_magic: int, network_id: int, -) -> cbor.OrderedMap[bytes, int]: - result: cbor.OrderedMap[bytes, int] = cbor.OrderedMap() - for withdrawal in withdrawals: +) -> None: + """Read, validate, confirm and serialize the withdrawals.""" + if withdrawals_count == 0: + return + + # until the CIP with canonical CBOR is finalized storing the seen_withdrawals is the only way we can check for + # duplicate withdrawals + seen_withdrawals: set[tuple[int, ...]] = set() + for _ in range(withdrawals_count): + withdrawal: CardanoTxWithdrawal = await ctx.call( + CardanoTxItemAck(), CardanoTxWithdrawal + ) + _validate_withdrawal(withdrawal, seen_withdrawals) + await confirm_withdrawal(ctx, withdrawal) reward_address = derive_address_bytes( keychain, CardanoAddressParametersType( @@ -466,234 +508,303 @@ def _cborize_withdrawals( network_id, ) - result[reward_address] = withdrawal.amount + withdrawals_dict.add(reward_address, withdrawal.amount) + + +async def _process_auxiliary_data( + ctx: wire.Context, + keychain: seed.Keychain, + tx_body_builder_dict: HashBuilderDict, + protocol_magic: int, + network_id: int, +) -> None: + """Read, validate, confirm and serialize the auxiliary data.""" + auxiliary_data: CardanoTxAuxiliaryData = await ctx.call( + CardanoTxItemAck(), CardanoTxAuxiliaryData + ) + validate_auxiliary_data(auxiliary_data) + + ( + auxiliary_data_hash, + auxiliary_data_supplement, + ) = get_auxiliary_data_hash_and_supplement( + keychain, auxiliary_data, protocol_magic, network_id + ) - return result + await show_auxiliary_data( + ctx, + keychain, + auxiliary_data_hash, + auxiliary_data.catalyst_registration_parameters, + protocol_magic, + network_id, + ) + tx_body_builder_dict.add(TX_BODY_KEY_AUXILIARY_DATA, auxiliary_data_hash) -def _hash_tx_body(tx_body: dict) -> bytes: - tx_body_cbor_chunks = cbor.encode_streamed(tx_body) + await ctx.call(auxiliary_data_supplement, CardanoTxHostAck) - hashfn = hashlib.blake2b(outlen=32) - for chunk in tx_body_cbor_chunks: - hashfn.update(chunk) - return hashfn.digest() +async def _process_witness_requests( + ctx: wire.Context, + keychain: seed.Keychain, + tx_hash: bytes, + witness_requests_count: int, + signing_mode: CardanoTxSigningMode, +) -> CardanoTxResponseType: + response: CardanoTxResponseType = CardanoTxItemAck() + for _ in range(witness_requests_count): + witness_request = await ctx.call(response, CardanoTxWitnessRequest) + _validate_witness(signing_mode, witness_request) + await _show_witness(ctx, witness_request.path) + + response = ( + _get_byron_witness(keychain, witness_request.path, tx_hash) + if is_byron_path(witness_request.path) + else _get_shelley_witness(keychain, witness_request.path, tx_hash) + ) + return response -def _cborize_witnesses( +def _get_byron_witness( keychain: seed.Keychain, - inputs: list[CardanoTxInputType], - certificates: list[CardanoTxCertificateType], - withdrawals: list[CardanoTxWithdrawalType], - tx_body_hash: bytes, - protocol_magic: int, -) -> dict: - shelley_witnesses = _cborize_shelley_witnesses( - keychain, inputs, certificates, withdrawals, tx_body_hash + path: list[int], + tx_hash: bytes, +) -> CardanoTxWitnessResponse: + node = keychain.derive(path) + return CardanoTxWitnessResponse( + type=CardanoTxWitnessType.BYRON_WITNESS, + pub_key=derive_public_key(keychain, path), + signature=_sign_tx_hash(keychain, tx_hash, path), + chain_code=node.chain_code(), ) - byron_witnesses = _cborize_byron_witnesses( - keychain, inputs, tx_body_hash, protocol_magic + + +def _get_shelley_witness( + keychain: seed.Keychain, + path: list[int], + tx_hash: bytes, +) -> CardanoTxWitnessResponse: + return CardanoTxWitnessResponse( + type=CardanoTxWitnessType.SHELLEY_WITNESS, + pub_key=derive_public_key(keychain, path), + signature=_sign_tx_hash(keychain, tx_hash, path), ) - # use key 0 for shelley witnesses and key 2 for byron witnesses - # according to the spec in shelley.cddl in cardano-ledger-specs - witnesses: dict[Any, Any] = {} - if shelley_witnesses: - witnesses[0] = shelley_witnesses - if byron_witnesses: - witnesses[2] = byron_witnesses - return witnesses +def validate_network_info(network_id: int, protocol_magic: int) -> None: + """ + We are only concerned about checking that both network_id and protocol_magic + belong to the mainnet or that both belong to a testnet. We don't need to check for + consistency between various testnets (at least for now). + """ + is_mainnet_network_id = network_ids.is_mainnet(network_id) + is_mainnet_protocol_magic = protocol_magics.is_mainnet(protocol_magic) + if is_mainnet_network_id != is_mainnet_protocol_magic: + raise wire.ProcessError("Invalid network id/protocol magic combination!") -def _cborize_shelley_witnesses( - keychain: seed.Keychain, - inputs: list[CardanoTxInputType], - certificates: list[CardanoTxCertificateType], - withdrawals: list[CardanoTxWithdrawalType], - tx_body_hash: bytes, -) -> list[tuple[bytes, bytes]]: - shelley_witnesses = [] - - # include only one witness for each path - paths = set() - for tx_input in inputs: - if is_shelley_path(tx_input.address_n): - paths.add(tuple(tx_input.address_n)) - for certificate in certificates: - if certificate.type in ( - CardanoCertificateType.STAKE_DEREGISTRATION, - CardanoCertificateType.STAKE_DELEGATION, - ): - paths.add(tuple(certificate.path)) - elif certificate.type == CardanoCertificateType.STAKE_POOL_REGISTRATION: - # ensured by validate_certificate: - assert certificate.pool_parameters is not None # validate_certificate - for pool_owner in certificate.pool_parameters.owners: - if pool_owner.staking_key_path: - paths.add(tuple(pool_owner.staking_key_path)) - for withdrawal in withdrawals: - paths.add(tuple(withdrawal.path)) - - for path in paths: - witness = _cborize_shelley_witness(keychain, tx_body_hash, list(path)) - shelley_witnesses.append(witness) - - shelley_witnesses.sort() - - return shelley_witnesses - - -def _cborize_shelley_witness( - keychain: seed.Keychain, tx_body_hash: bytes, path: list[int] -) -> tuple[bytes, bytes]: - node = keychain.derive(path) - signature = ed25519.sign_ext( - node.private_key(), node.private_key_ext(), tx_body_hash - ) - public_key = derive_public_key(keychain, path) +def _validate_stake_pool_registration_tx_structure(msg: CardanoSignTxInit) -> None: + """Ensure that there is exactly one certificate, which is stake pool registration, and no withdrawals""" + if ( + msg.certificates_count != 1 + or msg.signing_mode != CardanoTxSigningMode.POOL_REGISTRATION_AS_OWNER + or msg.withdrawals_count != 0 + ): + raise INVALID_STAKE_POOL_REGISTRATION_TX_STRUCTURE - return public_key, signature +def _validate_output( + output: CardanoTxOutput, protocol_magic: int, network_id: int +) -> None: + if output.address_parameters and output.address is not None: + raise wire.ProcessError( + "Outputs can not contain both address and address_parameters fields!" + ) -def _cborize_byron_witnesses( + if output.address_parameters: + validate_address_parameters(output.address_parameters) + elif output.address is not None: + validate_output_address(output.address, protocol_magic, network_id) + else: + raise wire.ProcessError( + "Each output must have an address field or address_parameters!" + ) + + +async def _show_output( + ctx: wire.Context, keychain: seed.Keychain, - inputs: list[CardanoTxInputType], - tx_body_hash: bytes, + output: CardanoTxOutput, + signing_mode: CardanoTxSigningMode, protocol_magic: int, -) -> list[tuple[bytes, bytes, bytes, bytes]]: - byron_witnesses = [] + network_id: int, +) -> None: + if output.address_parameters: + await _fail_or_warn_if_invalid_path( + ctx, + SCHEMA_ADDRESS, + output.address_parameters.address_n, + CHANGE_OUTPUT_PATH_NAME, + ) - # include only one witness for each path - paths = set() - for tx_input in inputs: - if is_byron_path(tx_input.address_n): - paths.add(tuple(tx_input.address_n)) + await _show_change_output_staking_warnings( + ctx, keychain, output.address_parameters, output.amount + ) - for path in paths: - node = keychain.derive(list(path)) + if _should_hide_output(output.address_parameters.address_n): + return - public_key = derive_public_key(keychain, list(path)) - signature = ed25519.sign_ext( - node.private_key(), node.private_key_ext(), tx_body_hash + address = derive_human_readable_address( + keychain, output.address_parameters, protocol_magic, network_id ) - chain_code = node.chain_code() - address_attributes = cbor.encode(get_address_attributes(protocol_magic)) - - byron_witnesses.append((public_key, signature, chain_code, address_attributes)) + else: + assert output.address is not None # _validate_output + address = output.address - byron_witnesses.sort() + if output.asset_groups_count > 0: + await show_warning_tx_output_contains_tokens(ctx) - return byron_witnesses + if signing_mode == CardanoTxSigningMode.ORDINARY_TRANSACTION: + await confirm_sending(ctx, output.amount, address) -async def _show_standard_tx( - ctx: wire.Context, keychain: seed.Keychain, msg: CardanoSignTx +def _validate_asset_group( + asset_group: CardanoAssetGroup, seen_policy_ids: set[bytes] ) -> None: - is_network_id_verifiable = _is_network_id_verifiable(msg) + if len(asset_group.policy_id) != MINTING_POLICY_ID_LENGTH: + raise INVALID_TOKEN_BUNDLE_OUTPUT + if asset_group.tokens_count == 0: + raise INVALID_TOKEN_BUNDLE_OUTPUT + if asset_group.policy_id in seen_policy_ids: + raise INVALID_TOKEN_BUNDLE_OUTPUT + - if not is_network_id_verifiable: - await show_warning_tx_network_unverifiable(ctx) +def _validate_token(token: CardanoToken, seen_asset_name_bytes: set[bytes]) -> None: + if len(token.asset_name_bytes) > MAX_ASSET_NAME_LENGTH: + raise INVALID_TOKEN_BUNDLE_OUTPUT + if token.asset_name_bytes in seen_asset_name_bytes: + raise INVALID_TOKEN_BUNDLE_OUTPUT - total_amount = await _show_outputs(ctx, keychain, msg) - for certificate in msg.certificates: +async def _show_certificate( + ctx: wire.Context, + certificate: CardanoTxCertificate, + signing_mode: CardanoTxSigningMode, +) -> None: + if signing_mode == CardanoTxSigningMode.ORDINARY_TRANSACTION: await _fail_or_warn_if_invalid_path( ctx, SCHEMA_STAKING, certificate.path, CERTIFICATE_PATH_NAME ) await confirm_certificate(ctx, certificate) + elif signing_mode == CardanoTxSigningMode.POOL_REGISTRATION_AS_OWNER: + await _show_stake_pool_registration_certificate(ctx, certificate) - for withdrawal in msg.withdrawals: - await confirm_withdrawal(ctx, withdrawal) - await show_auxiliary_data( - ctx, keychain, msg.auxiliary_data, msg.protocol_magic, msg.network_id - ) +def _validate_withdrawal( + withdrawal: CardanoTxWithdrawal, seen_withdrawals: set[tuple[int, ...]] +) -> None: + if not SCHEMA_STAKING_ANY_ACCOUNT.match(withdrawal.path): + raise INVALID_WITHDRAWAL - await confirm_transaction( - ctx=ctx, - amount=total_amount, - fee=msg.fee, - protocol_magic=msg.protocol_magic, - ttl=msg.ttl, - validity_interval_start=msg.validity_interval_start, - is_network_id_verifiable=is_network_id_verifiable, - ) + if not 0 <= withdrawal.amount < LOVELACE_MAX_SUPPLY: + raise INVALID_WITHDRAWAL + + path_tuple = tuple(withdrawal.path) + if path_tuple in seen_withdrawals: + raise wire.ProcessError("Duplicate withdrawals") + else: + seen_withdrawals.add(path_tuple) -async def _show_stake_pool_registration_tx( - ctx: wire.Context, keychain: seed.Keychain, msg: CardanoSignTx +def _get_output_address( + keychain: seed.Keychain, + protocol_magic: int, + network_id: int, + output: CardanoTxOutput, +) -> bytes: + if output.address_parameters: + return derive_address_bytes( + keychain, output.address_parameters, protocol_magic, network_id + ) + else: + assert output.address is not None # _validate_output + return get_address_bytes_unsafe(output.address) + + +def _sign_tx_hash( + keychain: seed.Keychain, tx_body_hash: bytes, path: list[int] +) -> bytes: + node = keychain.derive(path) + return ed25519.sign_ext(node.private_key(), node.private_key_ext(), tx_body_hash) + + +async def _show_stake_pool_registration_certificate( + ctx: wire.Context, stake_pool_registration_certificate: CardanoTxCertificate ) -> None: - stake_pool_registration_certificate = msg.certificates[0] pool_parameters = stake_pool_registration_certificate.pool_parameters # _validate_stake_pool_registration_tx_structure ensures that there is only one # certificate, and validate_certificate ensures that the structure is valid assert pool_parameters is not None # display the transaction (certificate) in UI - await confirm_stake_pool_parameters( - ctx, pool_parameters, msg.network_id, msg.protocol_magic - ) + await confirm_stake_pool_parameters(ctx, pool_parameters) - for owner in pool_parameters.owners: - if owner.staking_key_path: - await _fail_or_warn_if_invalid_path( - ctx, - SCHEMA_STAKING, - owner.staking_key_path, - POOL_OWNER_STAKING_PATH_NAME, - ) - - await confirm_stake_pool_owners( - ctx, keychain, pool_parameters.owners, msg.network_id - ) await confirm_stake_pool_metadata(ctx, pool_parameters.metadata) - await confirm_transaction_network_ttl( - ctx, msg.protocol_magic, msg.ttl, msg.validity_interval_start - ) - await show_auxiliary_data( - ctx, keychain, msg.auxiliary_data, msg.protocol_magic, msg.network_id - ) - await confirm_stake_pool_registration_final(ctx) -async def _show_outputs( - ctx: wire.Context, keychain: seed.Keychain, msg: CardanoSignTx -) -> int: - total_amount = 0 - for output in msg.outputs: - if output.address_parameters: - await _fail_or_warn_if_invalid_path( - ctx, - SCHEMA_ADDRESS, - output.address_parameters.address_n, - CHANGE_OUTPUT_PATH_NAME, - ) +async def _show_pool_owner( + ctx: wire.Context, keychain: seed.Keychain, owner: CardanoPoolOwner, network_id: int +) -> None: + if owner.staking_key_path: + await _fail_or_warn_if_invalid_path( + ctx, + SCHEMA_STAKING, + owner.staking_key_path, + POOL_OWNER_STAKING_PATH_NAME, + ) - await _show_change_output_staking_warnings( - ctx, keychain, output.address_parameters, output.amount - ) + await confirm_stake_pool_owner(ctx, keychain, owner, network_id) - if _should_hide_output(output.address_parameters.address_n, msg.inputs): - continue - address = derive_human_readable_address( - keychain, output.address_parameters, msg.protocol_magic, msg.network_id - ) - else: - assert output.address is not None # _validate_outputs - address = output.address +def _validate_witness( + signing_mode: CardanoTxSigningMode, + witness: CardanoTxWitnessRequest, +) -> None: + # witness path validation happens in _show_witness - total_amount += output.amount + if signing_mode == CardanoTxSigningMode.POOL_REGISTRATION_AS_OWNER: + _ensure_no_payment_witness(witness) - if len(output.token_bundle) > 0: - await show_warning_tx_output_contains_tokens(ctx) - await confirm_sending(ctx, output.amount, output.token_bundle, address) +def _ensure_no_payment_witness(witness: CardanoTxWitnessRequest) -> None: + """ + We have a separate tx signing flow for stake pool registration because it's a + transaction where the witnessable entries (i.e. inputs, withdrawals, etc.) + in the transaction are not supposed to be controlled by the HW wallet, which + means the user is vulnerable to unknowingly supplying a witness for an UTXO + or other tx entry they think is external, resulting in the co-signers + gaining control over their funds (Something SLIP-0019 is dealing with for + BTC but no similar standard is currently available for Cardano). Hence we + completely forbid witnessing inputs and other entries of the transaction + except the stake pool certificate itself and we provide a witness only to the + user's staking key in the list of pool owners. + """ + if not SCHEMA_STAKING_ANY_ACCOUNT.match(witness.path): + raise INVALID_STAKEPOOL_REGISTRATION_TX_WITNESSES - return total_amount + +async def _show_witness( + ctx: wire.Context, + witness_path: list[int], +) -> None: + await _fail_or_warn_if_invalid_path( + ctx, + SCHEMA_ADDRESS, + witness_path, + WITNESS_PATH_NAME, + ) async def _show_change_output_staking_warnings( @@ -743,31 +854,38 @@ async def _show_change_output_staking_warnings( ) -# addresses from the same account as inputs should be hidden -def _should_hide_output(output: list[int], inputs: list[CardanoTxInputType]) -> bool: - for tx_input in inputs: - inp = tx_input.address_n - if ( - len(output) != BIP_PATH_LENGTH - or output[ACCOUNT_PATH_INDEX] != inp[ACCOUNT_PATH_INDEX] - or output[ACCOUNT_PATH_INDEX] > MAX_SAFE_ACCOUNT_INDEX - or output[-2] >= 2 - or output[-1] >= MAX_SAFE_CHANGE_ADDRESS_INDEX - ): - return False +def _should_hide_output(path: list[int]) -> bool: + """Return whether the output address is from a safe path, so it could be hidden.""" + return ( + len(path) == BIP_PATH_LENGTH + and path[ACCOUNT_PATH_INDEX] <= MAX_SAFE_ACCOUNT_INDEX + and path[-2] < 2 + and path[-1] < MAX_SAFE_CHANGE_ADDRESS_INDEX + ) + + +def _should_show_tokens( + output: CardanoTxOutput, signing_mode: CardanoTxSigningMode +) -> bool: + if signing_mode != CardanoTxSigningMode.ORDINARY_TRANSACTION: + return True + + if output.address_parameters: + return not _should_hide_output(output.address_parameters.address_n) + return True -def _is_network_id_verifiable(msg: CardanoSignTx) -> bool: +def _is_network_id_verifiable(msg: CardanoSignTxInit) -> bool: """ checks whether there is at least one element that contains information about network ID, otherwise Trezor cannot guarantee that the tx is actually meant for the given network """ return ( - len(msg.outputs) != 0 - or len(msg.withdrawals) != 0 - or _has_stake_pool_registration(msg) + msg.outputs_count != 0 + or msg.withdrawals_count != 0 + or msg.signing_mode == CardanoTxSigningMode.POOL_REGISTRATION_AS_OWNER ) diff --git a/core/src/apps/common/cbor.py b/core/src/apps/common/cbor.py index 352c086e3..75495d98a 100644 --- a/core/src/apps/common/cbor.py +++ b/core/src/apps/common/cbor.py @@ -309,3 +309,11 @@ def decode(cbor: bytes, offset: int = 0) -> Value: if r.remaining_count(): raise ValueError return res + + +def create_array_header(size: int) -> bytes: + return _header(_CBOR_ARRAY, size) + + +def create_map_header(size: int) -> bytes: + return _header(_CBOR_MAP, size) diff --git a/core/src/apps/workflow_handlers.py b/core/src/apps/workflow_handlers.py index 387b452a9..3389b84fd 100644 --- a/core/src/apps/workflow_handlers.py +++ b/core/src/apps/workflow_handlers.py @@ -149,7 +149,7 @@ def find_message_handler_module(msg_type: int) -> str: return "apps.cardano.get_address" elif msg_type == MessageType.CardanoGetPublicKey: return "apps.cardano.get_public_key" - elif msg_type == MessageType.CardanoSignTx: + elif msg_type == MessageType.CardanoSignTxInit: return "apps.cardano.sign_tx" # tezos diff --git a/core/tests/test_apps.common.cbor.py b/core/tests/test_apps.common.cbor.py index 723491b24..d0d67e690 100644 --- a/core/tests/test_apps.common.cbor.py +++ b/core/tests/test_apps.common.cbor.py @@ -6,13 +6,48 @@ from apps.common.cbor import ( IndefiniteLengthArray, OrderedMap, Tagged, + create_array_header, + create_map_header, decode, encode, encode_chunked, encode_streamed, ) + class TestCardanoCbor(unittest.TestCase): + def test_create_array_header(self): + test_vectors = [ + (0, '80'), + (23, '97'), + ((2 ** 8) - 1, '98ff'), + ((2 ** 16) - 1, '99ffff'), + ((2 ** 32) - 1, '9affffffff'), + ((2 ** 64) - 1, '9bffffffffffffffff'), + ] + for val, header_hex in test_vectors: + header = unhexlify(header_hex) + self.assertEqual(create_array_header(val), header) + + with self.assertRaises(NotImplementedError): + create_array_header(2 ** 64) + + def test_create_map_header(self): + test_vectors = [ + (0, 'a0'), + (23, 'b7'), + ((2 ** 8) - 1, 'b8ff'), + ((2 ** 16) - 1, 'b9ffff'), + ((2 ** 32) - 1, 'baffffffff'), + ((2 ** 64) - 1, 'bbffffffffffffffff'), + ] + for val, header_hex in test_vectors: + header = unhexlify(header_hex) + self.assertEqual(create_map_header(val), header) + + with self.assertRaises(NotImplementedError): + create_map_header(2 ** 64) + def test_cbor_encoding(self): test_vectors = [ # unsigned integers diff --git a/python/src/trezorlib/cardano.py b/python/src/trezorlib/cardano.py index 48a3d9fff..a388d16db 100644 --- a/python/src/trezorlib/cardano.py +++ b/python/src/trezorlib/cardano.py @@ -15,11 +15,17 @@ # If not, see . from ipaddress import ip_address -from typing import List, Optional +from itertools import chain +from typing import Dict, Iterator, List, Optional, Tuple, Union from . import exceptions, messages, tools from .tools import expect +SIGNING_MODE_IDS = { + "ORDINARY_TRANSACTION": messages.CardanoTxSigningMode.ORDINARY_TRANSACTION, + "POOL_REGISTRATION_AS_OWNER": messages.CardanoTxSigningMode.POOL_REGISTRATION_AS_OWNER, +} + PROTOCOL_MAGICS = {"mainnet": 764824073, "testnet": 42} NETWORK_IDS = {"mainnet": 1, "testnet": 0} @@ -57,6 +63,28 @@ ADDRESS_TYPES = ( messages.CardanoAddressType.REWARD, ) +InputWithPath = Tuple[messages.CardanoTxInput, List[int]] +AssetGroupWithTokens = Tuple[messages.CardanoAssetGroup, List[messages.CardanoToken]] +OutputWithAssetGroups = Tuple[messages.CardanoTxOutput, List[AssetGroupWithTokens]] +OutputItem = Union[ + messages.CardanoTxOutput, messages.CardanoAssetGroup, messages.CardanoToken +] +CertificateItem = Union[ + messages.CardanoTxCertificate, + messages.CardanoPoolOwner, + messages.CardanoPoolRelayParameters, +] +PoolOwnersAndRelays = Tuple[ + List[messages.CardanoPoolOwner], List[messages.CardanoPoolRelayParameters] +] +CertificateWithPoolOwnersAndRelays = Tuple[ + messages.CardanoTxCertificate, Optional[PoolOwnersAndRelays] +] +Path = List[int] +Witness = Tuple[Path, bytes] +AuxiliaryDataSupplement = Dict[str, Union[int, bytes]] +SignTxResponse = Dict[str, Union[bytes, List[Witness], AuxiliaryDataSupplement]] + def create_address_parameters( address_type: messages.CardanoAddressType, @@ -97,18 +125,21 @@ def _create_certificate_pointer( ) -def parse_input(tx_input) -> messages.CardanoTxInputType: +def parse_input(tx_input) -> InputWithPath: if not all(k in tx_input for k in REQUIRED_FIELDS_INPUT): raise ValueError("The input is missing some fields") - return messages.CardanoTxInputType( - address_n=tools.parse_path(tx_input.get("path")), - prev_hash=bytes.fromhex(tx_input["prev_hash"]), - prev_index=tx_input["prev_index"], + path = tools.parse_path(tx_input.get("path")) + return ( + messages.CardanoTxInput( + prev_hash=bytes.fromhex(tx_input["prev_hash"]), + prev_index=tx_input["prev_index"], + ), + path, ) -def parse_output(output) -> messages.CardanoTxOutputType: +def parse_output(output) -> OutputWithAssetGroups: contains_address = "address" in output contains_address_type = "addressType" in output @@ -119,7 +150,7 @@ def parse_output(output) -> messages.CardanoTxOutputType: address = None address_parameters = None - token_bundle = None + token_bundle = [] if contains_address: address = output["address"] @@ -130,38 +161,46 @@ def parse_output(output) -> messages.CardanoTxOutputType: if "token_bundle" in output: token_bundle = _parse_token_bundle(output["token_bundle"]) - return messages.CardanoTxOutputType( - address=address, - address_parameters=address_parameters, - amount=int(output["amount"]), - token_bundle=token_bundle, + return ( + messages.CardanoTxOutput( + address=address, + address_parameters=address_parameters, + amount=int(output["amount"]), + asset_groups_count=len(token_bundle), + ), + token_bundle, ) -def _parse_token_bundle(token_bundle) -> List[messages.CardanoAssetGroupType]: +def _parse_token_bundle(token_bundle) -> List[AssetGroupWithTokens]: result = [] for token_group in token_bundle: if not all(k in token_group for k in REQUIRED_FIELDS_TOKEN_GROUP): raise ValueError(INVALID_OUTPUT_TOKEN_BUNDLE_ENTRY) + tokens = _parse_tokens(token_group["tokens"]) + result.append( - messages.CardanoAssetGroupType( - policy_id=bytes.fromhex(token_group["policy_id"]), - tokens=_parse_tokens(token_group["tokens"]), + ( + messages.CardanoAssetGroup( + policy_id=bytes.fromhex(token_group["policy_id"]), + tokens_count=len(tokens), + ), + tokens, ) ) return result -def _parse_tokens(tokens) -> List[messages.CardanoTokenType]: +def _parse_tokens(tokens) -> List[messages.CardanoToken]: result = [] for token in tokens: if not all(k in token for k in REQUIRED_FIELDS_TOKEN): raise ValueError(INVALID_OUTPUT_TOKEN_BUNDLE_ENTRY) result.append( - messages.CardanoTokenType( + messages.CardanoToken( asset_name_bytes=bytes.fromhex(token["asset_name_bytes"]), amount=int(token["amount"]), ) @@ -191,7 +230,7 @@ def _parse_address_parameters( ) -def parse_certificate(certificate) -> messages.CardanoTxCertificateType: +def parse_certificate(certificate) -> CertificateWithPoolOwnersAndRelays: CERTIFICATE_MISSING_FIELDS_ERROR = ValueError( "The certificate is missing some fields" ) @@ -205,10 +244,13 @@ def parse_certificate(certificate) -> messages.CardanoTxCertificateType: if "pool" not in certificate: raise CERTIFICATE_MISSING_FIELDS_ERROR - return messages.CardanoTxCertificateType( - type=certificate_type, - path=tools.parse_path(certificate["path"]), - pool=bytes.fromhex(certificate["pool"]), + return ( + messages.CardanoTxCertificate( + type=certificate_type, + path=tools.parse_path(certificate["path"]), + pool=bytes.fromhex(certificate["pool"]), + ), + None, ) elif certificate_type in ( messages.CardanoCertificateType.STAKE_REGISTRATION, @@ -216,9 +258,12 @@ def parse_certificate(certificate) -> messages.CardanoTxCertificateType: ): if "path" not in certificate: raise CERTIFICATE_MISSING_FIELDS_ERROR - return messages.CardanoTxCertificateType( - type=certificate_type, - path=tools.parse_path(certificate["path"]), + return ( + messages.CardanoTxCertificate( + type=certificate_type, + path=tools.parse_path(certificate["path"]), + ), + None, ) elif certificate_type == messages.CardanoCertificateType.STAKE_POOL_REGISTRATION: pool_parameters = certificate["pool_parameters"] @@ -237,45 +282,49 @@ def parse_certificate(certificate) -> messages.CardanoTxCertificateType: else: pool_metadata = None - return messages.CardanoTxCertificateType( - type=certificate_type, - pool_parameters=messages.CardanoPoolParametersType( - pool_id=bytes.fromhex(pool_parameters["pool_id"]), - vrf_key_hash=bytes.fromhex(pool_parameters["vrf_key_hash"]), - pledge=int(pool_parameters["pledge"]), - cost=int(pool_parameters["cost"]), - margin_numerator=int(pool_parameters["margin"]["numerator"]), - margin_denominator=int(pool_parameters["margin"]["denominator"]), - reward_account=pool_parameters["reward_account"], - metadata=pool_metadata, - owners=[ - _parse_pool_owner(pool_owner) - for pool_owner in pool_parameters.get("owners", []) - ], - relays=[ - _parse_pool_relay(pool_relay) - for pool_relay in pool_parameters.get("relays", []) - ] - if "relays" in pool_parameters - else [], + owners = [ + _parse_pool_owner(pool_owner) + for pool_owner in pool_parameters.get("owners", []) + ] + relays = [ + _parse_pool_relay(pool_relay) + for pool_relay in pool_parameters.get("relays", []) + ] + + return ( + messages.CardanoTxCertificate( + type=certificate_type, + pool_parameters=messages.CardanoPoolParametersType( + pool_id=bytes.fromhex(pool_parameters["pool_id"]), + vrf_key_hash=bytes.fromhex(pool_parameters["vrf_key_hash"]), + pledge=int(pool_parameters["pledge"]), + cost=int(pool_parameters["cost"]), + margin_numerator=int(pool_parameters["margin"]["numerator"]), + margin_denominator=int(pool_parameters["margin"]["denominator"]), + reward_account=pool_parameters["reward_account"], + metadata=pool_metadata, + owners_count=len(owners), + relays_count=len(relays), + ), ), + (owners, relays), ) else: raise ValueError("Unknown certificate type") -def _parse_pool_owner(pool_owner) -> messages.CardanoPoolOwnerType: +def _parse_pool_owner(pool_owner) -> messages.CardanoPoolOwner: if "staking_key_path" in pool_owner: - return messages.CardanoPoolOwnerType( + return messages.CardanoPoolOwner( staking_key_path=tools.parse_path(pool_owner["staking_key_path"]) ) - return messages.CardanoPoolOwnerType( + return messages.CardanoPoolOwner( staking_key_hash=bytes.fromhex(pool_owner["staking_key_hash"]) ) -def _parse_pool_relay(pool_relay) -> messages.CardanoPoolRelayParametersType: +def _parse_pool_relay(pool_relay) -> messages.CardanoPoolRelayParameters: pool_relay_type = int(pool_relay["type"]) if pool_relay_type == messages.CardanoPoolRelayType.SINGLE_HOST_IP: @@ -290,20 +339,20 @@ def _parse_pool_relay(pool_relay) -> messages.CardanoPoolRelayParametersType: else None ) - return messages.CardanoPoolRelayParametersType( + return messages.CardanoPoolRelayParameters( type=pool_relay_type, port=int(pool_relay["port"]), ipv4_address=ipv4_address_packed, ipv6_address=ipv6_address_packed, ) elif pool_relay_type == messages.CardanoPoolRelayType.SINGLE_HOST_NAME: - return messages.CardanoPoolRelayParametersType( + return messages.CardanoPoolRelayParameters( type=pool_relay_type, port=int(pool_relay["port"]), host_name=pool_relay["host_name"], ) elif pool_relay_type == messages.CardanoPoolRelayType.MULTIPLE_HOST_NAME: - return messages.CardanoPoolRelayParametersType( + return messages.CardanoPoolRelayParameters( type=pool_relay_type, host_name=pool_relay["host_name"], ) @@ -311,18 +360,18 @@ def _parse_pool_relay(pool_relay) -> messages.CardanoPoolRelayParametersType: raise ValueError("Unknown pool relay type") -def parse_withdrawal(withdrawal) -> messages.CardanoTxWithdrawalType: +def parse_withdrawal(withdrawal) -> messages.CardanoTxWithdrawal: if not all(k in withdrawal for k in REQUIRED_FIELDS_WITHDRAWAL): raise ValueError("Withdrawal is missing some fields") path = withdrawal["path"] - return messages.CardanoTxWithdrawalType( + return messages.CardanoTxWithdrawal( path=tools.parse_path(path), amount=int(withdrawal["amount"]), ) -def parse_auxiliary_data(auxiliary_data) -> messages.CardanoTxAuxiliaryDataType: +def parse_auxiliary_data(auxiliary_data) -> messages.CardanoTxAuxiliaryData: if auxiliary_data is None: return None @@ -331,9 +380,9 @@ def parse_auxiliary_data(auxiliary_data) -> messages.CardanoTxAuxiliaryDataType: ) # include all provided fields so we can test validation in FW - blob = None - if "blob" in auxiliary_data: - blob = bytes.fromhex(auxiliary_data["blob"]) + hash = None + if "hash" in auxiliary_data: + hash = bytes.fromhex(auxiliary_data["hash"]) catalyst_registration_parameters = None if "catalyst_registration_parameters" in auxiliary_data: @@ -356,15 +405,68 @@ def parse_auxiliary_data(auxiliary_data) -> messages.CardanoTxAuxiliaryDataType: ) ) - if blob is None and catalyst_registration_parameters is None: + if hash is None and catalyst_registration_parameters is None: raise AUXILIARY_DATA_MISSING_FIELDS_ERROR - return messages.CardanoTxAuxiliaryDataType( - blob=blob, + return messages.CardanoTxAuxiliaryData( + hash=hash, catalyst_registration_parameters=catalyst_registration_parameters, ) +def _get_witness_paths( + inputs: List[InputWithPath], + certificates: List[CertificateWithPoolOwnersAndRelays], + withdrawals: List[messages.CardanoTxWithdrawal], +) -> List[Path]: + paths = set() + for _, path in inputs: + if path: + paths.add(tuple(path)) + for certificate, pool_owners_and_relays in certificates: + if certificate.type in ( + messages.CardanoCertificateType.STAKE_DEREGISTRATION, + messages.CardanoCertificateType.STAKE_DELEGATION, + ): + paths.add(tuple(certificate.path)) + elif ( + certificate.type == messages.CardanoCertificateType.STAKE_POOL_REGISTRATION + and pool_owners_and_relays is not None + ): + owners, _ = pool_owners_and_relays + for pool_owner in owners: + if pool_owner.staking_key_path: + paths.add(tuple(pool_owner.staking_key_path)) + for withdrawal in withdrawals: + paths.add(tuple(withdrawal.path)) + + return sorted([list(path) for path in paths]) + + +def _get_input_items(inputs: List[InputWithPath]) -> Iterator[messages.CardanoTxInput]: + for input, _ in inputs: + yield input + + +def _get_output_items(outputs: List[OutputWithAssetGroups]) -> Iterator[OutputItem]: + for output, asset_groups in outputs: + yield output + for asset_group, tokens in asset_groups: + yield asset_group + yield from tokens + + +def _get_certificate_items( + certificates: List[CertificateWithPoolOwnersAndRelays], +) -> Iterator[CertificateItem]: + for certificate, pool_owners_and_relays in certificates: + yield certificate + if pool_owners_and_relays is not None: + owners, relays = pool_owners_and_relays + yield from owners + yield from relays + + # ====== Client functions ====== # @@ -391,44 +493,94 @@ def get_public_key(client, address_n: List[int]) -> messages.CardanoPublicKey: return client.call(messages.CardanoGetPublicKey(address_n=address_n)) -@expect(messages.CardanoSignedTx) def sign_tx( client, - inputs: List[messages.CardanoTxInputType], - outputs: List[messages.CardanoTxOutputType], + signing_mode: messages.CardanoTxSigningMode, + inputs: List[InputWithPath], + outputs: List[OutputWithAssetGroups], fee: int, ttl: Optional[int], validity_interval_start: Optional[int], - certificates: List[messages.CardanoTxCertificateType] = (), - withdrawals: List[messages.CardanoTxWithdrawalType] = (), + certificates: List[CertificateWithPoolOwnersAndRelays] = (), + withdrawals: List[messages.CardanoTxWithdrawal] = (), protocol_magic: int = PROTOCOL_MAGICS["mainnet"], network_id: int = NETWORK_IDS["mainnet"], - auxiliary_data: messages.CardanoTxAuxiliaryDataType = None, -) -> messages.CardanoSignedTx: + auxiliary_data: messages.CardanoTxAuxiliaryData = None, +) -> SignTxResponse: + UNEXPECTED_RESPONSE_ERROR = exceptions.TrezorException("Unexpected response") + + witness_paths = _get_witness_paths(inputs, certificates, withdrawals) + response = client.call( - messages.CardanoSignTx( - inputs=inputs, - outputs=outputs, + messages.CardanoSignTxInit( + signing_mode=signing_mode, + inputs_count=len(inputs), + outputs_count=len(outputs), fee=fee, ttl=ttl, validity_interval_start=validity_interval_start, - certificates=certificates, - withdrawals=withdrawals, + certificates_count=len(certificates), + withdrawals_count=len(withdrawals), protocol_magic=protocol_magic, network_id=network_id, - auxiliary_data=auxiliary_data, + has_auxiliary_data=auxiliary_data is not None, + witness_requests_count=len(witness_paths), ) ) + if not isinstance(response, messages.CardanoTxItemAck): + raise UNEXPECTED_RESPONSE_ERROR + + for tx_item in chain( + _get_input_items(inputs), + _get_output_items(outputs), + _get_certificate_items(certificates), + withdrawals, + ): + response = client.call(tx_item) + if not isinstance(response, messages.CardanoTxItemAck): + raise UNEXPECTED_RESPONSE_ERROR + + sign_tx_response = {} - result = bytearray() - while isinstance(response, messages.CardanoSignedTxChunk): - result.extend(response.signed_tx_chunk) - response = client.call(messages.CardanoSignedTxChunkAck()) + if auxiliary_data is not None: + auxiliary_data_supplement = client.call(auxiliary_data) + if not isinstance( + auxiliary_data_supplement, messages.CardanoTxAuxiliaryDataSupplement + ): + raise UNEXPECTED_RESPONSE_ERROR + if ( + auxiliary_data_supplement.type + != messages.CardanoTxAuxiliaryDataSupplementType.NONE + ): + sign_tx_response[ + "auxiliary_data_supplement" + ] = auxiliary_data_supplement.__dict__ + + response = client.call(messages.CardanoTxHostAck()) + if not isinstance(response, messages.CardanoTxItemAck): + raise UNEXPECTED_RESPONSE_ERROR + + sign_tx_response["witnesses"] = [] + for path in witness_paths: + response = client.call(messages.CardanoTxWitnessRequest(path=path)) + if not isinstance(response, messages.CardanoTxWitnessResponse): + raise UNEXPECTED_RESPONSE_ERROR + sign_tx_response["witnesses"].append( + { + "type": response.type, + "pub_key": response.pub_key, + "signature": response.signature, + "chain_code": response.chain_code, + } + ) - if not isinstance(response, messages.CardanoSignedTx): - raise exceptions.TrezorException("Unexpected response") + response = client.call(messages.CardanoTxHostAck()) + if not isinstance(response, messages.CardanoTxBodyHash): + raise UNEXPECTED_RESPONSE_ERROR + sign_tx_response["tx_hash"] = response.tx_hash - if response.serialized_tx is not None: - result.extend(response.serialized_tx) + response = client.call(messages.CardanoTxHostAck()) + if not isinstance(response, messages.CardanoSignTxFinished): + raise UNEXPECTED_RESPONSE_ERROR - return messages.CardanoSignedTx(tx_hash=response.tx_hash, serialized_tx=result) + return sign_tx_response diff --git a/python/src/trezorlib/cli/cardano.py b/python/src/trezorlib/cli/cardano.py index a3832abec..26b0468e2 100644 --- a/python/src/trezorlib/cli/cardano.py +++ b/python/src/trezorlib/cli/cardano.py @@ -40,13 +40,19 @@ def cli(): @cli.command() @click.argument("file", type=click.File("r")) @click.option("-f", "--file", "_ignore", is_flag=True, hidden=True, expose_value=False) +@click.option( + "-s", + "--signing-mode", + required=True, + type=ChoiceType({m.name: m for m in messages.CardanoTxSigningMode}), +) @click.option( "-p", "--protocol-magic", type=int, default=cardano.PROTOCOL_MAGICS["mainnet"] ) @click.option("-N", "--network-id", type=int, default=cardano.NETWORK_IDS["mainnet"]) @click.option("-t", "--testnet", is_flag=True) @with_client -def sign_tx(client, file, protocol_magic, network_id, testnet): +def sign_tx(client, file, signing_mode, protocol_magic, network_id, testnet): """Sign Cardano transaction.""" transaction = json.load(file) @@ -69,8 +75,9 @@ def sign_tx(client, file, protocol_magic, network_id, testnet): ] auxiliary_data = cardano.parse_auxiliary_data(transaction.get("auxiliary_data")) - signed_transaction = cardano.sign_tx( + sign_tx_response = cardano.sign_tx( client, + signing_mode, inputs, outputs, fee, @@ -83,10 +90,28 @@ def sign_tx(client, file, protocol_magic, network_id, testnet): auxiliary_data, ) - return { - "tx_hash": signed_transaction.tx_hash.hex(), - "serialized_tx": signed_transaction.serialized_tx.hex(), - } + sign_tx_response["tx_hash"] = sign_tx_response["tx_hash"].hex() + sign_tx_response["witnesses"] = [ + { + "type": witness["type"], + "pub_key": witness["pub_key"].hex(), + "signature": witness["signature"].hex(), + "chain_code": witness["chain_code"].hex() + if witness["chain_code"] is not None + else None, + } + for witness in sign_tx_response["witnesses"] + ] + auxiliary_data_supplement = sign_tx_response.get("auxiliary_data_supplement") + if auxiliary_data_supplement: + auxiliary_data_supplement["auxiliary_data_hash"] = auxiliary_data_supplement[ + "auxiliary_data_hash" + ].hex() + catalyst_signature = auxiliary_data_supplement.get("catalyst_signature") + if catalyst_signature: + auxiliary_data_supplement["catalyst_signature"] = catalyst_signature.hex() + sign_tx_response["auxiliary_data_supplement"] = auxiliary_data_supplement + return sign_tx_response @cli.command()