From 98f0496b2cddb6e70cc74adcc88a87ffba9129da Mon Sep 17 00:00:00 2001 From: matejcik Date: Tue, 20 Jul 2021 12:59:03 +0200 Subject: [PATCH] feat(core/stellar): add type checking to Stellar app --- core/.changelog.d/1755.changed | 1 + core/Makefile | 1 + core/src/apps/stellar/consts.py | 53 ++++++++++----- core/src/apps/stellar/get_address.py | 8 ++- core/src/apps/stellar/helpers.py | 4 +- core/src/apps/stellar/layout.py | 40 +++++++---- core/src/apps/stellar/operations/__init__.py | 8 ++- core/src/apps/stellar/operations/layout.py | 59 ++++++++++------ core/src/apps/stellar/operations/serialize.py | 68 +++++++++++-------- core/src/apps/stellar/sign_tx.py | 50 +++++++++----- core/src/apps/stellar/writers.py | 8 ++- core/src/trezor/crypto/base32.py | 12 ++-- .../test_msg_stellar_sign_transaction.py | 66 +++++++----------- 13 files changed, 224 insertions(+), 154 deletions(-) create mode 100644 core/.changelog.d/1755.changed diff --git a/core/.changelog.d/1755.changed b/core/.changelog.d/1755.changed new file mode 100644 index 000000000..b914c7f79 --- /dev/null +++ b/core/.changelog.d/1755.changed @@ -0,0 +1 @@ +Type-checking enabled for apps.stellar diff --git a/core/Makefile b/core/Makefile index 72655529d..ad269aa4d 100644 --- a/core/Makefile +++ b/core/Makefile @@ -115,6 +115,7 @@ mypy: src/apps/ethereum \ src/apps/management \ src/apps/misc \ + src/apps/stellar \ src/apps/webauthn \ src/trezor/ui diff --git a/core/src/apps/stellar/consts.py b/core/src/apps/stellar/consts.py index 0faf1f8b3..04ebe99ba 100644 --- a/core/src/apps/stellar/consts.py +++ b/core/src/apps/stellar/consts.py @@ -2,11 +2,44 @@ from micropython import const from trezor.enums import MessageType -TX_TYPE = bytearray("\x00\x00\x00\x02") +if False: + from typing import Union + from trezor import protobuf + + from trezor.messages import ( + StellarAccountMergeOp, + StellarAllowTrustOp, + StellarBumpSequenceOp, + StellarChangeTrustOp, + StellarCreateAccountOp, + StellarCreatePassiveOfferOp, + StellarManageDataOp, + StellarManageOfferOp, + StellarPathPaymentOp, + StellarPaymentOp, + StellarSetOptionsOp, + ) + + StellarMessageType = Union[ + StellarAccountMergeOp, + StellarAllowTrustOp, + StellarBumpSequenceOp, + StellarChangeTrustOp, + StellarCreateAccountOp, + StellarCreatePassiveOfferOp, + StellarManageDataOp, + StellarManageOfferOp, + StellarPathPaymentOp, + StellarPaymentOp, + StellarSetOptionsOp, + ] + + +TX_TYPE = b"\x00\x00\x00\x02" # source: https://github.com/stellar/go/blob/3d2c1defe73dbfed00146ebe0e8d7e07ce4bb1b6/xdr/Stellar-transaction.x#L16 # Inflation not supported see https://github.com/trezor/trezor-core/issues/202#issuecomment-393342089 -op_codes = { +op_codes: dict[int, int] = { MessageType.StellarAccountMergeOp: 8, MessageType.StellarAllowTrustOp: 7, MessageType.StellarBumpSequenceOp: 11, @@ -53,22 +86,8 @@ FLAG_AUTH_REVOCABLE = const(2) FLAG_AUTH_IMMUTABLE = const(4) FLAGS_MAX_SIZE = const(7) -# https://github.com/stellar/go/blob/e0ffe19f58879d3c31e2976b97a5bf10e13a337b/xdr/Stellar-transaction.x#L275 -MEMO_TYPE_NONE = const(0) -MEMO_TYPE_TEXT = const(1) -MEMO_TYPE_ID = const(2) -MEMO_TYPE_HASH = const(3) -MEMO_TYPE_RETURN = const(4) - -# https://github.com/stellar/go/blob/3d2c1defe73dbfed00146ebe0e8d7e07ce4bb1b6/xdr/xdr_generated.go#L156 -SIGN_TYPE_ACCOUNT = const(0) -SIGN_TYPE_PRE_AUTH = const(1) -SIGN_TYPE_HASH = const(2) - -SIGN_TYPES = (SIGN_TYPE_ACCOUNT, SIGN_TYPE_HASH, SIGN_TYPE_PRE_AUTH) - -def get_op_code(msg) -> int: +def get_op_code(msg: protobuf.MessageType) -> int: wire = msg.MESSAGE_WIRE_TYPE if wire not in op_codes: raise ValueError("Stellar: op code unknown") diff --git a/core/src/apps/stellar/get_address.py b/core/src/apps/stellar/get_address.py index d40cb9e0d..e48224093 100644 --- a/core/src/apps/stellar/get_address.py +++ b/core/src/apps/stellar/get_address.py @@ -6,9 +6,15 @@ from apps.common.keychain import auto_keychain from . import helpers +if False: + from trezor.wire import Context + from apps.common.keychain import Keychain + @auto_keychain(__name__) -async def get_address(ctx, msg: StellarGetAddress, keychain): +async def get_address( + ctx: Context, msg: StellarGetAddress, keychain: Keychain +) -> StellarAddress: await paths.validate_path(ctx, keychain, msg.address_n) node = keychain.derive(msg.address_n) diff --git a/core/src/apps/stellar/helpers.py b/core/src/apps/stellar/helpers.py index 6255154cb..fe2033695 100644 --- a/core/src/apps/stellar/helpers.py +++ b/core/src/apps/stellar/helpers.py @@ -14,7 +14,7 @@ def public_key_from_address(address: str) -> bytes: return b[1:-2] -def address_from_public_key(pubkey: bytes): +def address_from_public_key(pubkey: bytes) -> str: """Returns the base32-encoded version of public key bytes (G...)""" address = bytearray() address.append(6 << 3) # version -> 'G' @@ -24,7 +24,7 @@ def address_from_public_key(pubkey: bytes): return base32.encode(address) -def _crc16_checksum_verify(data: bytes, checksum: bytes): +def _crc16_checksum_verify(data: bytes, checksum: bytes) -> None: if _crc16_checksum(data) != checksum: raise ProcessError("Invalid address checksum") diff --git a/core/src/apps/stellar/layout.py b/core/src/apps/stellar/layout.py index c4897e5ba..5247a0653 100644 --- a/core/src/apps/stellar/layout.py +++ b/core/src/apps/stellar/layout.py @@ -1,5 +1,5 @@ from trezor import strings, ui -from trezor.enums import ButtonRequestType +from trezor.enums import ButtonRequestType, StellarAssetType, StellarMemoType from trezor.ui.layouts import ( confirm_action, confirm_address, @@ -7,16 +7,22 @@ from trezor.ui.layouts import ( confirm_metadata, ) from trezor.ui.layouts.tt.altcoin import confirm_timebounds_stellar +from trezor.wire import DataError from . import consts if False: - from trezor.messages import StellarAssetType + from trezor.wire import Context + + from trezor.messages import StellarAsset async def require_confirm_init( - ctx, address: str, network_passphrase: str, accounts_match: bool -): + ctx: Context, + address: str, + network_passphrase: str, + accounts_match: bool, +) -> None: if accounts_match: description = "Initialize signing with your account" else: @@ -44,18 +50,20 @@ async def require_confirm_init( ) -async def require_confirm_timebounds(ctx, start: int, end: int): +async def require_confirm_timebounds(ctx: Context, start: int, end: int) -> None: await confirm_timebounds_stellar(ctx, start, end) -async def require_confirm_memo(ctx, memo_type: int, memo_text: str): - if memo_type == consts.MEMO_TYPE_TEXT: +async def require_confirm_memo( + ctx: Context, memo_type: StellarMemoType, memo_text: str +) -> None: + if memo_type == StellarMemoType.TEXT: description = "Memo (TEXT)" - elif memo_type == consts.MEMO_TYPE_ID: + elif memo_type == StellarMemoType.ID: description = "Memo (ID)" - elif memo_type == consts.MEMO_TYPE_HASH: + elif memo_type == StellarMemoType.HASH: description = "Memo (HASH)" - elif memo_type == consts.MEMO_TYPE_RETURN: + elif memo_type == StellarMemoType.RETURN: description = "Memo (RETURN)" else: return await confirm_action( @@ -78,7 +86,7 @@ async def require_confirm_memo(ctx, memo_type: int, memo_text: str): ) -async def require_confirm_final(ctx, fee: int, num_operations: int): +async def require_confirm_final(ctx: Context, fee: int, num_operations: int) -> None: op_str = strings.format_plural("{count} {plural}", num_operations, "operation") await confirm_metadata( ctx, @@ -91,14 +99,16 @@ async def require_confirm_final(ctx, fee: int, num_operations: int): ) -def format_asset(asset: StellarAssetType | None = None) -> str: - if asset is None or asset.type == consts.ASSET_TYPE_NATIVE: +def format_asset(asset: StellarAsset | None) -> str: + if asset is None or asset.type == StellarAssetType.NATIVE: return "XLM" else: + if asset.code is None: + raise DataError("Stellar asset code is missing") return asset.code -def format_amount(amount: int, asset: StellarAssetType | None = None) -> str: +def format_amount(amount: int, asset: StellarAsset | None = None) -> str: return ( strings.format_amount(amount, consts.AMOUNT_DECIMALS) + " " @@ -106,7 +116,7 @@ def format_amount(amount: int, asset: StellarAssetType | None = None) -> str: ) -def get_network_warning(network_passphrase: str): +def get_network_warning(network_passphrase: str) -> str | None: if network_passphrase == consts.NETWORK_PASSPHRASE_PUBLIC: return None if network_passphrase == consts.NETWORK_PASSPHRASE_TESTNET: diff --git a/core/src/apps/stellar/operations/__init__.py b/core/src/apps/stellar/operations/__init__.py index 5325da06d..dbac9834c 100644 --- a/core/src/apps/stellar/operations/__init__.py +++ b/core/src/apps/stellar/operations/__init__.py @@ -1,8 +1,14 @@ from .. import consts, writers from . import layout, serialize +if False: + from trezor.utils import Writer + from trezor.wire import Context -async def process_operation(ctx, w, op): + +async def process_operation( + ctx: Context, w: Writer, op: consts.StellarMessageType +) -> None: if op.source_account: await layout.confirm_source_account(ctx, op.source_account) serialize.write_account(w, op.source_account) diff --git a/core/src/apps/stellar/operations/layout.py b/core/src/apps/stellar/operations/layout.py index 9628738d8..b048f56ba 100644 --- a/core/src/apps/stellar/operations/layout.py +++ b/core/src/apps/stellar/operations/layout.py @@ -1,7 +1,8 @@ +from trezor.enums import StellarAssetType, StellarSignerType from trezor.messages import ( StellarAccountMergeOp, StellarAllowTrustOp, - StellarAssetType, + StellarAsset, StellarBumpSequenceOp, StellarChangeTrustOp, StellarCreateAccountOp, @@ -21,13 +22,16 @@ from trezor.ui.layouts import ( confirm_properties, confirm_text, ) -from trezor.wire import ProcessError +from trezor.wire import DataError, ProcessError from .. import consts, helpers from ..layout import format_amount, format_asset +if False: + from trezor.wire import Context -async def confirm_source_account(ctx, source_account: str): + +async def confirm_source_account(ctx: Context, source_account: str) -> None: await confirm_address( ctx, "Confirm operation", @@ -37,7 +41,7 @@ async def confirm_source_account(ctx, source_account: str): ) -async def confirm_allow_trust_op(ctx, op: StellarAllowTrustOp): +async def confirm_allow_trust_op(ctx: Context, op: StellarAllowTrustOp) -> None: await confirm_properties( ctx, "op_allow_trust", @@ -49,7 +53,7 @@ async def confirm_allow_trust_op(ctx, op: StellarAllowTrustOp): ) -async def confirm_account_merge_op(ctx, op: StellarAccountMergeOp): +async def confirm_account_merge_op(ctx: Context, op: StellarAccountMergeOp) -> None: await confirm_address( ctx, "Account Merge", @@ -59,7 +63,7 @@ async def confirm_account_merge_op(ctx, op: StellarAccountMergeOp): ) -async def confirm_bump_sequence_op(ctx, op: StellarBumpSequenceOp): +async def confirm_bump_sequence_op(ctx: Context, op: StellarBumpSequenceOp) -> None: await confirm_metadata( ctx, "op_bump", @@ -69,7 +73,7 @@ async def confirm_bump_sequence_op(ctx, op: StellarBumpSequenceOp): ) -async def confirm_change_trust_op(ctx, op: StellarChangeTrustOp): +async def confirm_change_trust_op(ctx: Context, op: StellarChangeTrustOp) -> None: await confirm_amount( ctx, title="Delete trust" if op.limit == 0 else "Add trust", @@ -80,7 +84,7 @@ async def confirm_change_trust_op(ctx, op: StellarChangeTrustOp): await confirm_asset_issuer(ctx, op.asset) -async def confirm_create_account_op(ctx, op: StellarCreateAccountOp): +async def confirm_create_account_op(ctx: Context, op: StellarCreateAccountOp) -> None: await confirm_properties( ctx, "op_create_account", @@ -92,7 +96,9 @@ async def confirm_create_account_op(ctx, op: StellarCreateAccountOp): ) -async def confirm_create_passive_offer_op(ctx, op: StellarCreatePassiveOfferOp): +async def confirm_create_passive_offer_op( + ctx: Context, op: StellarCreatePassiveOfferOp +) -> None: if op.amount == 0: text = "Delete Passive Offer" else: @@ -100,7 +106,7 @@ async def confirm_create_passive_offer_op(ctx, op: StellarCreatePassiveOfferOp): await _confirm_offer(ctx, text, op) -async def confirm_manage_offer_op(ctx, op: StellarManageOfferOp): +async def confirm_manage_offer_op(ctx: Context, op: StellarManageOfferOp) -> None: if op.offer_id == 0: text = "New Offer" else: @@ -112,7 +118,11 @@ async def confirm_manage_offer_op(ctx, op: StellarManageOfferOp): await _confirm_offer(ctx, text, op) -async def _confirm_offer(ctx, title, op): +async def _confirm_offer( + ctx: Context, + title: str, + op: StellarCreatePassiveOfferOp | StellarManageOfferOp, +) -> None: await confirm_properties( ctx, "op_offer", @@ -130,7 +140,7 @@ async def _confirm_offer(ctx, title, op): await confirm_asset_issuer(ctx, op.buying_asset) -async def confirm_manage_data_op(ctx, op: StellarManageDataOp): +async def confirm_manage_data_op(ctx: Context, op: StellarManageDataOp) -> None: from trezor.crypto.hashlib import sha256 if op.value: @@ -151,7 +161,7 @@ async def confirm_manage_data_op(ctx, op: StellarManageDataOp): ) -async def confirm_path_payment_op(ctx, op: StellarPathPaymentOp): +async def confirm_path_payment_op(ctx: Context, op: StellarPathPaymentOp) -> None: await confirm_output( ctx, address=op.destination_account, @@ -170,7 +180,7 @@ async def confirm_path_payment_op(ctx, op: StellarPathPaymentOp): await confirm_asset_issuer(ctx, op.send_asset) -async def confirm_payment_op(ctx, op: StellarPaymentOp): +async def confirm_payment_op(ctx: Context, op: StellarPaymentOp) -> None: await confirm_output( ctx, address=op.destination_account, @@ -179,7 +189,7 @@ async def confirm_payment_op(ctx, op: StellarPaymentOp): await confirm_asset_issuer(ctx, op.asset) -async def confirm_set_options_op(ctx, op: StellarSetOptionsOp): +async def confirm_set_options_op(ctx: Context, op: StellarSetOptionsOp) -> None: if op.inflation_destination_account: await confirm_address( ctx, @@ -207,18 +217,21 @@ async def confirm_set_options_op(ctx, op: StellarSetOptionsOp): await confirm_text(ctx, "op_home_domain", "Home Domain", op.home_domain) if op.signer_type is not None: + if op.signer_key is None or op.signer_weight is None: + raise DataError("Stellar: invalid signer option data.") + if op.signer_weight > 0: title = "Add Signer" else: title = "Remove Signer" data: str | bytes = "" - if op.signer_type == consts.SIGN_TYPE_ACCOUNT: + if op.signer_type == StellarSignerType.ACCOUNT: description = "Account:" data = helpers.address_from_public_key(op.signer_key) - elif op.signer_type == consts.SIGN_TYPE_PRE_AUTH: + elif op.signer_type == StellarSignerType.PRE_AUTH: description = "Pre-auth transaction:" data = op.signer_key - elif op.signer_type == consts.SIGN_TYPE_HASH: + elif op.signer_type == StellarSignerType.HASH: description = "Hash:" data = op.signer_key else: @@ -233,7 +246,7 @@ async def confirm_set_options_op(ctx, op: StellarSetOptionsOp): ) -def _format_thresholds(op: StellarSetOptionsOp) -> list[(str, str)]: +def _format_thresholds(op: StellarSetOptionsOp) -> list[tuple[str, str]]: props = [] if op.master_weight is not None: props.append(("Master Weight:", str(op.master_weight))) @@ -246,7 +259,7 @@ def _format_thresholds(op: StellarSetOptionsOp) -> list[(str, str)]: return props -def _format_flags(flags: int) -> tuple: +def _format_flags(flags: int) -> str: if flags > consts.FLAGS_MAX_SIZE: raise ProcessError("Stellar: invalid flags") text = "{}{}{}".format( @@ -257,9 +270,11 @@ def _format_flags(flags: int) -> tuple: return text -async def confirm_asset_issuer(ctx, asset: StellarAssetType): - if asset is None or asset.type == consts.ASSET_TYPE_NATIVE: +async def confirm_asset_issuer(ctx: Context, asset: StellarAsset) -> None: + if asset.type == StellarAssetType.NATIVE: return + if asset.issuer is None or asset.code is None: + raise DataError("Stellar: invalid asset definition") await confirm_address( ctx, "Confirm Issuer", diff --git a/core/src/apps/stellar/operations/serialize.py b/core/src/apps/stellar/operations/serialize.py index e8adef6ac..a7546e182 100644 --- a/core/src/apps/stellar/operations/serialize.py +++ b/core/src/apps/stellar/operations/serialize.py @@ -1,7 +1,8 @@ +from trezor.enums import StellarAssetType from trezor.messages import ( StellarAccountMergeOp, StellarAllowTrustOp, - StellarAssetType, + StellarAsset, StellarBumpSequenceOp, StellarChangeTrustOp, StellarCreateAccountOp, @@ -14,14 +15,17 @@ from trezor.messages import ( ) from trezor.wire import DataError, ProcessError -from .. import consts, writers +from .. import writers +if False: + from trezor.utils import Writer -def write_account_merge_op(w, msg: StellarAccountMergeOp): + +def write_account_merge_op(w: Writer, msg: StellarAccountMergeOp) -> None: writers.write_pubkey(w, msg.destination_account) -def write_allow_trust_op(w, msg: StellarAllowTrustOp): +def write_allow_trust_op(w: Writer, msg: StellarAllowTrustOp) -> None: # trustor account (the account being allowed to access the asset) writers.write_pubkey(w, msg.trusted_account) writers.write_uint32(w, msg.asset_type) @@ -30,21 +34,21 @@ def write_allow_trust_op(w, msg: StellarAllowTrustOp): writers.write_bool(w, msg.is_authorized) -def write_bump_sequence_op(w, msg: StellarBumpSequenceOp): +def write_bump_sequence_op(w: Writer, msg: StellarBumpSequenceOp) -> None: writers.write_uint64(w, msg.bump_to) -def write_change_trust_op(w, msg: StellarChangeTrustOp): +def write_change_trust_op(w: Writer, msg: StellarChangeTrustOp) -> None: _write_asset(w, msg.asset) writers.write_uint64(w, msg.limit) -def write_create_account_op(w, msg: StellarCreateAccountOp): +def write_create_account_op(w: Writer, msg: StellarCreateAccountOp) -> None: writers.write_pubkey(w, msg.new_account) writers.write_uint64(w, msg.starting_balance) -def write_create_passive_offer_op(w, msg: StellarCreatePassiveOfferOp): +def write_create_passive_offer_op(w: Writer, msg: StellarCreatePassiveOfferOp) -> None: _write_asset(w, msg.selling_asset) _write_asset(w, msg.buying_asset) writers.write_uint64(w, msg.amount) @@ -52,7 +56,7 @@ def write_create_passive_offer_op(w, msg: StellarCreatePassiveOfferOp): writers.write_uint32(w, msg.price_d) -def write_manage_data_op(w, msg: StellarManageDataOp): +def write_manage_data_op(w: Writer, msg: StellarManageDataOp) -> None: if len(msg.key) > 64: raise ProcessError("Stellar: max length of a key is 64 bytes") writers.write_string(w, msg.key) @@ -61,7 +65,7 @@ def write_manage_data_op(w, msg: StellarManageDataOp): writers.write_string(w, msg.value) -def write_manage_offer_op(w, msg: StellarManageOfferOp): +def write_manage_offer_op(w: Writer, msg: StellarManageOfferOp) -> None: _write_asset(w, msg.selling_asset) _write_asset(w, msg.buying_asset) writers.write_uint64(w, msg.amount) # amount to sell @@ -70,7 +74,7 @@ def write_manage_offer_op(w, msg: StellarManageOfferOp): writers.write_uint64(w, msg.offer_id) -def write_path_payment_op(w, msg: StellarPathPaymentOp): +def write_path_payment_op(w: Writer, msg: StellarPathPaymentOp) -> None: _write_asset(w, msg.send_asset) writers.write_uint64(w, msg.send_max) writers.write_pubkey(w, msg.destination_account) @@ -82,13 +86,13 @@ def write_path_payment_op(w, msg: StellarPathPaymentOp): _write_asset(w, p) -def write_payment_op(w, msg: StellarPaymentOp): +def write_payment_op(w: Writer, msg: StellarPaymentOp) -> None: writers.write_pubkey(w, msg.destination_account) _write_asset(w, msg.asset) writers.write_uint64(w, msg.amount) -def write_set_options_op(w, msg: StellarSetOptionsOp): +def write_set_options_op(w: Writer, msg: StellarSetOptionsOp) -> None: # inflation destination if msg.inflation_destination_account is None: writers.write_bool(w, False) @@ -118,16 +122,18 @@ def write_set_options_op(w, msg: StellarSetOptionsOp): # signer if msg.signer_type is None: writers.write_bool(w, False) - elif msg.signer_type in consts.SIGN_TYPES: + else: + if msg.signer_key is None or msg.signer_weight is None: + raise DataError( + "Stellar: signer_type, signer_key, signer_weight must be set together" + ) writers.write_bool(w, True) writers.write_uint32(w, msg.signer_type) writers.write_bytes_fixed(w, msg.signer_key, 32) writers.write_uint32(w, msg.signer_weight) - else: - raise ProcessError("Stellar: unknown signer type") -def _write_set_options_int(w, value: int): +def _write_set_options_int(w: Writer, value: int | None) -> None: if value is None: writers.write_bool(w, False) else: @@ -135,7 +141,7 @@ def _write_set_options_int(w, value: int): writers.write_uint32(w, value) -def write_account(w, source_account: str): +def write_account(w: Writer, source_account: str | None) -> None: if source_account is None: writers.write_bool(w, False) else: @@ -143,28 +149,36 @@ def write_account(w, source_account: str): writers.write_pubkey(w, source_account) -def _write_asset_code(w, asset_type: int, asset_code: str): - code = bytearray(asset_code) - if asset_type == consts.ASSET_TYPE_NATIVE: +def _write_asset_code( + w: Writer, asset_type: StellarAssetType, asset_code: str | None +) -> None: + if asset_type == StellarAssetType.NATIVE: return # nothing is needed - elif asset_type == consts.ASSET_TYPE_ALPHANUM4: + + if asset_code is None: + raise DataError("Stellar: invalid asset") + + code = asset_code.encode() + if asset_type == StellarAssetType.ALPHANUM4: if len(code) > 4: raise DataError("Stellar: asset code too long for ALPHANUM4") # pad with zeros to 4 chars - writers.write_bytes_fixed(w, code + bytearray([0] * (4 - len(code))), 4) - elif asset_type == consts.ASSET_TYPE_ALPHANUM12: + writers.write_bytes_fixed(w, code + bytes([0] * (4 - len(code))), 4) + elif asset_type == StellarAssetType.ALPHANUM12: if len(code) > 12: raise DataError("Stellar: asset code too long for ALPHANUM12") # pad with zeros to 12 chars - writers.write_bytes_fixed(w, code + bytearray([0] * (12 - len(code))), 12) + writers.write_bytes_fixed(w, code + bytes([0] * (12 - len(code))), 12) else: raise ProcessError("Stellar: invalid asset type") -def _write_asset(w, asset: StellarAssetType): - if asset is None or asset.type == consts.ASSET_TYPE_NATIVE: +def _write_asset(w: Writer, asset: StellarAsset) -> None: + if asset.type == StellarAssetType.NATIVE: writers.write_uint32(w, 0) return + if asset.code is None or asset.issuer is None: + raise DataError("Stellar: invalid asset") writers.write_uint32(w, asset.type) _write_asset_code(w, asset.type, asset.code) writers.write_pubkey(w, asset.issuer) diff --git a/core/src/apps/stellar/sign_tx.py b/core/src/apps/stellar/sign_tx.py index 418727cc3..4184f923f 100644 --- a/core/src/apps/stellar/sign_tx.py +++ b/core/src/apps/stellar/sign_tx.py @@ -2,8 +2,9 @@ from ubinascii import hexlify from trezor.crypto.curve import ed25519 from trezor.crypto.hashlib import sha256 +from trezor.enums import StellarMemoType from trezor.messages import StellarSignedTx, StellarSignTx, StellarTxOpRequest -from trezor.wire import ProcessError +from trezor.wire import DataError, ProcessError from apps.common import paths, seed from apps.common.keychain import auto_keychain @@ -11,9 +12,16 @@ from apps.common.keychain import auto_keychain from . import consts, helpers, layout, writers from .operations import process_operation +if False: + from trezor.wire import Context + + from apps.common.keychain import Keychain + @auto_keychain(__name__) -async def sign_tx(ctx, msg: StellarSignTx, keychain): +async def sign_tx( + ctx: Context, msg: StellarSignTx, keychain: Keychain +) -> StellarSignedTx: await paths.validate_path(ctx, keychain, msg.address_n) node = keychain.derive(msg.address_n) @@ -37,15 +45,15 @@ async def sign_tx(ctx, msg: StellarSignTx, keychain): return StellarSignedTx(public_key=pubkey, signature=signature) -async def _final(ctx, w: bytearray, msg: StellarSignTx): +async def _final(ctx: Context, w: bytearray, msg: StellarSignTx) -> None: # 4 null bytes representing a (currently unused) empty union writers.write_uint32(w, 0) # final confirm await layout.require_confirm_final(ctx, msg.fee, msg.num_operations) -async def _init(ctx, w: bytearray, pubkey: bytes, msg: StellarSignTx): - network_passphrase_hash = sha256(msg.network_passphrase).digest() +async def _init(ctx: Context, w: bytearray, pubkey: bytes, msg: StellarSignTx) -> None: + network_passphrase_hash = sha256(msg.network_passphrase.encode()).digest() writers.write_bytes_fixed(w, network_passphrase_hash, 32) writers.write_bytes_fixed(w, consts.TX_TYPE, 4) @@ -62,46 +70,52 @@ async def _init(ctx, w: bytearray, pubkey: bytes, msg: StellarSignTx): ) -async def _timebounds(ctx, w: bytearray, start: int, end: int): +async def _timebounds( + ctx: Context, w: bytearray, start: int | None, end: int | None +) -> None: # timebounds are only present if timebounds_start or timebounds_end is non-zero - if start or end: + if start is not None and end is not None: # confirm dialog await layout.require_confirm_timebounds(ctx, start, end) writers.write_bool(w, True) # timebounds are sent as uint32s since that's all we can display, but they must be hashed as 64bit - writers.write_uint64(w, start or 0) - writers.write_uint64(w, end or 0) + writers.write_uint64(w, start) + writers.write_uint64(w, end) else: writers.write_bool(w, False) -async def _operations(ctx, w: bytearray, num_operations: int): +async def _operations(ctx: Context, w: bytearray, num_operations: int) -> None: writers.write_uint32(w, num_operations) for i in range(num_operations): op = await ctx.call_any(StellarTxOpRequest(), *consts.op_wire_types) - await process_operation(ctx, w, op) + await process_operation(ctx, w, op) # type: ignore -async def _memo(ctx, w: bytearray, msg: StellarSignTx): - if msg.memo_type is None: - msg.memo_type = consts.MEMO_TYPE_NONE +async def _memo(ctx: Context, w: bytearray, msg: StellarSignTx) -> None: writers.write_uint32(w, msg.memo_type) - if msg.memo_type == consts.MEMO_TYPE_NONE: + if msg.memo_type == StellarMemoType.NONE: # nothing is serialized memo_confirm_text = "" - elif msg.memo_type == consts.MEMO_TYPE_TEXT: + elif msg.memo_type == StellarMemoType.TEXT: # Text: 4 bytes (size) + up to 28 bytes + if msg.memo_text is None: + raise DataError("Stellar: Missing memo text") if len(msg.memo_text) > 28: raise ProcessError("Stellar: max length of a memo text is 28 bytes") writers.write_string(w, msg.memo_text) memo_confirm_text = msg.memo_text - elif msg.memo_type == consts.MEMO_TYPE_ID: + elif msg.memo_type == StellarMemoType.ID: # ID: 64 bit unsigned integer + if msg.memo_id is None: + raise DataError("Stellar: Missing memo id") writers.write_uint64(w, msg.memo_id) memo_confirm_text = str(msg.memo_id) - elif msg.memo_type in (consts.MEMO_TYPE_HASH, consts.MEMO_TYPE_RETURN): + elif msg.memo_type in (StellarMemoType.HASH, StellarMemoType.RETURN): # Hash/Return: 32 byte hash + if msg.memo_hash is None: + raise DataError("Stellar: Missing memo hash") writers.write_bytes_fixed(w, bytearray(msg.memo_hash), 32) memo_confirm_text = hexlify(msg.memo_hash).decode() else: diff --git a/core/src/apps/stellar/writers.py b/core/src/apps/stellar/writers.py index 0d932fc0d..064671c36 100644 --- a/core/src/apps/stellar/writers.py +++ b/core/src/apps/stellar/writers.py @@ -13,8 +13,10 @@ write_uint64 = write_uint64_be if False: from typing import AnyStr + from trezor.utils import Writer -def write_string(w, s: AnyStr) -> None: + +def write_string(w: Writer, s: AnyStr) -> None: """Write XDR string padded to a multiple of 4 bytes.""" if isinstance(s, str): buf = s.encode() @@ -28,14 +30,14 @@ def write_string(w, s: AnyStr) -> None: write_bytes_unchecked(w, bytes([0] * (4 - remainder))) -def write_bool(w, val: bool): +def write_bool(w: Writer, val: bool) -> None: if val: write_uint32(w, 1) else: write_uint32(w, 0) -def write_pubkey(w, address: str): +def write_pubkey(w: Writer, address: str) -> None: # first 4 bytes of an address are the type, there's only one type (0) write_uint32(w, 0) write_bytes_fixed(w, public_key_from_address(address), 32) diff --git a/core/src/trezor/crypto/base32.py b/core/src/trezor/crypto/base32.py index eab710b15..e63a1e3a2 100644 --- a/core/src/trezor/crypto/base32.py +++ b/core/src/trezor/crypto/base32.py @@ -53,17 +53,17 @@ def encode(s: bytes) -> str: def decode(s: str) -> bytes: - s = s.encode() - quanta, leftover = divmod(len(s), 8) + data = s.encode() + quanta, leftover = divmod(len(data), 8) if leftover: raise ValueError("Incorrect padding") # Strip off pad characters from the right. We need to count the pad # characters because this will tell us how many null bytes to remove from # the end of the decoded string. - padchars = s.find(b"=") + padchars = data.find(b"=") if padchars > 0: - padchars = len(s) - padchars - s = s[:-padchars] + padchars = len(data) - padchars + data = data[:-padchars] else: padchars = 0 @@ -71,7 +71,7 @@ def decode(s: str) -> bytes: parts = [] acc = 0 shift = 35 - for c in s: + for c in data: val = _b32rev.get(c) if val is None: raise ValueError("Non-base32 digit found") diff --git a/tests/device_tests/test_msg_stellar_sign_transaction.py b/tests/device_tests/test_msg_stellar_sign_transaction.py index 70070ecd5..9aed91fa8 100644 --- a/tests/device_tests/test_msg_stellar_sign_transaction.py +++ b/tests/device_tests/test_msg_stellar_sign_transaction.py @@ -68,9 +68,9 @@ NETWORK_PASSPHRASE = "Test SDF Network ; September 2015" def _create_msg(memo=False) -> messages.StellarSignTx: - kwargs = {"memo_type": 0} + kwargs = {"memo_type": messages.StellarMemoType.NONE} if memo: - kwargs = {"memo_type": 1, "memo_text": "hi"} + kwargs = {"memo_type": messages.StellarMemoType.TEXT, "memo_text": "hi"} return messages.StellarSignTx( source_account="GAK5MSF74TJW6GLM7NLTL76YZJKM2S4CGP3UH4REJHPHZ4YBZW2GSBPW", fee=100, @@ -135,25 +135,7 @@ def test_sign_tx_payment_op_native(client): op = messages.StellarPaymentOp() op.amount = 500111000 op.destination_account = "GBOVKZBEM2YYLOCDCUXJ4IMRKHN4LCJAE7WEAEA2KF562XFAGDBOB64V" - - tx = _create_msg() - - response = stellar.sign_tx(client, tx, [op], ADDRESS_N, NETWORK_PASSPHRASE) - - assert ( - b64encode(response.signature) - == b"pDc6ghKCLNoYbt3h4eBw+533237m0BB0Jp/d/TxJCA83mF3o5Fr4l5vwAWBR62hdTWAP9MhVluY0cd5i54UwDg==" - # a4373a8212822cda186edde1e1e070fb9df7db7ee6d01074269fddfd3c49080f37985de8e45af8979bf0016051eb685d4d600ff4c85596e63471de62e785300e - ) - - -def test_sign_tx_payment_op_native_explicit_asset(client): - """Native payment of 50.0111 XLM to GBOVKZBEM2YYLOCDCUXJ4IMRKHN4LCJAE7WEAEA2KF562XFAGDBOB64V""" - - op = messages.StellarPaymentOp() - op.amount = 500111000 - op.destination_account = "GBOVKZBEM2YYLOCDCUXJ4IMRKHN4LCJAE7WEAEA2KF562XFAGDBOB64V" - op.asset = messages.StellarAssetType(type=0) + op.asset = messages.StellarAsset(type=messages.StellarAssetType.NATIVE) tx = _create_msg() @@ -173,8 +155,8 @@ def test_sign_tx_payment_op_custom_asset1(client): op.amount = 500111000 op.destination_account = "GBOVKZBEM2YYLOCDCUXJ4IMRKHN4LCJAE7WEAEA2KF562XFAGDBOB64V" - op.asset = messages.StellarAssetType( - type=1, + op.asset = messages.StellarAsset( + type=messages.StellarAssetType.ALPHANUM4, code="X", issuer="GAUYJFQCYIHFQNS7CI6BFWD2DSSFKDIQZUQ3BLQODDKE4PSW7VVBKENC", ) @@ -196,8 +178,8 @@ def test_sign_tx_payment_op_custom_asset12(client): op.amount = 500111000 op.destination_account = "GBOVKZBEM2YYLOCDCUXJ4IMRKHN4LCJAE7WEAEA2KF562XFAGDBOB64V" - op.asset = messages.StellarAssetType( - type=2, + op.asset = messages.StellarAsset( + type=messages.StellarAssetType.ALPHANUM12, code="ABCDEFGHIJKL", issuer="GAUYJFQCYIHFQNS7CI6BFWD2DSSFKDIQZUQ3BLQODDKE4PSW7VVBKENC", ) @@ -218,7 +200,7 @@ def test_sign_tx_allow_trust_op(client): op = messages.StellarAllowTrustOp() op.is_authorized = True op.trusted_account = "GBOVKZBEM2YYLOCDCUXJ4IMRKHN4LCJAE7WEAEA2KF562XFAGDBOB64V" - op.asset_type = 1 + op.asset_type = messages.StellarAssetType.ALPHANUM4 op.asset_code = "X" tx = _create_msg(memo=True) @@ -237,8 +219,8 @@ def test_sign_tx_change_trust_op(client): op.limit = 5000000000 op.source_account = "GBOVKZBEM2YYLOCDCUXJ4IMRKHN4LCJAE7WEAEA2KF562XFAGDBOB64V" - op.asset = messages.StellarAssetType( - type=2, + op.asset = messages.StellarAsset( + type=messages.StellarAssetType.ALPHANUM12, code="ABCDEFGHIJKL", issuer="GAUYJFQCYIHFQNS7CI6BFWD2DSSFKDIQZUQ3BLQODDKE4PSW7VVBKENC", ) @@ -256,13 +238,13 @@ def test_sign_tx_change_trust_op(client): def test_sign_tx_passive_offer_op(client): op = messages.StellarCreatePassiveOfferOp() - op.selling_asset = messages.StellarAssetType( - type=2, + op.selling_asset = messages.StellarAsset( + type=messages.StellarAssetType.ALPHANUM12, code="ABCDEFGHIJKL", issuer="GAUYJFQCYIHFQNS7CI6BFWD2DSSFKDIQZUQ3BLQODDKE4PSW7VVBKENC", ) - op.buing_asset = messages.StellarAssetType( - type=1, + op.buying_asset = messages.StellarAsset( + type=messages.StellarAssetType.ALPHANUM4, code="X", issuer="GAUYJFQCYIHFQNS7CI6BFWD2DSSFKDIQZUQ3BLQODDKE4PSW7VVBKENC", ) @@ -284,13 +266,13 @@ def test_sign_tx_passive_offer_op(client): def test_sign_tx_manage_offer_op(client): op = messages.StellarManageOfferOp() - op.selling_asset = messages.StellarAssetType( - type=2, + op.selling_asset = messages.StellarAsset( + type=messages.StellarAssetType.ALPHANUM12, code="ABCDEFGHIJKL", issuer="GAUYJFQCYIHFQNS7CI6BFWD2DSSFKDIQZUQ3BLQODDKE4PSW7VVBKENC", ) - op.buing_asset = messages.StellarAssetType( - type=1, + op.buying_asset = messages.StellarAsset( + type=messages.StellarAssetType.ALPHANUM4, code="X", issuer="GAUYJFQCYIHFQNS7CI6BFWD2DSSFKDIQZUQ3BLQODDKE4PSW7VVBKENC", ) @@ -313,15 +295,15 @@ def test_sign_tx_manage_offer_op(client): def test_sign_tx_path_payment_op(client): op = messages.StellarPathPaymentOp() - op.send = messages.StellarAssetType( - type=1, + op.send_asset = messages.StellarAsset( + type=messages.StellarAssetType.ALPHANUM4, code="X", issuer="GAUYJFQCYIHFQNS7CI6BFWD2DSSFKDIQZUQ3BLQODDKE4PSW7VVBKENC", ) op.send_max = 50000 op.destination_account = "GAUYJFQCYIHFQNS7CI6BFWD2DSSFKDIQZUQ3BLQODDKE4PSW7VVBKENC" - op.destination_asset = messages.StellarAssetType( - type=1, + op.destination_asset = messages.StellarAsset( + type=messages.StellarAssetType.ALPHANUM4, code="X", issuer="GAUYJFQCYIHFQNS7CI6BFWD2DSSFKDIQZUQ3BLQODDKE4PSW7VVBKENC", ) @@ -355,7 +337,7 @@ def test_sign_tx_set_options(client): ) op = messages.StellarSetOptionsOp() - op.signer_type = 0 + op.signer_type = messages.StellarSignerType.ACCOUNT op.signer_key = bytes.fromhex( "72187adb879c414346d77c71af8cce7b6eaa57b528e999fd91feae6b6418628e" ) @@ -407,7 +389,7 @@ def test_sign_tx_set_options(client): ) op = messages.StellarSetOptionsOp() - op.signer_type = 1 + op.signer_type = messages.StellarSignerType.PRE_AUTH op.signer_key = bytes.fromhex( "72187adb879c414346d77c71af8cce7b6eaa57b528e999fd91feae6b6418628e" )