You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
trezor-firmware/core/src/apps/cardano/sign_tx/signer.py

927 lines
34 KiB

from micropython import const
from typing import TYPE_CHECKING
from trezor import wire
from trezor.crypto import hashlib
from trezor.crypto.curve import ed25519
from trezor.enums import (
CardanoAddressType,
CardanoCertificateType,
CardanoTxWitnessType,
)
from trezor.messages import (
CardanoAddressParametersType,
CardanoAssetGroup,
CardanoPoolOwner,
CardanoPoolRelayParameters,
CardanoSignTxInit,
CardanoToken,
CardanoTxAuxiliaryData,
CardanoTxBodyHash,
CardanoTxCertificate,
CardanoTxCollateralInput,
CardanoTxHostAck,
CardanoTxInput,
CardanoTxItemAck,
CardanoTxMint,
CardanoTxOutput,
CardanoTxRequiredSigner,
CardanoTxWithdrawal,
CardanoTxWitnessRequest,
CardanoTxWitnessResponse,
)
from apps.common import cbor, safety_checks
from .. import seed
from ..address import (
ADDRESS_TYPES_PAYMENT_SCRIPT,
derive_address_bytes,
derive_human_readable_address,
get_address_bytes_unsafe,
get_address_type,
validate_output_address,
validate_output_address_parameters,
)
from ..auxiliary_data import (
get_auxiliary_data_hash_and_supplement,
show_auxiliary_data,
validate_auxiliary_data,
)
from ..certificates import (
assert_certificate_cond,
cborize_certificate,
cborize_initial_pool_registration_certificate_fields,
cborize_pool_metadata,
cborize_pool_owner,
cborize_pool_relay,
validate_certificate,
validate_pool_owner,
validate_pool_relay,
)
from ..helpers import (
ADDRESS_KEY_HASH_SIZE,
INPUT_PREV_HASH_SIZE,
INVALID_COLLATERAL_INPUT,
INVALID_INPUT,
INVALID_OUTPUT,
INVALID_OUTPUT_DATUM_HASH,
INVALID_REQUIRED_SIGNER,
INVALID_SCRIPT_DATA_HASH,
INVALID_TOKEN_BUNDLE_MINT,
INVALID_TOKEN_BUNDLE_OUTPUT,
INVALID_TX_SIGNING_REQUEST,
INVALID_WITHDRAWAL,
LOVELACE_MAX_SUPPLY,
OUTPUT_DATUM_HASH_SIZE,
SCRIPT_DATA_HASH_SIZE,
)
from ..helpers.account_path_check import AccountPathChecker
from ..helpers.credential import Credential, should_show_address_credentials
from ..helpers.hash_builder_collection import HashBuilderDict, HashBuilderList
from ..helpers.paths import (
CERTIFICATE_PATH_NAME,
CHANGE_OUTPUT_PATH_NAME,
CHANGE_OUTPUT_STAKING_PATH_NAME,
POOL_OWNER_STAKING_PATH_NAME,
SCHEMA_STAKING,
)
from ..helpers.utils import (
derive_public_key,
get_public_key_hash,
validate_network_info,
validate_stake_credential,
)
from ..layout import (
confirm_certificate,
confirm_collateral_input,
confirm_required_signer,
confirm_script_data_hash,
confirm_sending,
confirm_sending_token,
confirm_stake_pool_metadata,
confirm_stake_pool_owner,
confirm_stake_pool_parameters,
confirm_token_minting,
confirm_transaction,
confirm_withdrawal,
confirm_witness_request,
show_change_output_credentials,
show_warning_path,
show_warning_tx_contains_mint,
show_warning_tx_network_unverifiable,
show_warning_tx_output_contains_datum_hash,
show_warning_tx_output_contains_tokens,
show_warning_tx_output_no_datum_hash,
)
from ..seed import is_byron_path, is_minting_path, is_multisig_path, is_shelley_path
if TYPE_CHECKING:
from typing import Any
from apps.common.paths import PathSchema
CardanoTxResponseType = CardanoTxItemAck | CardanoTxWitnessResponse
MINTING_POLICY_ID_LENGTH = 28
MAX_ASSET_NAME_LENGTH = 32
TX_BODY_KEY_INPUTS = const(0)
TX_BODY_KEY_OUTPUTS = const(1)
TX_BODY_KEY_FEE = const(2)
TX_BODY_KEY_TTL = const(3)
TX_BODY_KEY_CERTIFICATES = const(4)
TX_BODY_KEY_WITHDRAWALS = const(5)
TX_BODY_KEY_AUXILIARY_DATA = const(7)
TX_BODY_KEY_VALIDITY_INTERVAL_START = const(8)
TX_BODY_KEY_MINT = const(9)
TX_BODY_KEY_SCRIPT_DATA_HASH = const(11)
TX_BODY_KEY_COLLATERAL_INPUTS = const(13)
TX_BODY_KEY_REQUIRED_SIGNERS = const(14)
TX_BODY_KEY_NETWORK_ID = const(15)
POOL_REGISTRATION_CERTIFICATE_ITEMS_COUNT = 10
class Signer:
"""
This class encapsulates the entire tx signing process. By default, most tx items are
allowed and shown to the user. For each signing mode, there is a subclass that
overrides some methods, usually to add more validation rules and show/hide some
items. Each tx item is processed in a _process_xyz() method which handles validation,
user confirmation and serialization of the tx item.
"""
def __init__(
self, ctx: wire.Context, msg: CardanoSignTxInit, keychain: seed.Keychain
) -> None:
self.ctx = ctx
self.msg = msg
self.keychain = keychain
self.account_path_checker = AccountPathChecker()
# Inputs, outputs and fee are mandatory, count the number of optional fields present.
tx_body_map_item_count = 3 + sum(
(
msg.ttl is not None,
msg.certificates_count > 0,
msg.withdrawals_count > 0,
msg.has_auxiliary_data,
msg.validity_interval_start is not None,
msg.minting_asset_groups_count > 0,
msg.include_network_id,
msg.script_data_hash is not None,
msg.collateral_inputs_count > 0,
msg.required_signers_count > 0,
)
)
self.tx_dict: HashBuilderDict[int, Any] = HashBuilderDict(
tx_body_map_item_count, INVALID_TX_SIGNING_REQUEST
)
async def sign(self) -> None:
hash_fn = hashlib.blake2b(outlen=32)
self.tx_dict.start(hash_fn)
with self.tx_dict:
await self._processs_tx_signing_request()
tx_hash = hash_fn.digest()
await self._confirm_transaction(tx_hash)
response_after_witness_requests = await self._process_witness_requests(tx_hash)
await self.ctx.call(response_after_witness_requests, CardanoTxHostAck)
await self.ctx.call(CardanoTxBodyHash(tx_hash=tx_hash), CardanoTxHostAck)
# signing request
async def _processs_tx_signing_request(self) -> None:
self._validate_tx_signing_request()
await self._show_tx_signing_request()
inputs_list: HashBuilderList[tuple[bytes, int]] = HashBuilderList(
self.msg.inputs_count
)
with self.tx_dict.add(TX_BODY_KEY_INPUTS, inputs_list):
await self._process_inputs(inputs_list)
outputs_list: HashBuilderList = HashBuilderList(self.msg.outputs_count)
with self.tx_dict.add(TX_BODY_KEY_OUTPUTS, outputs_list):
await self._process_outputs(outputs_list)
self.tx_dict.add(TX_BODY_KEY_FEE, self.msg.fee)
if self.msg.ttl is not None:
self.tx_dict.add(TX_BODY_KEY_TTL, self.msg.ttl)
if self.msg.certificates_count > 0:
certificates_list: HashBuilderList = HashBuilderList(
self.msg.certificates_count
)
with self.tx_dict.add(TX_BODY_KEY_CERTIFICATES, certificates_list):
await self._process_certificates(certificates_list)
if self.msg.withdrawals_count > 0:
withdrawals_dict: HashBuilderDict[bytes, int] = HashBuilderDict(
self.msg.withdrawals_count, INVALID_WITHDRAWAL
)
with self.tx_dict.add(TX_BODY_KEY_WITHDRAWALS, withdrawals_dict):
await self._process_withdrawals(withdrawals_dict)
if self.msg.has_auxiliary_data:
await self._process_auxiliary_data()
if self.msg.validity_interval_start is not None:
self.tx_dict.add(
TX_BODY_KEY_VALIDITY_INTERVAL_START, self.msg.validity_interval_start
)
if self.msg.minting_asset_groups_count > 0:
minting_dict: HashBuilderDict[bytes, HashBuilderDict] = HashBuilderDict(
self.msg.minting_asset_groups_count, INVALID_TOKEN_BUNDLE_MINT
)
with self.tx_dict.add(TX_BODY_KEY_MINT, minting_dict):
await self._process_minting(minting_dict)
if self.msg.script_data_hash is not None:
await self._process_script_data_hash()
if self.msg.collateral_inputs_count > 0:
collateral_inputs_list: HashBuilderList[
tuple[bytes, int]
] = HashBuilderList(self.msg.collateral_inputs_count)
with self.tx_dict.add(
TX_BODY_KEY_COLLATERAL_INPUTS, collateral_inputs_list
):
await self._process_collateral_inputs(collateral_inputs_list)
if self.msg.required_signers_count > 0:
required_signers_list: HashBuilderList[bytes] = HashBuilderList(
self.msg.required_signers_count
)
with self.tx_dict.add(TX_BODY_KEY_REQUIRED_SIGNERS, required_signers_list):
await self._process_required_signers(required_signers_list)
if self.msg.include_network_id:
self.tx_dict.add(TX_BODY_KEY_NETWORK_ID, self.msg.network_id)
def _validate_tx_signing_request(self) -> None:
if self.msg.fee > LOVELACE_MAX_SUPPLY:
raise wire.ProcessError("Fee is out of range!")
validate_network_info(self.msg.network_id, self.msg.protocol_magic)
async def _show_tx_signing_request(self) -> None:
if not self._is_network_id_verifiable():
await show_warning_tx_network_unverifiable(self.ctx)
async def _confirm_tx(self, tx_hash: bytes) -> None:
# Final signing confirmation is handled separately in each signing mode.
raise NotImplementedError
# inputs
async def _process_inputs(
self, inputs_list: HashBuilderList[tuple[bytes, int]]
) -> None:
for _ in range(self.msg.inputs_count):
input: CardanoTxInput = await self.ctx.call(
CardanoTxItemAck(), CardanoTxInput
)
self._validate_input(input)
await self._show_input(input)
inputs_list.append((input.prev_hash, input.prev_index))
def _validate_input(self, input: CardanoTxInput) -> None:
if len(input.prev_hash) != INPUT_PREV_HASH_SIZE:
raise INVALID_INPUT
async def _show_input(self, input: CardanoTxInput) -> None:
# We never show the inputs, except for Plutus txs.
pass
# outputs
async def _process_outputs(self, outputs_list: HashBuilderList) -> None:
total_amount = 0
for _ in range(self.msg.outputs_count):
output: CardanoTxOutput = await self.ctx.call(
CardanoTxItemAck(), CardanoTxOutput
)
self._validate_output(output)
await self._show_output(output)
output_address = self._get_output_address(output)
has_datum_hash = output.datum_hash is not None
output_list: HashBuilderList = HashBuilderList(2 + int(has_datum_hash))
with outputs_list.append(output_list):
output_list.append(output_address)
if output.asset_groups_count == 0:
# output structure is: [address, amount, datum_hash?]
output_list.append(output.amount)
else:
# output structure is: [address, [amount, asset_groups], datum_hash?]
output_value_list: HashBuilderList = HashBuilderList(2)
with output_list.append(output_value_list):
output_value_list.append(output.amount)
asset_groups_dict: HashBuilderDict[
bytes, HashBuilderDict[bytes, int]
] = HashBuilderDict(
output.asset_groups_count, INVALID_TOKEN_BUNDLE_OUTPUT
)
with output_value_list.append(asset_groups_dict):
await self._process_asset_groups(
asset_groups_dict,
output.asset_groups_count,
self._should_show_output(output),
)
if has_datum_hash:
output_list.append(output.datum_hash)
total_amount += output.amount
if total_amount > LOVELACE_MAX_SUPPLY:
raise wire.ProcessError("Total transaction amount is out of range!")
def _validate_output(self, output: CardanoTxOutput) -> None:
if output.address_parameters is not None and output.address is not None:
raise INVALID_OUTPUT
if output.address_parameters is not None:
validate_output_address_parameters(output.address_parameters)
self._fail_if_strict_and_unusual(output.address_parameters)
elif output.address is not None:
validate_output_address(
output.address, self.msg.protocol_magic, self.msg.network_id
)
else:
raise INVALID_OUTPUT
if output.datum_hash is not None:
if len(output.datum_hash) != OUTPUT_DATUM_HASH_SIZE:
raise INVALID_OUTPUT_DATUM_HASH
address_type = self._get_output_address_type(output)
if address_type not in ADDRESS_TYPES_PAYMENT_SCRIPT:
raise INVALID_OUTPUT
self.account_path_checker.add_output(output)
async def _show_output(self, output: CardanoTxOutput) -> None:
if not self._should_show_output(output):
return
if output.datum_hash is not None:
await show_warning_tx_output_contains_datum_hash(
self.ctx, output.datum_hash
)
address_type = self._get_output_address_type(output)
if output.datum_hash is None and address_type in ADDRESS_TYPES_PAYMENT_SCRIPT:
await show_warning_tx_output_no_datum_hash(self.ctx)
if output.asset_groups_count > 0:
await show_warning_tx_output_contains_tokens(self.ctx)
if output.address_parameters is not None:
address = derive_human_readable_address(
self.keychain,
output.address_parameters,
self.msg.protocol_magic,
self.msg.network_id,
)
await self._show_output_credentials(output.address_parameters)
else:
assert output.address is not None # _validate_output
address = output.address
await confirm_sending(
self.ctx,
output.amount,
address,
self._is_change_output(output),
self.msg.network_id,
)
async def _show_output_credentials(
self, address_parameters: CardanoAddressParametersType
) -> None:
await show_change_output_credentials(
self.ctx,
Credential.payment_credential(address_parameters),
Credential.stake_credential(address_parameters),
)
def _should_show_output(self, output: CardanoTxOutput) -> bool:
"""
Determines whether the output should be shown. Extracted from _show_output because
of readability and because the same decision is made when displaying output tokens.
"""
if output.datum_hash is not None:
# The `return False` case below should not be reachable when datum hash is
# present, but let's make it explicit.
return True
address_type = self._get_output_address_type(output)
if output.datum_hash is None and address_type in ADDRESS_TYPES_PAYMENT_SCRIPT:
# Plutus script address without a datum hash is unspendable, we must show a warning.
return True
if output.address_parameters is not None: # change output
if not should_show_address_credentials(output.address_parameters):
# We don't need to display simple address outputs.
return False
return True
def _is_change_output(self, output: CardanoTxOutput) -> bool:
"""Used only to determine what message to show to the user when confirming sending."""
return output.address_parameters is not None
# asset groups
async def _process_asset_groups(
self,
asset_groups_dict: HashBuilderDict[bytes, HashBuilderDict[bytes, int]],
asset_groups_count: int,
should_show_tokens: bool,
) -> None:
for _ in range(asset_groups_count):
asset_group: CardanoAssetGroup = await self.ctx.call(
CardanoTxItemAck(), CardanoAssetGroup
)
self._validate_asset_group(asset_group)
tokens: HashBuilderDict[bytes, int] = HashBuilderDict(
asset_group.tokens_count, INVALID_TOKEN_BUNDLE_OUTPUT
)
with asset_groups_dict.add(asset_group.policy_id, tokens):
await self._process_tokens(
tokens,
asset_group.policy_id,
asset_group.tokens_count,
should_show_tokens,
)
def _validate_asset_group(
self, asset_group: CardanoAssetGroup, is_mint: bool = False
) -> None:
INVALID_TOKEN_BUNDLE = (
INVALID_TOKEN_BUNDLE_MINT if is_mint else INVALID_TOKEN_BUNDLE_OUTPUT
)
if len(asset_group.policy_id) != MINTING_POLICY_ID_LENGTH:
raise INVALID_TOKEN_BUNDLE
if asset_group.tokens_count == 0:
raise INVALID_TOKEN_BUNDLE
# tokens
async def _process_tokens(
self,
tokens_dict: HashBuilderDict[bytes, int],
policy_id: bytes,
tokens_count: int,
should_show_tokens: bool,
) -> None:
for _ in range(tokens_count):
token: CardanoToken = await self.ctx.call(CardanoTxItemAck(), CardanoToken)
self._validate_token(token)
if should_show_tokens:
await confirm_sending_token(self.ctx, policy_id, token)
assert token.amount is not None # _validate_token
tokens_dict.add(token.asset_name_bytes, token.amount)
def _validate_token(self, token: CardanoToken, is_mint: bool = False) -> None:
INVALID_TOKEN_BUNDLE = (
INVALID_TOKEN_BUNDLE_MINT if is_mint else INVALID_TOKEN_BUNDLE_OUTPUT
)
if is_mint:
if token.mint_amount is None or token.amount is not None:
raise INVALID_TOKEN_BUNDLE
else:
if token.amount is None or token.mint_amount is not None:
raise INVALID_TOKEN_BUNDLE
if len(token.asset_name_bytes) > MAX_ASSET_NAME_LENGTH:
raise INVALID_TOKEN_BUNDLE
# certificates
async def _process_certificates(self, certificates_list: HashBuilderList) -> None:
for _ in range(self.msg.certificates_count):
certificate: CardanoTxCertificate = await self.ctx.call(
CardanoTxItemAck(), CardanoTxCertificate
)
self._validate_certificate(certificate)
await self._show_certificate(certificate)
if certificate.type == CardanoCertificateType.STAKE_POOL_REGISTRATION:
pool_parameters = certificate.pool_parameters
assert pool_parameters is not None # _validate_certificate
pool_items_list: HashBuilderList = HashBuilderList(
POOL_REGISTRATION_CERTIFICATE_ITEMS_COUNT
)
with certificates_list.append(pool_items_list):
for item in cborize_initial_pool_registration_certificate_fields(
certificate
):
pool_items_list.append(item)
pool_owners_list: HashBuilderList[bytes] = HashBuilderList(
pool_parameters.owners_count
)
with pool_items_list.append(pool_owners_list):
await self._process_pool_owners(
pool_owners_list, pool_parameters.owners_count
)
relays_list: HashBuilderList[cbor.CborSequence] = HashBuilderList(
pool_parameters.relays_count
)
with pool_items_list.append(relays_list):
await self._process_pool_relays(
relays_list, pool_parameters.relays_count
)
pool_items_list.append(
cborize_pool_metadata(pool_parameters.metadata)
)
else:
certificates_list.append(
cborize_certificate(self.keychain, certificate)
)
def _validate_certificate(self, certificate: CardanoTxCertificate) -> None:
validate_certificate(
certificate,
self.msg.protocol_magic,
self.msg.network_id,
self.account_path_checker,
)
async def _show_certificate(self, certificate: CardanoTxCertificate) -> None:
if certificate.path:
await self._fail_or_warn_if_invalid_path(
SCHEMA_STAKING, certificate.path, CERTIFICATE_PATH_NAME
)
if certificate.type == CardanoCertificateType.STAKE_POOL_REGISTRATION:
assert certificate.pool_parameters is not None
await confirm_stake_pool_parameters(
self.ctx, certificate.pool_parameters, self.msg.network_id
)
await confirm_stake_pool_metadata(
self.ctx, certificate.pool_parameters.metadata
)
else:
await confirm_certificate(self.ctx, certificate)
# pool owners
async def _process_pool_owners(
self, pool_owners_list: HashBuilderList[bytes], owners_count: int
) -> None:
owners_as_path_count = 0
for _ in range(owners_count):
owner: CardanoPoolOwner = await self.ctx.call(
CardanoTxItemAck(), CardanoPoolOwner
)
validate_pool_owner(owner, self.account_path_checker)
await self._show_pool_owner(owner)
pool_owners_list.append(cborize_pool_owner(self.keychain, owner))
if owner.staking_key_path:
owners_as_path_count += 1
assert_certificate_cond(owners_as_path_count == 1)
async def _show_pool_owner(self, owner: CardanoPoolOwner) -> None:
if owner.staking_key_path:
await self._fail_or_warn_if_invalid_path(
SCHEMA_STAKING, owner.staking_key_path, POOL_OWNER_STAKING_PATH_NAME
)
await confirm_stake_pool_owner(
self.ctx, self.keychain, owner, self.msg.protocol_magic, self.msg.network_id
)
# pool relays
async def _process_pool_relays(
self,
relays_list: HashBuilderList[cbor.CborSequence],
relays_count: int,
) -> None:
for _ in range(relays_count):
relay: CardanoPoolRelayParameters = await self.ctx.call(
CardanoTxItemAck(), CardanoPoolRelayParameters
)
validate_pool_relay(relay)
relays_list.append(cborize_pool_relay(relay))
# withdrawals
async def _process_withdrawals(
self, withdrawals_dict: HashBuilderDict[bytes, int]
) -> None:
for _ in range(self.msg.withdrawals_count):
withdrawal: CardanoTxWithdrawal = await self.ctx.call(
CardanoTxItemAck(), CardanoTxWithdrawal
)
self._validate_withdrawal(withdrawal)
reward_address_bytes = self._derive_withdrawal_reward_address_bytes(
withdrawal
)
await confirm_withdrawal(
self.ctx, withdrawal, reward_address_bytes, self.msg.network_id
)
withdrawals_dict.add(reward_address_bytes, withdrawal.amount)
def _validate_withdrawal(self, withdrawal: CardanoTxWithdrawal) -> None:
validate_stake_credential(
withdrawal.path,
withdrawal.script_hash,
withdrawal.key_hash,
INVALID_WITHDRAWAL,
)
if not 0 <= withdrawal.amount < LOVELACE_MAX_SUPPLY:
raise INVALID_WITHDRAWAL
self.account_path_checker.add_withdrawal(withdrawal)
# auxiliary data
async def _process_auxiliary_data(self) -> None:
auxiliary_data: CardanoTxAuxiliaryData = await self.ctx.call(
CardanoTxItemAck(), CardanoTxAuxiliaryData
)
validate_auxiliary_data(auxiliary_data)
(
auxiliary_data_hash,
auxiliary_data_supplement,
) = get_auxiliary_data_hash_and_supplement(
self.keychain, auxiliary_data, self.msg.protocol_magic, self.msg.network_id
)
await show_auxiliary_data(
self.ctx,
self.keychain,
auxiliary_data_hash,
auxiliary_data.catalyst_registration_parameters,
self.msg.protocol_magic,
self.msg.network_id,
)
self.tx_dict.add(TX_BODY_KEY_AUXILIARY_DATA, auxiliary_data_hash)
await self.ctx.call(auxiliary_data_supplement, CardanoTxHostAck)
# minting
async def _process_minting(
self, minting_dict: HashBuilderDict[bytes, HashBuilderDict]
) -> None:
token_minting: CardanoTxMint = await self.ctx.call(
CardanoTxItemAck(), CardanoTxMint
)
await show_warning_tx_contains_mint(self.ctx)
for _ in range(token_minting.asset_groups_count):
asset_group: CardanoAssetGroup = await self.ctx.call(
CardanoTxItemAck(), CardanoAssetGroup
)
self._validate_asset_group(asset_group, is_mint=True)
tokens: HashBuilderDict[bytes, int] = HashBuilderDict(
asset_group.tokens_count, INVALID_TOKEN_BUNDLE_MINT
)
with minting_dict.add(asset_group.policy_id, tokens):
await self._process_minting_tokens(
tokens,
asset_group.policy_id,
asset_group.tokens_count,
)
# minting tokens
async def _process_minting_tokens(
self,
tokens: HashBuilderDict[bytes, int],
policy_id: bytes,
tokens_count: int,
) -> None:
for _ in range(tokens_count):
token: CardanoToken = await self.ctx.call(CardanoTxItemAck(), CardanoToken)
self._validate_token(token, is_mint=True)
await confirm_token_minting(self.ctx, policy_id, token)
assert token.mint_amount is not None # _validate_token
tokens.add(token.asset_name_bytes, token.mint_amount)
# script data hash
async def _process_script_data_hash(self) -> None:
assert self.msg.script_data_hash is not None
self._validate_script_data_hash()
await confirm_script_data_hash(self.ctx, self.msg.script_data_hash)
self.tx_dict.add(TX_BODY_KEY_SCRIPT_DATA_HASH, self.msg.script_data_hash)
def _validate_script_data_hash(self) -> None:
assert self.msg.script_data_hash is not None
if len(self.msg.script_data_hash) != SCRIPT_DATA_HASH_SIZE:
raise INVALID_SCRIPT_DATA_HASH
# collateral inputs
async def _process_collateral_inputs(
self, collateral_inputs_list: HashBuilderList[tuple[bytes, int]]
) -> None:
for _ in range(self.msg.collateral_inputs_count):
collateral_input: CardanoTxCollateralInput = await self.ctx.call(
CardanoTxItemAck(), CardanoTxCollateralInput
)
self._validate_collateral_input(collateral_input)
await confirm_collateral_input(self.ctx, collateral_input)
collateral_inputs_list.append(
(collateral_input.prev_hash, collateral_input.prev_index)
)
def _validate_collateral_input(
self, collateral_input: CardanoTxCollateralInput
) -> None:
if len(collateral_input.prev_hash) != INPUT_PREV_HASH_SIZE:
raise INVALID_COLLATERAL_INPUT
# required signers
async def _process_required_signers(
self, required_signers_list: HashBuilderList[bytes]
) -> None:
for _ in range(self.msg.required_signers_count):
required_signer: CardanoTxRequiredSigner = await self.ctx.call(
CardanoTxItemAck(), CardanoTxRequiredSigner
)
self._validate_required_signer(required_signer)
await confirm_required_signer(self.ctx, required_signer)
key_hash = required_signer.key_hash or get_public_key_hash(
self.keychain, required_signer.key_path
)
required_signers_list.append(key_hash)
def _validate_required_signer(
self, required_signer: CardanoTxRequiredSigner
) -> None:
if required_signer.key_hash and required_signer.key_path:
raise INVALID_REQUIRED_SIGNER
if required_signer.key_hash:
if len(required_signer.key_hash) != ADDRESS_KEY_HASH_SIZE:
raise INVALID_REQUIRED_SIGNER
elif required_signer.key_path:
if not (
is_shelley_path(required_signer.key_path)
or is_multisig_path(required_signer.key_path)
or is_minting_path(required_signer.key_path)
):
raise INVALID_REQUIRED_SIGNER
else:
raise INVALID_REQUIRED_SIGNER
# witness requests
async def _process_witness_requests(self, tx_hash: bytes) -> CardanoTxResponseType:
response: CardanoTxResponseType = CardanoTxItemAck()
for _ in range(self.msg.witness_requests_count):
witness_request = await self.ctx.call(response, CardanoTxWitnessRequest)
self._validate_witness_request(witness_request)
path = witness_request.path
await self._show_witness_request(path)
if is_byron_path(path):
response = self._get_byron_witness(path, tx_hash)
else:
response = self._get_shelley_witness(path, tx_hash)
return response
def _validate_witness_request(
self, witness_request: CardanoTxWitnessRequest
) -> None:
self.account_path_checker.add_witness_request(witness_request)
async def _show_witness_request(
self,
witness_path: list[int],
) -> None:
await confirm_witness_request(self.ctx, witness_path)
# helpers
def _is_network_id_verifiable(self) -> bool:
"""
Checks whether there is at least one element that contains information about
network ID, otherwise Trezor cannot guarantee that the tx is actually meant for
the given network.
Note: Shelley addresses contain network id. The intended network of Byron
addresses can be determined based on whether they contain the protocol magic.
These checks are performed during address validation.
"""
return (
self.msg.include_network_id
or self.msg.outputs_count != 0
or self.msg.withdrawals_count != 0
)
def _get_output_address(self, output: CardanoTxOutput) -> bytes:
if output.address_parameters:
return derive_address_bytes(
self.keychain,
output.address_parameters,
self.msg.protocol_magic,
self.msg.network_id,
)
else:
assert output.address is not None # _validate_output
return get_address_bytes_unsafe(output.address)
def _get_output_address_type(self, output: CardanoTxOutput) -> CardanoAddressType:
if output.address_parameters:
return output.address_parameters.address_type
assert output.address is not None # _validate_output
return get_address_type(get_address_bytes_unsafe(output.address))
def _derive_withdrawal_reward_address_bytes(
self, withdrawal: CardanoTxWithdrawal
) -> bytes:
reward_address_type = (
CardanoAddressType.REWARD
if withdrawal.path or withdrawal.key_hash
else CardanoAddressType.REWARD_SCRIPT
)
return derive_address_bytes(
self.keychain,
CardanoAddressParametersType(
address_type=reward_address_type,
address_n_staking=withdrawal.path,
staking_key_hash=withdrawal.key_hash,
script_staking_hash=withdrawal.script_hash,
),
self.msg.protocol_magic,
self.msg.network_id,
)
def _get_byron_witness(
self, path: list[int], tx_hash: bytes
) -> CardanoTxWitnessResponse:
node = self.keychain.derive(path)
return CardanoTxWitnessResponse(
type=CardanoTxWitnessType.BYRON_WITNESS,
pub_key=derive_public_key(self.keychain, path),
signature=self._sign_tx_hash(tx_hash, path),
chain_code=node.chain_code(),
)
def _get_shelley_witness(
self, path: list[int], tx_hash: bytes
) -> CardanoTxWitnessResponse:
return CardanoTxWitnessResponse(
type=CardanoTxWitnessType.SHELLEY_WITNESS,
pub_key=derive_public_key(self.keychain, path),
signature=self._sign_tx_hash(tx_hash, path),
)
def _sign_tx_hash(self, tx_body_hash: bytes, path: list[int]) -> bytes:
node = self.keychain.derive(path)
return ed25519.sign_ext(
node.private_key(), node.private_key_ext(), tx_body_hash
)
async def _fail_or_warn_if_invalid_path(
self, schema: PathSchema, path: list[int], path_name: str
) -> None:
if not schema.match(path):
await self._fail_or_warn_path(path, path_name)
async def _fail_or_warn_path(self, path: list[int], path_name: str) -> None:
if safety_checks.is_strict():
raise wire.DataError(f"Invalid {path_name.lower()}")
else:
await show_warning_path(self.ctx, path, path_name)
def _fail_if_strict_and_unusual(
self, address_parameters: CardanoAddressParametersType
) -> None:
if not safety_checks.is_strict():
return
if Credential.payment_credential(address_parameters).is_unusual_path:
raise wire.DataError(f"Invalid {CHANGE_OUTPUT_PATH_NAME.lower()}")
if Credential.stake_credential(address_parameters).is_unusual_path:
raise wire.DataError(f"Invalid {CHANGE_OUTPUT_STAKING_PATH_NAME.lower()}")