You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
926 lines
34 KiB
926 lines
34 KiB
from micropython import const
|
|
from typing import TYPE_CHECKING
|
|
|
|
from trezor import wire
|
|
from trezor.crypto import hashlib
|
|
from trezor.crypto.curve import ed25519
|
|
from trezor.enums import (
|
|
CardanoAddressType,
|
|
CardanoCertificateType,
|
|
CardanoTxWitnessType,
|
|
)
|
|
from trezor.messages import (
|
|
CardanoAddressParametersType,
|
|
CardanoAssetGroup,
|
|
CardanoPoolOwner,
|
|
CardanoPoolRelayParameters,
|
|
CardanoSignTxInit,
|
|
CardanoToken,
|
|
CardanoTxAuxiliaryData,
|
|
CardanoTxBodyHash,
|
|
CardanoTxCertificate,
|
|
CardanoTxCollateralInput,
|
|
CardanoTxHostAck,
|
|
CardanoTxInput,
|
|
CardanoTxItemAck,
|
|
CardanoTxMint,
|
|
CardanoTxOutput,
|
|
CardanoTxRequiredSigner,
|
|
CardanoTxWithdrawal,
|
|
CardanoTxWitnessRequest,
|
|
CardanoTxWitnessResponse,
|
|
)
|
|
|
|
from apps.common import cbor, safety_checks
|
|
|
|
from .. import seed
|
|
from ..address import (
|
|
ADDRESS_TYPES_PAYMENT_SCRIPT,
|
|
derive_address_bytes,
|
|
derive_human_readable_address,
|
|
get_address_bytes_unsafe,
|
|
get_address_type,
|
|
validate_output_address,
|
|
validate_output_address_parameters,
|
|
)
|
|
from ..auxiliary_data import (
|
|
get_auxiliary_data_hash_and_supplement,
|
|
show_auxiliary_data,
|
|
validate_auxiliary_data,
|
|
)
|
|
from ..certificates import (
|
|
assert_certificate_cond,
|
|
cborize_certificate,
|
|
cborize_initial_pool_registration_certificate_fields,
|
|
cborize_pool_metadata,
|
|
cborize_pool_owner,
|
|
cborize_pool_relay,
|
|
validate_certificate,
|
|
validate_pool_owner,
|
|
validate_pool_relay,
|
|
)
|
|
from ..helpers import (
|
|
ADDRESS_KEY_HASH_SIZE,
|
|
INPUT_PREV_HASH_SIZE,
|
|
LOVELACE_MAX_SUPPLY,
|
|
OUTPUT_DATUM_HASH_SIZE,
|
|
SCRIPT_DATA_HASH_SIZE,
|
|
)
|
|
from ..helpers.account_path_check import AccountPathChecker
|
|
from ..helpers.credential import Credential, should_show_address_credentials
|
|
from ..helpers.hash_builder_collection import HashBuilderDict, HashBuilderList
|
|
from ..helpers.paths import (
|
|
CERTIFICATE_PATH_NAME,
|
|
CHANGE_OUTPUT_PATH_NAME,
|
|
CHANGE_OUTPUT_STAKING_PATH_NAME,
|
|
POOL_OWNER_STAKING_PATH_NAME,
|
|
SCHEMA_STAKING,
|
|
)
|
|
from ..helpers.utils import (
|
|
derive_public_key,
|
|
get_public_key_hash,
|
|
validate_network_info,
|
|
validate_stake_credential,
|
|
)
|
|
from ..layout import (
|
|
confirm_certificate,
|
|
confirm_collateral_input,
|
|
confirm_required_signer,
|
|
confirm_script_data_hash,
|
|
confirm_sending,
|
|
confirm_sending_token,
|
|
confirm_stake_pool_metadata,
|
|
confirm_stake_pool_owner,
|
|
confirm_stake_pool_parameters,
|
|
confirm_token_minting,
|
|
confirm_transaction,
|
|
confirm_withdrawal,
|
|
confirm_witness_request,
|
|
show_change_output_credentials,
|
|
show_warning_path,
|
|
show_warning_tx_contains_mint,
|
|
show_warning_tx_network_unverifiable,
|
|
show_warning_tx_output_contains_datum_hash,
|
|
show_warning_tx_output_contains_tokens,
|
|
show_warning_tx_output_no_datum_hash,
|
|
)
|
|
from ..seed import is_byron_path, is_minting_path, is_multisig_path, is_shelley_path
|
|
|
|
if TYPE_CHECKING:
|
|
from typing import Any
|
|
from apps.common.paths import PathSchema
|
|
|
|
CardanoTxResponseType = CardanoTxItemAck | CardanoTxWitnessResponse
|
|
|
|
MINTING_POLICY_ID_LENGTH = 28
|
|
MAX_ASSET_NAME_LENGTH = 32
|
|
|
|
TX_BODY_KEY_INPUTS = const(0)
|
|
TX_BODY_KEY_OUTPUTS = const(1)
|
|
TX_BODY_KEY_FEE = const(2)
|
|
TX_BODY_KEY_TTL = const(3)
|
|
TX_BODY_KEY_CERTIFICATES = const(4)
|
|
TX_BODY_KEY_WITHDRAWALS = const(5)
|
|
TX_BODY_KEY_AUXILIARY_DATA = const(7)
|
|
TX_BODY_KEY_VALIDITY_INTERVAL_START = const(8)
|
|
TX_BODY_KEY_MINT = const(9)
|
|
TX_BODY_KEY_SCRIPT_DATA_HASH = const(11)
|
|
TX_BODY_KEY_COLLATERAL_INPUTS = const(13)
|
|
TX_BODY_KEY_REQUIRED_SIGNERS = const(14)
|
|
TX_BODY_KEY_NETWORK_ID = const(15)
|
|
|
|
POOL_REGISTRATION_CERTIFICATE_ITEMS_COUNT = 10
|
|
|
|
|
|
class Signer:
|
|
"""
|
|
This class encapsulates the entire tx signing process. By default, most tx items are
|
|
allowed and shown to the user. For each signing mode, there is a subclass that
|
|
overrides some methods, usually to add more validation rules and show/hide some
|
|
items. Each tx item is processed in a _process_xyz() method which handles validation,
|
|
user confirmation and serialization of the tx item.
|
|
"""
|
|
|
|
def __init__(
|
|
self, ctx: wire.Context, msg: CardanoSignTxInit, keychain: seed.Keychain
|
|
) -> None:
|
|
self.ctx = ctx
|
|
self.msg = msg
|
|
self.keychain = keychain
|
|
|
|
self.account_path_checker = AccountPathChecker()
|
|
|
|
# Inputs, outputs and fee are mandatory, count the number of optional fields present.
|
|
tx_body_map_item_count = 3 + sum(
|
|
(
|
|
msg.ttl is not None,
|
|
msg.certificates_count > 0,
|
|
msg.withdrawals_count > 0,
|
|
msg.has_auxiliary_data,
|
|
msg.validity_interval_start is not None,
|
|
msg.minting_asset_groups_count > 0,
|
|
msg.include_network_id,
|
|
msg.script_data_hash is not None,
|
|
msg.collateral_inputs_count > 0,
|
|
msg.required_signers_count > 0,
|
|
)
|
|
)
|
|
self.tx_dict: HashBuilderDict[int, Any] = HashBuilderDict(
|
|
tx_body_map_item_count, wire.ProcessError("Invalid tx signing request")
|
|
)
|
|
|
|
async def sign(self) -> None:
|
|
hash_fn = hashlib.blake2b(outlen=32)
|
|
self.tx_dict.start(hash_fn)
|
|
with self.tx_dict:
|
|
await self._processs_tx_signing_request()
|
|
|
|
tx_hash = hash_fn.digest()
|
|
await self._confirm_transaction(tx_hash)
|
|
|
|
response_after_witness_requests = await self._process_witness_requests(tx_hash)
|
|
await self.ctx.call(response_after_witness_requests, CardanoTxHostAck)
|
|
await self.ctx.call(CardanoTxBodyHash(tx_hash=tx_hash), CardanoTxHostAck)
|
|
|
|
# signing request
|
|
|
|
async def _processs_tx_signing_request(self) -> None:
|
|
self._validate_tx_signing_request()
|
|
await self._show_tx_signing_request()
|
|
|
|
inputs_list: HashBuilderList[tuple[bytes, int]] = HashBuilderList(
|
|
self.msg.inputs_count
|
|
)
|
|
with self.tx_dict.add(TX_BODY_KEY_INPUTS, inputs_list):
|
|
await self._process_inputs(inputs_list)
|
|
|
|
outputs_list: HashBuilderList = HashBuilderList(self.msg.outputs_count)
|
|
with self.tx_dict.add(TX_BODY_KEY_OUTPUTS, outputs_list):
|
|
await self._process_outputs(outputs_list)
|
|
|
|
self.tx_dict.add(TX_BODY_KEY_FEE, self.msg.fee)
|
|
|
|
if self.msg.ttl is not None:
|
|
self.tx_dict.add(TX_BODY_KEY_TTL, self.msg.ttl)
|
|
|
|
if self.msg.certificates_count > 0:
|
|
certificates_list: HashBuilderList = HashBuilderList(
|
|
self.msg.certificates_count
|
|
)
|
|
with self.tx_dict.add(TX_BODY_KEY_CERTIFICATES, certificates_list):
|
|
await self._process_certificates(certificates_list)
|
|
|
|
if self.msg.withdrawals_count > 0:
|
|
withdrawals_dict: HashBuilderDict[bytes, int] = HashBuilderDict(
|
|
self.msg.withdrawals_count, wire.ProcessError("Invalid withdrawal")
|
|
)
|
|
with self.tx_dict.add(TX_BODY_KEY_WITHDRAWALS, withdrawals_dict):
|
|
await self._process_withdrawals(withdrawals_dict)
|
|
|
|
if self.msg.has_auxiliary_data:
|
|
await self._process_auxiliary_data()
|
|
|
|
if self.msg.validity_interval_start is not None:
|
|
self.tx_dict.add(
|
|
TX_BODY_KEY_VALIDITY_INTERVAL_START, self.msg.validity_interval_start
|
|
)
|
|
|
|
if self.msg.minting_asset_groups_count > 0:
|
|
minting_dict: HashBuilderDict[bytes, HashBuilderDict] = HashBuilderDict(
|
|
self.msg.minting_asset_groups_count,
|
|
wire.ProcessError("Invalid mint token bundle"),
|
|
)
|
|
with self.tx_dict.add(TX_BODY_KEY_MINT, minting_dict):
|
|
await self._process_minting(minting_dict)
|
|
|
|
if self.msg.script_data_hash is not None:
|
|
await self._process_script_data_hash()
|
|
|
|
if self.msg.collateral_inputs_count > 0:
|
|
collateral_inputs_list: HashBuilderList[
|
|
tuple[bytes, int]
|
|
] = HashBuilderList(self.msg.collateral_inputs_count)
|
|
with self.tx_dict.add(
|
|
TX_BODY_KEY_COLLATERAL_INPUTS, collateral_inputs_list
|
|
):
|
|
await self._process_collateral_inputs(collateral_inputs_list)
|
|
|
|
if self.msg.required_signers_count > 0:
|
|
required_signers_list: HashBuilderList[bytes] = HashBuilderList(
|
|
self.msg.required_signers_count
|
|
)
|
|
with self.tx_dict.add(TX_BODY_KEY_REQUIRED_SIGNERS, required_signers_list):
|
|
await self._process_required_signers(required_signers_list)
|
|
|
|
if self.msg.include_network_id:
|
|
self.tx_dict.add(TX_BODY_KEY_NETWORK_ID, self.msg.network_id)
|
|
|
|
def _validate_tx_signing_request(self) -> None:
|
|
if self.msg.fee > LOVELACE_MAX_SUPPLY:
|
|
raise wire.ProcessError("Fee is out of range!")
|
|
validate_network_info(self.msg.network_id, self.msg.protocol_magic)
|
|
|
|
async def _show_tx_signing_request(self) -> None:
|
|
if not self._is_network_id_verifiable():
|
|
await show_warning_tx_network_unverifiable(self.ctx)
|
|
|
|
async def _confirm_tx(self, tx_hash: bytes) -> None:
|
|
# Final signing confirmation is handled separately in each signing mode.
|
|
raise NotImplementedError
|
|
|
|
# inputs
|
|
|
|
async def _process_inputs(
|
|
self, inputs_list: HashBuilderList[tuple[bytes, int]]
|
|
) -> None:
|
|
for _ in range(self.msg.inputs_count):
|
|
input: CardanoTxInput = await self.ctx.call(
|
|
CardanoTxItemAck(), CardanoTxInput
|
|
)
|
|
self._validate_input(input)
|
|
await self._show_input(input)
|
|
inputs_list.append((input.prev_hash, input.prev_index))
|
|
|
|
def _validate_input(self, input: CardanoTxInput) -> None:
|
|
if len(input.prev_hash) != INPUT_PREV_HASH_SIZE:
|
|
raise wire.ProcessError("Invalid input")
|
|
|
|
async def _show_input(self, input: CardanoTxInput) -> None:
|
|
# We never show the inputs, except for Plutus txs.
|
|
pass
|
|
|
|
# outputs
|
|
|
|
async def _process_outputs(self, outputs_list: HashBuilderList) -> None:
|
|
total_amount = 0
|
|
for _ in range(self.msg.outputs_count):
|
|
output: CardanoTxOutput = await self.ctx.call(
|
|
CardanoTxItemAck(), CardanoTxOutput
|
|
)
|
|
self._validate_output(output)
|
|
await self._show_output(output)
|
|
|
|
output_address = self._get_output_address(output)
|
|
|
|
has_datum_hash = output.datum_hash is not None
|
|
output_list: HashBuilderList = HashBuilderList(2 + int(has_datum_hash))
|
|
with outputs_list.append(output_list):
|
|
output_list.append(output_address)
|
|
if output.asset_groups_count == 0:
|
|
# output structure is: [address, amount, datum_hash?]
|
|
output_list.append(output.amount)
|
|
else:
|
|
# output structure is: [address, [amount, asset_groups], datum_hash?]
|
|
output_value_list: HashBuilderList = HashBuilderList(2)
|
|
with output_list.append(output_value_list):
|
|
output_value_list.append(output.amount)
|
|
asset_groups_dict: HashBuilderDict[
|
|
bytes, HashBuilderDict[bytes, int]
|
|
] = HashBuilderDict(
|
|
output.asset_groups_count,
|
|
wire.ProcessError("Invalid token bundle in output"),
|
|
)
|
|
with output_value_list.append(asset_groups_dict):
|
|
await self._process_asset_groups(
|
|
asset_groups_dict,
|
|
output.asset_groups_count,
|
|
self._should_show_output(output),
|
|
)
|
|
if has_datum_hash:
|
|
output_list.append(output.datum_hash)
|
|
|
|
total_amount += output.amount
|
|
|
|
if total_amount > LOVELACE_MAX_SUPPLY:
|
|
raise wire.ProcessError("Total transaction amount is out of range!")
|
|
|
|
def _validate_output(self, output: CardanoTxOutput) -> None:
|
|
if output.address_parameters is not None and output.address is not None:
|
|
raise wire.ProcessError("Invalid output")
|
|
|
|
if output.address_parameters is not None:
|
|
validate_output_address_parameters(output.address_parameters)
|
|
self._fail_if_strict_and_unusual(output.address_parameters)
|
|
elif output.address is not None:
|
|
validate_output_address(
|
|
output.address, self.msg.protocol_magic, self.msg.network_id
|
|
)
|
|
else:
|
|
raise wire.ProcessError("Invalid output")
|
|
|
|
if output.datum_hash is not None:
|
|
if len(output.datum_hash) != OUTPUT_DATUM_HASH_SIZE:
|
|
raise wire.ProcessError("Invalid output datum hash")
|
|
address_type = self._get_output_address_type(output)
|
|
if address_type not in ADDRESS_TYPES_PAYMENT_SCRIPT:
|
|
raise wire.ProcessError("Invalid output")
|
|
|
|
self.account_path_checker.add_output(output)
|
|
|
|
async def _show_output(self, output: CardanoTxOutput) -> None:
|
|
if not self._should_show_output(output):
|
|
return
|
|
|
|
if output.datum_hash is not None:
|
|
await show_warning_tx_output_contains_datum_hash(
|
|
self.ctx, output.datum_hash
|
|
)
|
|
|
|
address_type = self._get_output_address_type(output)
|
|
if output.datum_hash is None and address_type in ADDRESS_TYPES_PAYMENT_SCRIPT:
|
|
await show_warning_tx_output_no_datum_hash(self.ctx)
|
|
|
|
if output.asset_groups_count > 0:
|
|
await show_warning_tx_output_contains_tokens(self.ctx)
|
|
|
|
if output.address_parameters is not None:
|
|
address = derive_human_readable_address(
|
|
self.keychain,
|
|
output.address_parameters,
|
|
self.msg.protocol_magic,
|
|
self.msg.network_id,
|
|
)
|
|
await self._show_output_credentials(output.address_parameters)
|
|
else:
|
|
assert output.address is not None # _validate_output
|
|
address = output.address
|
|
|
|
await confirm_sending(
|
|
self.ctx,
|
|
output.amount,
|
|
address,
|
|
self._is_change_output(output),
|
|
self.msg.network_id,
|
|
)
|
|
|
|
async def _show_output_credentials(
|
|
self, address_parameters: CardanoAddressParametersType
|
|
) -> None:
|
|
await show_change_output_credentials(
|
|
self.ctx,
|
|
Credential.payment_credential(address_parameters),
|
|
Credential.stake_credential(address_parameters),
|
|
)
|
|
|
|
def _should_show_output(self, output: CardanoTxOutput) -> bool:
|
|
"""
|
|
Determines whether the output should be shown. Extracted from _show_output because
|
|
of readability and because the same decision is made when displaying output tokens.
|
|
"""
|
|
if output.datum_hash is not None:
|
|
# The `return False` case below should not be reachable when datum hash is
|
|
# present, but let's make it explicit.
|
|
return True
|
|
|
|
address_type = self._get_output_address_type(output)
|
|
if output.datum_hash is None and address_type in ADDRESS_TYPES_PAYMENT_SCRIPT:
|
|
# Plutus script address without a datum hash is unspendable, we must show a warning.
|
|
return True
|
|
|
|
if output.address_parameters is not None: # change output
|
|
if not should_show_address_credentials(output.address_parameters):
|
|
# We don't need to display simple address outputs.
|
|
return False
|
|
|
|
return True
|
|
|
|
def _is_change_output(self, output: CardanoTxOutput) -> bool:
|
|
"""Used only to determine what message to show to the user when confirming sending."""
|
|
return output.address_parameters is not None
|
|
|
|
# asset groups
|
|
|
|
async def _process_asset_groups(
|
|
self,
|
|
asset_groups_dict: HashBuilderDict[bytes, HashBuilderDict[bytes, int]],
|
|
asset_groups_count: int,
|
|
should_show_tokens: bool,
|
|
) -> None:
|
|
for _ in range(asset_groups_count):
|
|
asset_group: CardanoAssetGroup = await self.ctx.call(
|
|
CardanoTxItemAck(), CardanoAssetGroup
|
|
)
|
|
self._validate_asset_group(asset_group)
|
|
|
|
tokens: HashBuilderDict[bytes, int] = HashBuilderDict(
|
|
asset_group.tokens_count,
|
|
wire.ProcessError("Invalid token bundle in output"),
|
|
)
|
|
with asset_groups_dict.add(asset_group.policy_id, tokens):
|
|
await self._process_tokens(
|
|
tokens,
|
|
asset_group.policy_id,
|
|
asset_group.tokens_count,
|
|
should_show_tokens,
|
|
)
|
|
|
|
def _validate_asset_group(
|
|
self, asset_group: CardanoAssetGroup, is_mint: bool = False
|
|
) -> None:
|
|
INVALID_TOKEN_BUNDLE = (
|
|
wire.ProcessError("Invalid mint token bundle")
|
|
if is_mint
|
|
else wire.ProcessError("Invalid token bundle in output")
|
|
)
|
|
|
|
if len(asset_group.policy_id) != MINTING_POLICY_ID_LENGTH:
|
|
raise INVALID_TOKEN_BUNDLE
|
|
if asset_group.tokens_count == 0:
|
|
raise INVALID_TOKEN_BUNDLE
|
|
|
|
# tokens
|
|
|
|
async def _process_tokens(
|
|
self,
|
|
tokens_dict: HashBuilderDict[bytes, int],
|
|
policy_id: bytes,
|
|
tokens_count: int,
|
|
should_show_tokens: bool,
|
|
) -> None:
|
|
for _ in range(tokens_count):
|
|
token: CardanoToken = await self.ctx.call(CardanoTxItemAck(), CardanoToken)
|
|
self._validate_token(token)
|
|
if should_show_tokens:
|
|
await confirm_sending_token(self.ctx, policy_id, token)
|
|
|
|
assert token.amount is not None # _validate_token
|
|
tokens_dict.add(token.asset_name_bytes, token.amount)
|
|
|
|
def _validate_token(self, token: CardanoToken, is_mint: bool = False) -> None:
|
|
INVALID_TOKEN_BUNDLE = (
|
|
wire.ProcessError("Invalid mint token bundle")
|
|
if is_mint
|
|
else wire.ProcessError("Invalid token bundle in output")
|
|
)
|
|
|
|
if is_mint:
|
|
if token.mint_amount is None or token.amount is not None:
|
|
raise INVALID_TOKEN_BUNDLE
|
|
else:
|
|
if token.amount is None or token.mint_amount is not None:
|
|
raise INVALID_TOKEN_BUNDLE
|
|
|
|
if len(token.asset_name_bytes) > MAX_ASSET_NAME_LENGTH:
|
|
raise INVALID_TOKEN_BUNDLE
|
|
|
|
# certificates
|
|
|
|
async def _process_certificates(self, certificates_list: HashBuilderList) -> None:
|
|
for _ in range(self.msg.certificates_count):
|
|
certificate: CardanoTxCertificate = await self.ctx.call(
|
|
CardanoTxItemAck(), CardanoTxCertificate
|
|
)
|
|
self._validate_certificate(certificate)
|
|
await self._show_certificate(certificate)
|
|
|
|
if certificate.type == CardanoCertificateType.STAKE_POOL_REGISTRATION:
|
|
pool_parameters = certificate.pool_parameters
|
|
assert pool_parameters is not None # _validate_certificate
|
|
|
|
pool_items_list: HashBuilderList = HashBuilderList(
|
|
POOL_REGISTRATION_CERTIFICATE_ITEMS_COUNT
|
|
)
|
|
with certificates_list.append(pool_items_list):
|
|
for item in cborize_initial_pool_registration_certificate_fields(
|
|
certificate
|
|
):
|
|
pool_items_list.append(item)
|
|
|
|
pool_owners_list: HashBuilderList[bytes] = HashBuilderList(
|
|
pool_parameters.owners_count
|
|
)
|
|
with pool_items_list.append(pool_owners_list):
|
|
await self._process_pool_owners(
|
|
pool_owners_list, pool_parameters.owners_count
|
|
)
|
|
|
|
relays_list: HashBuilderList[cbor.CborSequence] = HashBuilderList(
|
|
pool_parameters.relays_count
|
|
)
|
|
with pool_items_list.append(relays_list):
|
|
await self._process_pool_relays(
|
|
relays_list, pool_parameters.relays_count
|
|
)
|
|
|
|
pool_items_list.append(
|
|
cborize_pool_metadata(pool_parameters.metadata)
|
|
)
|
|
else:
|
|
certificates_list.append(
|
|
cborize_certificate(self.keychain, certificate)
|
|
)
|
|
|
|
def _validate_certificate(self, certificate: CardanoTxCertificate) -> None:
|
|
validate_certificate(
|
|
certificate,
|
|
self.msg.protocol_magic,
|
|
self.msg.network_id,
|
|
self.account_path_checker,
|
|
)
|
|
|
|
async def _show_certificate(self, certificate: CardanoTxCertificate) -> None:
|
|
if certificate.path:
|
|
await self._fail_or_warn_if_invalid_path(
|
|
SCHEMA_STAKING, certificate.path, CERTIFICATE_PATH_NAME
|
|
)
|
|
|
|
if certificate.type == CardanoCertificateType.STAKE_POOL_REGISTRATION:
|
|
assert certificate.pool_parameters is not None
|
|
await confirm_stake_pool_parameters(
|
|
self.ctx, certificate.pool_parameters, self.msg.network_id
|
|
)
|
|
await confirm_stake_pool_metadata(
|
|
self.ctx, certificate.pool_parameters.metadata
|
|
)
|
|
else:
|
|
await confirm_certificate(self.ctx, certificate)
|
|
|
|
# pool owners
|
|
|
|
async def _process_pool_owners(
|
|
self, pool_owners_list: HashBuilderList[bytes], owners_count: int
|
|
) -> None:
|
|
owners_as_path_count = 0
|
|
for _ in range(owners_count):
|
|
owner: CardanoPoolOwner = await self.ctx.call(
|
|
CardanoTxItemAck(), CardanoPoolOwner
|
|
)
|
|
validate_pool_owner(owner, self.account_path_checker)
|
|
await self._show_pool_owner(owner)
|
|
pool_owners_list.append(cborize_pool_owner(self.keychain, owner))
|
|
|
|
if owner.staking_key_path:
|
|
owners_as_path_count += 1
|
|
|
|
assert_certificate_cond(owners_as_path_count == 1)
|
|
|
|
async def _show_pool_owner(self, owner: CardanoPoolOwner) -> None:
|
|
if owner.staking_key_path:
|
|
await self._fail_or_warn_if_invalid_path(
|
|
SCHEMA_STAKING, owner.staking_key_path, POOL_OWNER_STAKING_PATH_NAME
|
|
)
|
|
|
|
await confirm_stake_pool_owner(
|
|
self.ctx, self.keychain, owner, self.msg.protocol_magic, self.msg.network_id
|
|
)
|
|
|
|
# pool relays
|
|
|
|
async def _process_pool_relays(
|
|
self,
|
|
relays_list: HashBuilderList[cbor.CborSequence],
|
|
relays_count: int,
|
|
) -> None:
|
|
for _ in range(relays_count):
|
|
relay: CardanoPoolRelayParameters = await self.ctx.call(
|
|
CardanoTxItemAck(), CardanoPoolRelayParameters
|
|
)
|
|
validate_pool_relay(relay)
|
|
relays_list.append(cborize_pool_relay(relay))
|
|
|
|
# withdrawals
|
|
|
|
async def _process_withdrawals(
|
|
self, withdrawals_dict: HashBuilderDict[bytes, int]
|
|
) -> None:
|
|
for _ in range(self.msg.withdrawals_count):
|
|
withdrawal: CardanoTxWithdrawal = await self.ctx.call(
|
|
CardanoTxItemAck(), CardanoTxWithdrawal
|
|
)
|
|
self._validate_withdrawal(withdrawal)
|
|
reward_address_bytes = self._derive_withdrawal_reward_address_bytes(
|
|
withdrawal
|
|
)
|
|
await confirm_withdrawal(
|
|
self.ctx, withdrawal, reward_address_bytes, self.msg.network_id
|
|
)
|
|
withdrawals_dict.add(reward_address_bytes, withdrawal.amount)
|
|
|
|
def _validate_withdrawal(self, withdrawal: CardanoTxWithdrawal) -> None:
|
|
validate_stake_credential(
|
|
withdrawal.path,
|
|
withdrawal.script_hash,
|
|
withdrawal.key_hash,
|
|
wire.ProcessError("Invalid withdrawal"),
|
|
)
|
|
|
|
if not 0 <= withdrawal.amount < LOVELACE_MAX_SUPPLY:
|
|
raise wire.ProcessError("Invalid withdrawal")
|
|
|
|
self.account_path_checker.add_withdrawal(withdrawal)
|
|
|
|
# auxiliary data
|
|
|
|
async def _process_auxiliary_data(self) -> None:
|
|
auxiliary_data: CardanoTxAuxiliaryData = await self.ctx.call(
|
|
CardanoTxItemAck(), CardanoTxAuxiliaryData
|
|
)
|
|
validate_auxiliary_data(auxiliary_data)
|
|
|
|
(
|
|
auxiliary_data_hash,
|
|
auxiliary_data_supplement,
|
|
) = get_auxiliary_data_hash_and_supplement(
|
|
self.keychain, auxiliary_data, self.msg.protocol_magic, self.msg.network_id
|
|
)
|
|
await show_auxiliary_data(
|
|
self.ctx,
|
|
self.keychain,
|
|
auxiliary_data_hash,
|
|
auxiliary_data.catalyst_registration_parameters,
|
|
self.msg.protocol_magic,
|
|
self.msg.network_id,
|
|
)
|
|
self.tx_dict.add(TX_BODY_KEY_AUXILIARY_DATA, auxiliary_data_hash)
|
|
|
|
await self.ctx.call(auxiliary_data_supplement, CardanoTxHostAck)
|
|
|
|
# minting
|
|
|
|
async def _process_minting(
|
|
self, minting_dict: HashBuilderDict[bytes, HashBuilderDict]
|
|
) -> None:
|
|
token_minting: CardanoTxMint = await self.ctx.call(
|
|
CardanoTxItemAck(), CardanoTxMint
|
|
)
|
|
|
|
await show_warning_tx_contains_mint(self.ctx)
|
|
|
|
for _ in range(token_minting.asset_groups_count):
|
|
asset_group: CardanoAssetGroup = await self.ctx.call(
|
|
CardanoTxItemAck(), CardanoAssetGroup
|
|
)
|
|
self._validate_asset_group(asset_group, is_mint=True)
|
|
|
|
tokens: HashBuilderDict[bytes, int] = HashBuilderDict(
|
|
asset_group.tokens_count, wire.ProcessError("Invalid mint token bundle")
|
|
)
|
|
with minting_dict.add(asset_group.policy_id, tokens):
|
|
await self._process_minting_tokens(
|
|
tokens,
|
|
asset_group.policy_id,
|
|
asset_group.tokens_count,
|
|
)
|
|
|
|
# minting tokens
|
|
|
|
async def _process_minting_tokens(
|
|
self,
|
|
tokens: HashBuilderDict[bytes, int],
|
|
policy_id: bytes,
|
|
tokens_count: int,
|
|
) -> None:
|
|
for _ in range(tokens_count):
|
|
token: CardanoToken = await self.ctx.call(CardanoTxItemAck(), CardanoToken)
|
|
self._validate_token(token, is_mint=True)
|
|
await confirm_token_minting(self.ctx, policy_id, token)
|
|
|
|
assert token.mint_amount is not None # _validate_token
|
|
tokens.add(token.asset_name_bytes, token.mint_amount)
|
|
|
|
# script data hash
|
|
|
|
async def _process_script_data_hash(self) -> None:
|
|
assert self.msg.script_data_hash is not None
|
|
self._validate_script_data_hash()
|
|
await confirm_script_data_hash(self.ctx, self.msg.script_data_hash)
|
|
self.tx_dict.add(TX_BODY_KEY_SCRIPT_DATA_HASH, self.msg.script_data_hash)
|
|
|
|
def _validate_script_data_hash(self) -> None:
|
|
assert self.msg.script_data_hash is not None
|
|
if len(self.msg.script_data_hash) != SCRIPT_DATA_HASH_SIZE:
|
|
raise wire.ProcessError("Invalid script data hash")
|
|
|
|
# collateral inputs
|
|
|
|
async def _process_collateral_inputs(
|
|
self, collateral_inputs_list: HashBuilderList[tuple[bytes, int]]
|
|
) -> None:
|
|
for _ in range(self.msg.collateral_inputs_count):
|
|
collateral_input: CardanoTxCollateralInput = await self.ctx.call(
|
|
CardanoTxItemAck(), CardanoTxCollateralInput
|
|
)
|
|
self._validate_collateral_input(collateral_input)
|
|
await confirm_collateral_input(self.ctx, collateral_input)
|
|
collateral_inputs_list.append(
|
|
(collateral_input.prev_hash, collateral_input.prev_index)
|
|
)
|
|
|
|
def _validate_collateral_input(
|
|
self, collateral_input: CardanoTxCollateralInput
|
|
) -> None:
|
|
if len(collateral_input.prev_hash) != INPUT_PREV_HASH_SIZE:
|
|
raise wire.ProcessError("Invalid collateral input")
|
|
|
|
# required signers
|
|
|
|
async def _process_required_signers(
|
|
self, required_signers_list: HashBuilderList[bytes]
|
|
) -> None:
|
|
for _ in range(self.msg.required_signers_count):
|
|
required_signer: CardanoTxRequiredSigner = await self.ctx.call(
|
|
CardanoTxItemAck(), CardanoTxRequiredSigner
|
|
)
|
|
self._validate_required_signer(required_signer)
|
|
await confirm_required_signer(self.ctx, required_signer)
|
|
|
|
key_hash = required_signer.key_hash or get_public_key_hash(
|
|
self.keychain, required_signer.key_path
|
|
)
|
|
required_signers_list.append(key_hash)
|
|
|
|
def _validate_required_signer(
|
|
self, required_signer: CardanoTxRequiredSigner
|
|
) -> None:
|
|
INVALID_REQUIRED_SIGNER = wire.ProcessError("Invalid required signer")
|
|
|
|
if required_signer.key_hash and required_signer.key_path:
|
|
raise INVALID_REQUIRED_SIGNER
|
|
|
|
if required_signer.key_hash:
|
|
if len(required_signer.key_hash) != ADDRESS_KEY_HASH_SIZE:
|
|
raise INVALID_REQUIRED_SIGNER
|
|
elif required_signer.key_path:
|
|
if not (
|
|
is_shelley_path(required_signer.key_path)
|
|
or is_multisig_path(required_signer.key_path)
|
|
or is_minting_path(required_signer.key_path)
|
|
):
|
|
raise INVALID_REQUIRED_SIGNER
|
|
else:
|
|
raise INVALID_REQUIRED_SIGNER
|
|
|
|
# witness requests
|
|
|
|
async def _process_witness_requests(self, tx_hash: bytes) -> CardanoTxResponseType:
|
|
response: CardanoTxResponseType = CardanoTxItemAck()
|
|
|
|
for _ in range(self.msg.witness_requests_count):
|
|
witness_request = await self.ctx.call(response, CardanoTxWitnessRequest)
|
|
self._validate_witness_request(witness_request)
|
|
path = witness_request.path
|
|
await self._show_witness_request(path)
|
|
if is_byron_path(path):
|
|
response = self._get_byron_witness(path, tx_hash)
|
|
else:
|
|
response = self._get_shelley_witness(path, tx_hash)
|
|
|
|
return response
|
|
|
|
def _validate_witness_request(
|
|
self, witness_request: CardanoTxWitnessRequest
|
|
) -> None:
|
|
self.account_path_checker.add_witness_request(witness_request)
|
|
|
|
async def _show_witness_request(
|
|
self,
|
|
witness_path: list[int],
|
|
) -> None:
|
|
await confirm_witness_request(self.ctx, witness_path)
|
|
|
|
# helpers
|
|
|
|
def _is_network_id_verifiable(self) -> bool:
|
|
"""
|
|
Checks whether there is at least one element that contains information about
|
|
network ID, otherwise Trezor cannot guarantee that the tx is actually meant for
|
|
the given network.
|
|
|
|
Note: Shelley addresses contain network id. The intended network of Byron
|
|
addresses can be determined based on whether they contain the protocol magic.
|
|
These checks are performed during address validation.
|
|
"""
|
|
return (
|
|
self.msg.include_network_id
|
|
or self.msg.outputs_count != 0
|
|
or self.msg.withdrawals_count != 0
|
|
)
|
|
|
|
def _get_output_address(self, output: CardanoTxOutput) -> bytes:
|
|
if output.address_parameters:
|
|
return derive_address_bytes(
|
|
self.keychain,
|
|
output.address_parameters,
|
|
self.msg.protocol_magic,
|
|
self.msg.network_id,
|
|
)
|
|
else:
|
|
assert output.address is not None # _validate_output
|
|
return get_address_bytes_unsafe(output.address)
|
|
|
|
def _get_output_address_type(self, output: CardanoTxOutput) -> CardanoAddressType:
|
|
if output.address_parameters:
|
|
return output.address_parameters.address_type
|
|
assert output.address is not None # _validate_output
|
|
return get_address_type(get_address_bytes_unsafe(output.address))
|
|
|
|
def _derive_withdrawal_reward_address_bytes(
|
|
self, withdrawal: CardanoTxWithdrawal
|
|
) -> bytes:
|
|
reward_address_type = (
|
|
CardanoAddressType.REWARD
|
|
if withdrawal.path or withdrawal.key_hash
|
|
else CardanoAddressType.REWARD_SCRIPT
|
|
)
|
|
return derive_address_bytes(
|
|
self.keychain,
|
|
CardanoAddressParametersType(
|
|
address_type=reward_address_type,
|
|
address_n_staking=withdrawal.path,
|
|
staking_key_hash=withdrawal.key_hash,
|
|
script_staking_hash=withdrawal.script_hash,
|
|
),
|
|
self.msg.protocol_magic,
|
|
self.msg.network_id,
|
|
)
|
|
|
|
def _get_byron_witness(
|
|
self, path: list[int], tx_hash: bytes
|
|
) -> CardanoTxWitnessResponse:
|
|
node = self.keychain.derive(path)
|
|
return CardanoTxWitnessResponse(
|
|
type=CardanoTxWitnessType.BYRON_WITNESS,
|
|
pub_key=derive_public_key(self.keychain, path),
|
|
signature=self._sign_tx_hash(tx_hash, path),
|
|
chain_code=node.chain_code(),
|
|
)
|
|
|
|
def _get_shelley_witness(
|
|
self, path: list[int], tx_hash: bytes
|
|
) -> CardanoTxWitnessResponse:
|
|
return CardanoTxWitnessResponse(
|
|
type=CardanoTxWitnessType.SHELLEY_WITNESS,
|
|
pub_key=derive_public_key(self.keychain, path),
|
|
signature=self._sign_tx_hash(tx_hash, path),
|
|
)
|
|
|
|
def _sign_tx_hash(self, tx_body_hash: bytes, path: list[int]) -> bytes:
|
|
node = self.keychain.derive(path)
|
|
return ed25519.sign_ext(
|
|
node.private_key(), node.private_key_ext(), tx_body_hash
|
|
)
|
|
|
|
async def _fail_or_warn_if_invalid_path(
|
|
self, schema: PathSchema, path: list[int], path_name: str
|
|
) -> None:
|
|
if not schema.match(path):
|
|
await self._fail_or_warn_path(path, path_name)
|
|
|
|
async def _fail_or_warn_path(self, path: list[int], path_name: str) -> None:
|
|
if safety_checks.is_strict():
|
|
raise wire.DataError(f"Invalid {path_name.lower()}")
|
|
else:
|
|
await show_warning_path(self.ctx, path, path_name)
|
|
|
|
def _fail_if_strict_and_unusual(
|
|
self, address_parameters: CardanoAddressParametersType
|
|
) -> None:
|
|
if not safety_checks.is_strict():
|
|
return
|
|
|
|
if Credential.payment_credential(address_parameters).is_unusual_path:
|
|
raise wire.DataError(f"Invalid {CHANGE_OUTPUT_PATH_NAME.lower()}")
|
|
|
|
if Credential.stake_credential(address_parameters).is_unusual_path:
|
|
raise wire.DataError(f"Invalid {CHANGE_OUTPUT_STAKING_PATH_NAME.lower()}")
|