feat(core): use specialized protobufs in apps.bitcoin, enable typing

pull/1279/head
matejcik 4 years ago committed by matejcik
parent b0a2297b14
commit 37025e2a84

@ -126,6 +126,7 @@ def address_p2wsh_in_p2sh(witness_script_hash: bytes, coin: CoinInfo) -> str:
def address_p2wpkh(pubkey: bytes, coin: CoinInfo) -> str:
assert coin.bech32_prefix is not None
pubkeyhash = ecdsa_hash_pubkey(pubkey, coin)
return encode_bech32_address(coin.bech32_prefix, pubkeyhash)
@ -135,6 +136,7 @@ def address_p2wsh(witness_script_hash: bytes, hrp: str) -> str:
def address_to_cashaddr(address: str, coin: CoinInfo) -> str:
assert coin.cashaddr_prefix is not None
raw = base58.decode_check(address, coin.b58_hash)
version, data = raw[0], raw[1:]
if version == coin.address_type:

@ -1,7 +1,6 @@
from micropython import const
from trezor.messages import MessageType
from trezor.messages.TxInputType import TxInputType
from .common import BIP32_WALLET_DEPTH
@ -10,8 +9,10 @@ if False:
from trezor.messages.AuthorizeCoinJoin import AuthorizeCoinJoin
from trezor.messages.GetOwnershipProof import GetOwnershipProof
from trezor.messages.SignTx import SignTx
from apps.common import coininfo
from apps.common.seed import Keychain
from trezor.messages.TxAckInputType import TxAckInputType
from apps.common.coininfo import CoinInfo
from apps.common.keychain import Keychain
_ROUND_ID_LEN = const(32)
FEE_PER_ANONYMITY_DECIMALS = const(9)
@ -19,8 +20,8 @@ FEE_PER_ANONYMITY_DECIMALS = const(9)
class CoinJoinAuthorization:
def __init__(
self, msg: AuthorizeCoinJoin, keychain: Keychain, coin: coininfo.CoinInfo
):
self, msg: AuthorizeCoinJoin, keychain: Keychain, coin: CoinInfo
) -> None:
self.coordinator = msg.coordinator
self.remaining_fee = msg.max_total_fee
self.fee_per_anonymity = msg.fee_per_anonymity or 0
@ -46,7 +47,7 @@ class CoinJoinAuthorization:
and msg.commitment_data[:-_ROUND_ID_LEN] == self.coordinator.encode()
)
def check_sign_tx_input(self, txi: TxInputType, coin: coininfo.CoinInfo) -> bool:
def check_sign_tx_input(self, txi: TxAckInputType, coin: CoinInfo) -> bool:
# Check whether the current input matches the parameters of the request.
return (
len(txi.address_n) >= BIP32_WALLET_DEPTH

@ -83,4 +83,5 @@ def decode_bech32_address(prefix: str, address: str) -> bytes:
witver, raw = bech32.decode(prefix, address)
if witver != _BECH32_WITVER:
raise wire.ProcessError("Invalid address witness program")
assert raw is not None
return bytes(raw)

@ -11,8 +11,10 @@ from .multisig import multisig_pubkey_index
if False:
from typing import List
from trezor.messages import HDNodeType
from trezor.messages.GetAddress import GetAddress
from trezor.messages.HDNodeType import HDNodeType
from trezor import wire
from apps.common.keychain import Keychain
from apps.common.coininfo import CoinInfo
@ -38,7 +40,9 @@ async def show_xpubs(
@with_keychain
async def get_address(ctx, msg, keychain, coin):
async def get_address(
ctx: wire.Context, msg: GetAddress, keychain: Keychain, coin: CoinInfo
) -> Address:
await validate_path(
ctx,
addresses.validate_full_path,
@ -50,6 +54,7 @@ async def get_address(ctx, msg, keychain, coin):
)
node = keychain.derive(msg.address_n)
address = addresses.get_address(msg.script_type, coin, node, msg.multisig)
address_short = addresses.address_short(coin, address)
if msg.script_type == InputScriptType.SPENDWITNESS:

@ -2,7 +2,6 @@ from trezor import wire
from trezor.messages.GetOwnershipId import GetOwnershipId
from trezor.messages.OwnershipId import OwnershipId
from apps.common import coininfo
from apps.common.paths import validate_path
from . import addresses, common, scripts
@ -10,12 +9,13 @@ from .keychain import with_keychain
from .ownership import get_identifier
if False:
from apps.common.coininfo import CoinInfo
from apps.common.keychain import Keychain
@with_keychain
async def get_ownership_id(
ctx, msg: GetOwnershipId, keychain: Keychain, coin: coininfo.CoinInfo
ctx: wire.Context, msg: GetOwnershipId, keychain: Keychain, coin: CoinInfo
) -> OwnershipId:
await validate_path(
ctx,

@ -5,7 +5,6 @@ from trezor.messages.GetOwnershipProof import GetOwnershipProof
from trezor.messages.OwnershipProof import OwnershipProof
from trezor.ui.text import Text
from apps.common import coininfo
from apps.common.confirm import require_confirm
from apps.common.paths import validate_path
@ -15,6 +14,7 @@ from .ownership import generate_proof, get_identifier
if False:
from typing import Optional
from apps.common.coininfo import CoinInfo
from apps.common.keychain import Keychain
from .authorization import CoinJoinAuthorization
@ -24,10 +24,10 @@ _MAX_MONO_LINE = 18
@with_keychain
async def get_ownership_proof(
ctx,
ctx: wire.Context,
msg: GetOwnershipProof,
keychain: Keychain,
coin: coininfo.CoinInfo,
coin: CoinInfo,
authorization: Optional[CoinJoinAuthorization] = None,
) -> OwnershipProof:
if authorization:

@ -6,8 +6,11 @@ from trezor.messages.PublicKey import PublicKey
from apps.common import coins, layout
from apps.common.keychain import get_keychain
if False:
from trezor.messages.GetPublicKey import GetPublicKey
async def get_public_key(ctx, msg):
async def get_public_key(ctx: wire.Context, msg: GetPublicKey) -> PublicKey:
coin_name = msg.coin_name or "Bitcoin"
script_type = msg.script_type or InputScriptType.SPENDADDRESS
coin = coins.by_name(coin_name)

@ -6,24 +6,22 @@ from apps.common.keychain import get_keychain
from .common import BITCOIN_NAMES
if False:
from protobuf import MessageType
from typing import Callable, Optional, Tuple, TypeVar
from typing import Awaitable, Callable, Optional, Sequence, Tuple, TypeVar
from typing_extensions import Protocol
from apps.common.keychain import Keychain, MsgOut, Handler
from apps.common.paths import Bip32Path
from .authorization import CoinJoinAuthorization
class MsgWithCoinName(MessageType, Protocol):
coin_name = ... # type: Optional[str]
class MsgWithCoinName(Protocol):
coin_name = ... # type: str
MsgIn = TypeVar("MsgIn", bound=MsgWithCoinName)
HandlerWithCoinInfo = Callable[
[wire.Context, MsgIn, Keychain, coininfo.CoinInfo], MsgOut
]
HandlerWithCoinInfo = Callable[..., Awaitable[MsgOut]]
def get_namespaces_for_coin(coin: coininfo.CoinInfo):
def get_namespaces_for_coin(coin: coininfo.CoinInfo) -> Sequence[Bip32Path]:
namespaces = []
slip44_id = coin.slip44 | HARDENED
@ -89,7 +87,7 @@ async def get_keychain_for_coin(
return keychain, coin
def with_keychain(func: HandlerWithCoinInfo[MsgIn, MsgOut]) -> Handler[MsgIn, MsgOut]:
def with_keychain(func: HandlerWithCoinInfo[MsgOut]) -> Handler[MsgIn, MsgOut]:
async def wrapper(
ctx: wire.Context,
msg: MsgIn,

@ -31,12 +31,12 @@ _OWNERSHIP_ID_KEY_PATH = [b"SLIP-0019", b"Ownership identification key"]
def generate_proof(
node: bip32.HDNode,
script_type: EnumTypeInputScriptType,
multisig: MultisigRedeemScriptType,
multisig: Optional[MultisigRedeemScriptType],
coin: CoinInfo,
user_confirmed: bool,
ownership_ids: List[bytes],
script_pubkey: bytes,
commitment_data: Optional[bytes],
commitment_data: bytes,
) -> Tuple[bytes, bytes]:
flags = 0
if user_confirmed:
@ -52,8 +52,7 @@ def generate_proof(
sighash = hashlib.sha256(proof)
sighash.update(script_pubkey)
if commitment_data:
sighash.update(commitment_data)
sighash.update(commitment_data)
signature = common.ecdsa_sign(node, sighash.digest())
public_key = node.public_key()
write_bip322_signature_proof(

@ -2,11 +2,8 @@ from trezor import utils, wire
from trezor.crypto import base58, cashaddr
from trezor.crypto.hashlib import sha256
from trezor.messages import InputScriptType
from trezor.messages.MultisigRedeemScriptType import MultisigRedeemScriptType
from trezor.messages.TxInputType import TxInputType
from apps.common import address_type
from apps.common.coininfo import CoinInfo
from apps.common.readers import read_bitcoin_varint
from apps.common.writers import empty_bytearray, write_bitcoin_varint
@ -26,17 +23,23 @@ from .writers import (
if False:
from typing import List, Optional, Tuple
from trezor.messages.TxInputType import EnumTypeInputScriptType
from trezor.messages.MultisigRedeemScriptType import MultisigRedeemScriptType
from trezor.messages.TxAckInputType import TxAckInputType
from trezor.messages.TxAckInputType import EnumTypeInputScriptType
from apps.common.coininfo import CoinInfo
from .writers import Writer
def input_derive_script(
script_type: EnumTypeInputScriptType,
multisig: MultisigRedeemScriptType,
multisig: Optional[MultisigRedeemScriptType],
coin: CoinInfo,
hash_type: int,
pubkey: bytes,
signature: Optional[bytes],
signature: bytes,
) -> bytes:
if script_type == InputScriptType.SPENDADDRESS:
# p2pkh or p2sh
@ -45,7 +48,7 @@ def input_derive_script(
if script_type == InputScriptType.SPENDP2SHWITNESS:
# p2wpkh or p2wsh using p2sh
if multisig:
if multisig is not None:
# p2wsh in p2sh
pubkeys = multisig_get_pubkeys(multisig)
witness_script_hasher = utils.HashWriter(sha256())
@ -60,6 +63,7 @@ def input_derive_script(
return input_script_native_p2wpkh_or_p2wsh()
elif script_type == InputScriptType.SPENDMULTISIG:
# p2sh multisig
assert multisig is not None # checked in sanitize_tx_input
signature_index = multisig_pubkey_index(multisig, pubkey)
return input_script_multisig(
multisig, signature, signature_index, hash_type, coin
@ -111,7 +115,7 @@ def output_derive_script(address: str, coin: CoinInfo) -> bytes:
# see https://github.com/bitcoin/bips/blob/master/bip-0143.mediawiki#specification
# item 5 for details
def bip143_derive_script_code(
txi: TxInputType, public_keys: List[bytes], threshold: int, coin: CoinInfo
txi: TxAckInputType, public_keys: List[bytes], threshold: int, coin: CoinInfo
) -> bytearray:
if len(public_keys) > 1:
return output_script_multisig(public_keys, threshold)
@ -299,8 +303,8 @@ def witness_multisig(
signature_index: int,
hash_type: int,
) -> bytearray:
# get other signatures, stretch with None to the number of the pubkeys
signatures = multisig.signatures + [None] * (
# get other signatures, stretch with empty bytes to the number of the pubkeys
signatures = multisig.signatures + [b""] * (
multisig_get_pubkey_count(multisig) - len(multisig.signatures)
)
# fill in our signature
@ -523,7 +527,7 @@ def output_script_paytoopreturn(data: bytes) -> bytearray:
def write_bip322_signature_proof(
w: Writer,
script_type: EnumTypeInputScriptType,
multisig: MultisigRedeemScriptType,
multisig: Optional[MultisigRedeemScriptType],
coin: CoinInfo,
public_key: bytes,
signature: bytes,

@ -9,9 +9,17 @@ from apps.common.signverify import message_digest, require_confirm_sign_message
from .addresses import get_address, validate_full_path
from .keychain import with_keychain
if False:
from trezor.messages.SignMessage import SignMessage
from apps.common.coininfo import CoinInfo
from apps.common.keychain import Keychain
@with_keychain
async def sign_message(ctx, msg, keychain, coin):
async def sign_message(
ctx: wire.Context, msg: SignMessage, keychain: Keychain, coin: CoinInfo
) -> MessageSignature:
message = msg.message
address_n = msg.address_n
script_type = msg.script_type or 0

@ -1,11 +1,7 @@
from trezor import utils, wire
from trezor.messages.RequestType import TXFINISHED
from trezor.messages.SignTx import SignTx
from trezor.messages.TxAck import TxAck
from trezor.messages.TxRequest import TxRequest
from apps.common import coininfo
from ..common import BITCOIN_NAMES
from ..keychain import with_keychain
from . import approvers, bitcoin, helpers, progress
@ -15,20 +11,44 @@ if not utils.BITCOIN_ONLY:
if False:
from typing import Optional, Union
from apps.common.seed import Keychain
from protobuf import FieldCache
from trezor.messages.SignTx import SignTx
from trezor.messages.TxAckInput import TxAckInput
from trezor.messages.TxAckOutput import TxAckOutput
from trezor.messages.TxAckPrevMeta import TxAckPrevMeta
from trezor.messages.TxAckPrevInput import TxAckPrevInput
from trezor.messages.TxAckPrevOutput import TxAckPrevOutput
from trezor.messages.TxAckPrevExtraData import TxAckPrevExtraData
from apps.common.coininfo import CoinInfo
from apps.common.keychain import Keychain
from ..authorization import CoinJoinAuthorization
TxAckType = Union[
TxAckInput,
TxAckOutput,
TxAckPrevMeta,
TxAckPrevInput,
TxAckPrevOutput,
TxAckPrevExtraData,
]
@with_keychain
async def sign_tx(
ctx: wire.Context,
msg: SignTx,
keychain: Keychain,
coin: coininfo.CoinInfo,
coin: CoinInfo,
authorization: Optional[CoinJoinAuthorization] = None,
) -> TxRequest:
if authorization:
approver = approvers.CoinJoinApprover(msg, coin, authorization)
approver = approvers.CoinJoinApprover(
msg, coin, authorization
) # type: approvers.Approver
else:
approver = approvers.BasicApprover(msg, coin)
@ -44,17 +64,18 @@ async def sign_tx(
signer = signer_class(msg, keychain, coin, approver).signer()
res = None # type: Union[TxAck, bool, None]
field_cache = {}
res = None # type: Union[TxAckType, bool, None]
field_cache = {} # type: FieldCache
while True:
req = signer.send(res)
if isinstance(req, TxRequest):
if isinstance(req, tuple):
request_class, req = req
assert isinstance(req, TxRequest)
if req.request_type == TXFINISHED:
break
res = await ctx.call(req, TxAck, field_cache)
return req
res = await ctx.call(req, request_class, field_cache)
elif isinstance(req, helpers.UiConfirm):
res = await req.confirm_dialog(ctx)
progress.report_init()
else:
raise TypeError("Invalid signing instruction")
return req

@ -1,17 +1,20 @@
from micropython import const
from trezor import wire
from trezor.messages.SignTx import SignTx
from trezor.messages.TxInputType import TxInputType
from trezor.messages.TxOutputType import TxOutputType
from apps.common import coininfo, safety_checks
from apps.common import safety_checks
from .. import addresses
from ..authorization import FEE_PER_ANONYMITY_DECIMALS
from . import helpers, tx_weight
if False:
from trezor.messages.SignTx import SignTx
from trezor.messages.TxAckInputType import TxAckInputType
from trezor.messages.TxAckOutputType import TxAckOutputType
from apps.common.coininfo import CoinInfo
from ..authorization import CoinJoinAuthorization
# Setting nSequence to this value for every input in a transaction disables nLockTime.
@ -23,7 +26,7 @@ _SEQUENCE_FINAL = const(0xFFFFFFFF)
# an Authorization object to verify that the user authorized a transaction with
# these parameters to be executed.
class Approver:
def __init__(self, tx: SignTx, coin: coininfo.CoinInfo) -> None:
def __init__(self, tx: SignTx, coin: CoinInfo) -> None:
self.tx = tx
self.coin = coin
self.weight = tx_weight.TxWeightCalculator(tx.inputs_count, tx.outputs_count)
@ -35,24 +38,24 @@ class Approver:
self.total_out = 0 # sum of output amounts
self.change_out = 0 # change output amount
async def add_internal_input(self, txi: TxInputType) -> None:
async def add_internal_input(self, txi: TxAckInputType) -> None:
self.weight.add_input(txi)
self.total_in += txi.amount
self.min_sequence = min(self.min_sequence, txi.sequence)
def add_external_input(self, txi: TxInputType) -> None:
def add_external_input(self, txi: TxAckInputType) -> None:
self.weight.add_input(txi)
self.total_in += txi.amount
self.external_in += txi.amount
self.min_sequence = min(self.min_sequence, txi.sequence)
def add_change_output(self, txo: TxOutputType, script_pubkey: bytes) -> None:
def add_change_output(self, txo: TxAckOutputType, script_pubkey: bytes) -> None:
self.weight.add_output(script_pubkey)
self.total_out += txo.amount
self.change_out += txo.amount
async def add_external_output(
self, txo: TxOutputType, script_pubkey: bytes
self, txo: TxAckOutputType, script_pubkey: bytes
) -> None:
self.weight.add_output(script_pubkey)
self.total_out += txo.amount
@ -65,22 +68,22 @@ class BasicApprover(Approver):
# the maximum number of change-outputs allowed without user confirmation
MAX_SILENT_CHANGE_COUNT = const(2)
def __init__(self, tx: SignTx, coin: coininfo.CoinInfo) -> None:
def __init__(self, tx: SignTx, coin: CoinInfo) -> None:
super().__init__(tx, coin)
self.change_count = 0 # the number of change-outputs
async def add_internal_input(self, txi: TxInputType) -> None:
async def add_internal_input(self, txi: TxAckInputType) -> None:
if not addresses.validate_full_path(txi.address_n, self.coin, txi.script_type):
await helpers.confirm_foreign_address(txi.address_n)
await super().add_internal_input(txi)
def add_change_output(self, txo: TxOutputType, script_pubkey: bytes) -> None:
def add_change_output(self, txo: TxAckOutputType, script_pubkey: bytes) -> None:
super().add_change_output(txo, script_pubkey)
self.change_count += 1
async def add_external_output(
self, txo: TxOutputType, script_pubkey: bytes
self, txo: TxAckOutputType, script_pubkey: bytes
) -> None:
await super().add_external_output(txo, script_pubkey)
await helpers.confirm_output(txo, self.coin)
@ -117,7 +120,7 @@ class BasicApprover(Approver):
class CoinJoinApprover(Approver):
def __init__(
self, tx: SignTx, coin: coininfo.CoinInfo, authorization: CoinJoinAuthorization
self, tx: SignTx, coin: CoinInfo, authorization: CoinJoinAuthorization
) -> None:
super().__init__(tx, coin)
self.authorization = authorization
@ -142,21 +145,21 @@ class CoinJoinApprover(Approver):
# flag indicating whether our outputs are gaining any anonymity
self.anonymity = False
async def add_internal_input(self, txi: TxInputType) -> None:
async def add_internal_input(self, txi: TxAckInputType) -> None:
self.our_weight.add_input(txi)
if not self.authorization.check_sign_tx_input(txi, self.coin):
raise wire.ProcessError("Unauthorized path")
await super().add_internal_input(txi)
def add_change_output(self, txo: TxOutputType, script_pubkey: bytes) -> None:
def add_change_output(self, txo: TxAckOutputType, script_pubkey: bytes) -> None:
super().add_change_output(txo, script_pubkey)
self._add_output(txo, script_pubkey)
self.our_weight.add_output(script_pubkey)
self.group_our_count += 1
async def add_external_output(
self, txo: TxOutputType, script_pubkey: bytes
self, txo: TxAckOutputType, script_pubkey: bytes
) -> None:
await super().add_external_output(txo, script_pubkey)
self._add_output(txo, script_pubkey)
@ -199,13 +202,14 @@ class CoinJoinApprover(Approver):
# Add the coordinator fee for the last group of outputs.
self._new_group(0)
decimal_divisor = pow(10, FEE_PER_ANONYMITY_DECIMALS + 2) # type: float
return (
self.coordinator_fee_base
* self.authorization.fee_per_anonymity
/ pow(10, FEE_PER_ANONYMITY_DECIMALS + 2)
/ decimal_divisor
)
def _add_output(self, txo: TxOutputType, script_pubkey: bytes):
def _add_output(self, txo: TxAckOutputType, script_pubkey: bytes) -> None:
# Assumption: CoinJoin outputs are grouped by amount. (If this assumption is
# not satisfied, then we will compute a lower coordinator fee, which may lead
# us to wrongfully decline the transaction.)
@ -214,7 +218,7 @@ class CoinJoinApprover(Approver):
self.group_size += 1
def _new_group(self, amount: int):
def _new_group(self, amount: int) -> None:
# Add the base coordinator fee for the previous group of outputs.
# Skip groups of size 1, because those must be change-outputs.
if self.group_size > 1:

@ -3,17 +3,11 @@ from micropython import const
from trezor import wire
from trezor.crypto.hashlib import sha256
from trezor.messages import InputScriptType, OutputScriptType
from trezor.messages.SignTx import SignTx
from trezor.messages.TransactionType import TransactionType
from trezor.messages.TxInputType import TxInputType
from trezor.messages.TxOutputBinType import TxOutputBinType
from trezor.messages.TxOutputType import TxOutputType
from trezor.messages.TxRequest import TxRequest
from trezor.messages.TxRequestDetailsType import TxRequestDetailsType
from trezor.messages.TxRequestSerializedType import TxRequestSerializedType
from trezor.utils import HashWriter, ensure
from apps.common import coininfo, seed
from apps.common.writers import write_bitcoin_varint
from .. import addresses, common, multisig, scripts, writers
@ -25,7 +19,18 @@ from .matchcheck import MultisigFingerprintChecker, WalletPathChecker
if False:
from typing import List, Optional, Set, Tuple, Union
from trezor.crypto.bip32 import HDNode
from trezor.crypto import bip32
from trezor.messages.SignTx import SignTx
from trezor.messages.TxAckInputType import TxAckInputType
from trezor.messages.TxAckOutputType import TxAckOutputType
from trezor.messages.TxAckPrevTxType import TxAckPrevTxType
from trezor.messages.TxAckPrevInputType import TxAckPrevInputType
from trezor.messages.TxAckPrevOutputType import TxAckPrevOutputType
from apps.common.coininfo import CoinInfo
from apps.common.keychain import Keychain
# the chain id used for change
_BIP32_CHANGE_CHAIN = const(1)
@ -70,8 +75,8 @@ class Bitcoin:
def __init__(
self,
tx: SignTx,
keychain: seed.Keychain,
coin: coininfo.CoinInfo,
keychain: Keychain,
coin: CoinInfo,
approver: approvers.Approver,
) -> None:
self.tx = helpers.sanitize_sign_tx(tx, coin)
@ -193,7 +198,7 @@ class Bitcoin:
if i in self.segwit:
if i in self.external:
txi = await helpers.request_tx_input(self.tx_req, i, self.coin)
self.serialized_tx.extend(txi.witness)
self.serialized_tx.extend(txi.witness or b"")
else:
await self.sign_segwit_input(i)
else:
@ -204,7 +209,7 @@ class Bitcoin:
self.write_tx_footer(self.serialized_tx, self.tx)
await helpers.request_tx_finish(self.tx_req)
async def process_internal_input(self, txi: TxInputType) -> None:
async def process_internal_input(self, txi: TxAckInputType) -> None:
self.wallet_path.add_input(txi)
self.multisig_fingerprint.add_input(txi)
@ -213,10 +218,10 @@ class Bitcoin:
await self.approver.add_internal_input(txi)
async def process_external_input(self, txi: TxInputType) -> None:
async def process_external_input(self, txi: TxAckInputType) -> None:
self.approver.add_external_input(txi)
async def approve_output(self, txo: TxOutputType, script_pubkey: bytes) -> None:
async def approve_output(self, txo: TxAckOutputType, script_pubkey: bytes) -> None:
if self.output_is_change(txo):
# output is change and does not need approval
self.approver.add_change_output(txo, script_pubkey)
@ -229,7 +234,7 @@ class Bitcoin:
async def get_tx_digest(
self,
i: int,
txi: TxInputType,
txi: TxAckInputType,
public_keys: List[bytes],
threshold: int,
script_pubkey: bytes,
@ -241,7 +246,7 @@ class Bitcoin:
return digest
async def verify_external_input(
self, i: int, txi: TxInputType, script_pubkey: bytes
self, i: int, txi: TxAckInputType, script_pubkey: bytes
) -> None:
if txi.ownership_proof:
if not verify_nonownership(
@ -283,10 +288,10 @@ class Bitcoin:
node = self.keychain.derive(txi.address_n)
key_sign_pub = node.public_key()
script_sig = self.input_derive_script(txi, key_sign_pub)
script_sig = self.input_derive_script(txi, key_sign_pub, b"")
self.write_tx_input(self.serialized_tx, txi, script_sig)
def sign_bip143_input(self, txi: TxInputType) -> Tuple[bytes, bytes]:
def sign_bip143_input(self, txi: TxAckInputType) -> Tuple[bytes, bytes]:
self.wallet_path.check_input(txi)
self.multisig_fingerprint.check_input(txi)
@ -330,7 +335,7 @@ class Bitcoin:
async def get_legacy_tx_digest(
self, index: int, script_pubkey: Optional[bytes] = None
) -> Tuple[bytes, TxInputType, Optional[HDNode]]:
) -> Tuple[bytes, TxAckInputType, Optional[bip32.HDNode]]:
# the transaction digest which gets signed for this input
h_sign = self.create_hash_writer()
# should come out the same as h_approved, checked before signing the digest
@ -357,6 +362,7 @@ class Bitcoin:
multisig.multisig_pubkey_index(txi.multisig, key_sign_pub)
if txi.script_type == InputScriptType.SPENDMULTISIG:
assert txi.multisig is not None # checked in sanitize_tx_input
script_pubkey = scripts.output_script_multisig(
multisig.multisig_get_pubkeys(txi.multisig), txi.multisig.m,
)
@ -415,25 +421,27 @@ class Bitcoin:
# STAGE_REQUEST_3_PREV_META in legacy
tx = await helpers.request_tx_meta(self.tx_req, self.coin, prev_hash)
if tx.outputs_cnt <= prev_index:
if tx.outputs_count <= prev_index:
raise wire.ProcessError("Not enough outputs in previous transaction.")
txh = self.create_hash_writer()
# witnesses are not included in txid hash
self.write_tx_header(txh, tx, witness_marker=False)
write_bitcoin_varint(txh, tx.inputs_cnt)
write_bitcoin_varint(txh, tx.inputs_count)
for i in range(tx.inputs_cnt):
for i in range(tx.inputs_count):
# STAGE_REQUEST_3_PREV_INPUT in legacy
txi = await helpers.request_tx_input(self.tx_req, i, self.coin, prev_hash)
txi = await helpers.request_tx_prev_input(
self.tx_req, i, self.coin, prev_hash
)
self.write_tx_input(txh, txi, txi.script_sig)
write_bitcoin_varint(txh, tx.outputs_cnt)
write_bitcoin_varint(txh, tx.outputs_count)
for i in range(tx.outputs_cnt):
for i in range(tx.outputs_count):
# STAGE_REQUEST_3_PREV_OUTPUT in legacy
txo_bin = await helpers.request_tx_output(
txo_bin = await helpers.request_tx_prev_output(
self.tx_req, i, self.coin, prev_hash
)
self.write_tx_output(txh, txo_bin, txo_bin.script_pubkey)
@ -452,17 +460,17 @@ class Bitcoin:
return amount_out, script_pubkey
def check_prevtx_output(self, txo_bin: TxOutputBinType) -> None:
def check_prevtx_output(self, txo_bin: TxAckPrevOutputType) -> None:
# Validations to perform on the UTXO when checking the previous transaction output amount.
pass
# Tx Helpers
# ===
def get_sighash_type(self, txi: TxInputType) -> int:
def get_sighash_type(self, txi: TxAckInputType) -> int:
return SIGHASH_ALL
def get_hash_type(self, txi: TxInputType) -> int:
def get_hash_type(self, txi: TxAckInputType) -> int:
""" Return the nHashType flags."""
# The nHashType is the 8 least significant bits of the sighash type.
# Some coins set the 24 most significant bits of the sighash type to
@ -470,14 +478,17 @@ class Bitcoin:
return self.get_sighash_type(txi) & 0xFF
def write_tx_input(
self, w: writers.Writer, txi: TxInputType, script: bytes
self,
w: writers.Writer,
txi: Union[TxAckInputType, TxAckPrevInputType],
script: bytes,
) -> None:
writers.write_tx_input(w, txi, script)
def write_tx_output(
self,
w: writers.Writer,
txo: Union[TxOutputType, TxOutputBinType],
txo: Union[TxAckOutputType, TxAckPrevOutputType],
script_pubkey: bytes,
) -> None:
writers.write_tx_output(w, txo, script_pubkey)
@ -485,7 +496,7 @@ class Bitcoin:
def write_tx_header(
self,
w: writers.Writer,
tx: Union[SignTx, TransactionType],
tx: Union[SignTx, TxAckPrevTxType],
witness_marker: bool,
) -> None:
writers.write_uint32(w, tx.version) # nVersion
@ -494,17 +505,18 @@ class Bitcoin:
write_bitcoin_varint(w, 0x01) # segwit witness flag
def write_tx_footer(
self, w: writers.Writer, tx: Union[SignTx, TransactionType]
self, w: writers.Writer, tx: Union[SignTx, TxAckPrevTxType]
) -> None:
writers.write_uint32(w, tx.lock_time)
async def write_prev_tx_footer(
self, w: writers.Writer, tx: TransactionType, prev_hash: bytes
self, w: writers.Writer, tx: TxAckPrevTxType, prev_hash: bytes
) -> None:
self.write_tx_footer(w, tx)
def set_serialized_signature(self, index: int, signature: bytes) -> None:
# Only one signature per TxRequest can be serialized.
assert self.tx_req.serialized is not None
ensure(self.tx_req.serialized.signature is None)
self.tx_req.serialized.signature_index = index
@ -513,8 +525,9 @@ class Bitcoin:
# Tx Outputs
# ===
def output_derive_script(self, txo: TxOutputType) -> bytes:
def output_derive_script(self, txo: TxAckOutputType) -> bytes:
if txo.script_type == OutputScriptType.PAYTOOPRETURN:
assert txo.op_return_data is not None # checked in sanitize_tx_output
return scripts.output_script_paytoopreturn(txo.op_return_data)
if txo.address_n:
@ -530,9 +543,11 @@ class Bitcoin:
input_script_type, self.coin, node, txo.multisig
)
assert txo.address is not None # checked in sanitize_tx_output
return scripts.output_derive_script(txo.address, self.coin)
def output_is_change(self, txo: TxOutputType) -> bool:
def output_is_change(self, txo: TxAckOutputType) -> bool:
if txo.script_type not in common.CHANGE_OUTPUT_SCRIPT_TYPES:
return False
if txo.multisig and not self.multisig_fingerprint.output_matches(txo):
@ -549,7 +564,7 @@ class Bitcoin:
# ===
def input_derive_script(
self, txi: TxInputType, pubkey: bytes, signature: bytes = None
self, txi: TxAckInputType, pubkey: bytes, signature: bytes
) -> bytes:
return scripts.input_derive_script(
txi.script_type,
@ -568,18 +583,18 @@ class Bitcoin:
self.h_sequence = HashWriter(sha256())
self.h_outputs = HashWriter(sha256())
def hash143_add_input(self, txi: TxInputType) -> None:
def hash143_add_input(self, txi: TxAckInputType) -> None:
writers.write_bytes_reversed(
self.h_prevouts, txi.prev_hash, writers.TX_HASH_SIZE
)
writers.write_uint32(self.h_prevouts, txi.prev_index)
writers.write_uint32(self.h_sequence, txi.sequence)
def hash143_add_output(self, txo: TxOutputType, script_pubkey: bytes) -> None:
def hash143_add_output(self, txo: TxAckOutputType, script_pubkey: bytes) -> None:
writers.write_tx_output(self.h_outputs, txo, script_pubkey)
def hash143_preimage_hash(
self, txi: TxInputType, public_keys: List[bytes], threshold: int
self, txi: TxAckInputType, public_keys: List[bytes], threshold: int
) -> bytes:
h_preimage = HashWriter(sha256())
@ -629,17 +644,17 @@ class Bitcoin:
return writers.get_tx_hash(h_preimage, double=self.coin.sign_hash_double)
def input_is_segwit(txi: TxInputType) -> bool:
def input_is_segwit(txi: TxAckInputType) -> bool:
return txi.script_type in common.SEGWIT_INPUT_SCRIPT_TYPES or (
txi.script_type == InputScriptType.EXTERNAL and txi.witness is not None
)
def input_is_nonsegwit(txi: TxInputType) -> bool:
def input_is_nonsegwit(txi: TxAckInputType) -> bool:
return txi.script_type in common.NONSEGWIT_INPUT_SCRIPT_TYPES or (
txi.script_type == InputScriptType.EXTERNAL and txi.witness is None
)
def input_is_external(txi: TxInputType) -> bool:
def input_is_external(txi: TxAckInputType) -> bool:
return txi.script_type == InputScriptType.EXTERNAL

@ -2,8 +2,8 @@ from micropython import const
from trezor import wire
from trezor.messages.SignTx import SignTx
from trezor.messages.TransactionType import TransactionType
from trezor.messages.TxInputType import TxInputType
from trezor.messages.TxAckInputType import TxAckInputType
from trezor.messages.TxAckPrevTxType import TxAckPrevTxType
from apps.common.writers import write_bitcoin_varint
@ -43,7 +43,7 @@ class Bitcoinlike(Bitcoin):
async def get_tx_digest(
self,
i: int,
txi: TxInputType,
txi: TxAckInputType,
public_keys: List[bytes],
threshold: int,
script_pubkey: bytes,
@ -55,7 +55,7 @@ class Bitcoinlike(Bitcoin):
i, txi, public_keys, threshold, script_pubkey
)
def get_sighash_type(self, txi: TxInputType) -> int:
def get_sighash_type(self, txi: TxAckInputType) -> int:
hashtype = super().get_sighash_type(txi)
if self.coin.fork_id is not None:
hashtype |= (self.coin.fork_id << 8) | _SIGHASH_FORKID
@ -64,18 +64,19 @@ class Bitcoinlike(Bitcoin):
def write_tx_header(
self,
w: writers.Writer,
tx: Union[SignTx, TransactionType],
tx: Union[SignTx, TxAckPrevTxType],
witness_marker: bool,
) -> None:
writers.write_uint32(w, tx.version) # nVersion
if self.coin.timestamp:
assert tx.timestamp is not None # checked in sanitize_*
writers.write_uint32(w, tx.timestamp)
if witness_marker:
write_bitcoin_varint(w, 0x00) # segwit witness marker
write_bitcoin_varint(w, 0x01) # segwit witness flag
async def write_prev_tx_footer(
self, w: writers.Writer, tx: TransactionType, prev_hash: bytes
self, w: writers.Writer, tx: TxAckPrevTxType, prev_hash: bytes
) -> None:
await super().write_prev_tx_footer(w, tx, prev_hash)

@ -3,14 +3,9 @@ from micropython import const
from trezor import wire
from trezor.crypto.hashlib import blake256
from trezor.messages import InputScriptType
from trezor.messages.SignTx import SignTx
from trezor.messages.TransactionType import TransactionType
from trezor.messages.TxInputType import TxInputType
from trezor.messages.TxOutputBinType import TxOutputBinType
from trezor.messages.TxOutputType import TxOutputType
from trezor.messages.TxAckPrevOutputType import TxAckPrevOutputType
from trezor.utils import HashWriter, ensure
from apps.common import coininfo, seed
from apps.common.writers import write_bitcoin_varint
from .. import multisig, scripts, writers
@ -28,13 +23,22 @@ DECRED_SIGHASH_ALL = const(1)
if False:
from typing import Union
from trezor.messages.SignTx import SignTx
from trezor.messages.TxAckInputType import TxAckInputType
from trezor.messages.TxAckOutputType import TxAckOutputType
from trezor.messages.TxAckPrevTxType import TxAckPrevTxType
from trezor.messages.TxAckPrevInputType import TxAckPrevInputType
from apps.common.coininfo import CoinInfo
from apps.common.keychain import Keychain
class Decred(Bitcoin):
def __init__(
self,
tx: SignTx,
keychain: seed.Keychain,
coin: coininfo.CoinInfo,
keychain: Keychain,
coin: CoinInfo,
approver: approvers.Approver,
) -> None:
ensure(coin.decred)
@ -60,16 +64,16 @@ class Decred(Bitcoin):
self.write_tx_footer(self.serialized_tx, self.tx)
self.write_tx_footer(self.h_prefix, self.tx)
async def process_internal_input(self, txi: TxInputType) -> None:
async def process_internal_input(self, txi: TxAckInputType) -> None:
await super().process_internal_input(txi)
# Decred serializes inputs early.
self.write_tx_input(self.serialized_tx, txi, bytes())
async def process_external_input(self, txi: TxInputType) -> None:
async def process_external_input(self, txi: TxAckInputType) -> None:
raise wire.DataError("External inputs not supported")
async def approve_output(self, txo: TxOutputType, script_pubkey: bytes) -> None:
async def approve_output(self, txo: TxAckOutputType, script_pubkey: bytes) -> None:
await super().approve_output(txo, script_pubkey)
self.write_tx_output(self.serialized_tx, txo, script_pubkey)
@ -90,6 +94,7 @@ class Decred(Bitcoin):
key_sign_pub = key_sign.public_key()
if txi_sign.script_type == InputScriptType.SPENDMULTISIG:
assert txi_sign.multisig is not None
prev_pkscript = scripts.output_script_multisig(
multisig.multisig_get_pubkeys(txi_sign.multisig),
txi_sign.multisig.m,
@ -139,18 +144,21 @@ class Decred(Bitcoin):
async def step7_finish(self) -> None:
await helpers.request_tx_finish(self.tx_req)
def check_prevtx_output(self, txo_bin: TxOutputBinType) -> None:
def check_prevtx_output(self, txo_bin: TxAckPrevOutputType) -> None:
if txo_bin.decred_script_version != 0:
raise wire.ProcessError("Cannot use utxo that has script_version != 0")
def hash143_add_input(self, txi: TxInputType) -> None:
def hash143_add_input(self, txi: TxAckInputType) -> None:
self.write_tx_input(self.h_prefix, txi, bytes())
def hash143_add_output(self, txo: TxOutputType, script_pubkey: bytes) -> None:
def hash143_add_output(self, txo: TxAckOutputType, script_pubkey: bytes) -> None:
self.write_tx_output(self.h_prefix, txo, script_pubkey)
def write_tx_input(
self, w: writers.Writer, txi: TxInputType, script: bytes
self,
w: writers.Writer,
txi: Union[TxAckInputType, TxAckPrevInputType],
script: bytes,
) -> None:
writers.write_bytes_reversed(w, txi.prev_hash, writers.TX_HASH_SIZE)
writers.write_uint32(w, txi.prev_index or 0)
@ -160,11 +168,13 @@ class Decred(Bitcoin):
def write_tx_output(
self,
w: writers.Writer,
txo: Union[TxOutputType, TxOutputBinType],
txo: Union[TxAckOutputType, TxAckPrevOutputType],
script_pubkey: bytes,
) -> None:
writers.write_uint64(w, txo.amount)
if isinstance(txo, TxOutputBinType):
if isinstance(txo, TxAckPrevOutputType):
if txo.decred_script_version is None:
raise wire.DataError("Script version must be provided")
writers.write_uint16(w, txo.decred_script_version)
else:
writers.write_uint16(w, DECRED_SCRIPT_VERSION)
@ -173,7 +183,7 @@ class Decred(Bitcoin):
def write_tx_header(
self,
w: writers.Writer,
tx: Union[SignTx, TransactionType],
tx: Union[SignTx, TxAckPrevTxType],
witness_marker: bool,
) -> None:
# The upper 16 bits of the transaction version specify the serialization
@ -186,13 +196,14 @@ class Decred(Bitcoin):
writers.write_uint32(w, version)
def write_tx_footer(
self, w: writers.Writer, tx: Union[SignTx, TransactionType]
self, w: writers.Writer, tx: Union[SignTx, TxAckPrevTxType]
) -> None:
assert tx.expiry is not None # checked in sanitize_*
writers.write_uint32(w, tx.lock_time)
writers.write_uint32(w, tx.expiry)
def write_tx_input_witness(
self, w: writers.Writer, i: TxInputType, script_sig: bytes
self, w: writers.Writer, i: TxAckInputType, script_sig: bytes
) -> None:
writers.write_uint64(w, i.amount)
writers.write_uint32(w, 0) # block height fraud proof

@ -8,10 +8,17 @@ from trezor.messages.RequestType import (
TXOUTPUT,
)
from trezor.messages.SignTx import SignTx
from trezor.messages.TransactionType import TransactionType
from trezor.messages.TxInputType import TxInputType
from trezor.messages.TxOutputBinType import TxOutputBinType
from trezor.messages.TxOutputType import TxOutputType
from trezor.messages.TxAckInput import TxAckInput
from trezor.messages.TxAckInputType import TxAckInputType
from trezor.messages.TxAckOutput import TxAckOutput
from trezor.messages.TxAckOutputType import TxAckOutputType
from trezor.messages.TxAckPrevExtraData import TxAckPrevExtraData
from trezor.messages.TxAckPrevInput import TxAckPrevInput
from trezor.messages.TxAckPrevInputType import TxAckPrevInputType
from trezor.messages.TxAckPrevMeta import TxAckPrevMeta
from trezor.messages.TxAckPrevOutput import TxAckPrevOutput
from trezor.messages.TxAckPrevOutputType import TxAckPrevOutputType
from trezor.messages.TxAckPrevTxType import TxAckPrevTxType
from trezor.messages.TxRequest import TxRequest
from apps.common import paths
@ -35,7 +42,7 @@ class UiConfirm:
class UiConfirmOutput(UiConfirm):
def __init__(self, output: TxOutputType, coin: CoinInfo):
def __init__(self, output: TxAckOutputType, coin: CoinInfo):
self.output = output
self.coin = coin
@ -113,11 +120,11 @@ class UiConfirmNonDefaultLocktime(UiConfirm):
__eq__ = utils.obj_eq
def confirm_output(output: TxOutputType, coin: CoinInfo) -> Awaitable[Any]: # type: ignore
def confirm_output(output: TxAckOutputType, coin: CoinInfo) -> Awaitable[None]: # type: ignore
return (yield UiConfirmOutput(output, coin))
def confirm_total(spending: int, fee: int, coin: CoinInfo) -> Awaitable[Any]: # type: ignore
def confirm_total(spending: int, fee: int, coin: CoinInfo) -> Awaitable[None]: # type: ignore
return (yield UiConfirmTotal(spending, fee, coin))
@ -141,54 +148,77 @@ def confirm_nondefault_locktime(lock_time: int, lock_time_disabled: bool) -> Awa
return (yield UiConfirmNonDefaultLocktime(lock_time, lock_time_disabled))
def request_tx_meta(tx_req: TxRequest, coin: CoinInfo, tx_hash: bytes = None) -> Awaitable[Any]: # type: ignore
def request_tx_meta(tx_req: TxRequest, coin: CoinInfo, tx_hash: bytes = None) -> Awaitable[TxAckPrevTxType]: # type: ignore
assert tx_req.details is not None
tx_req.request_type = TXMETA
tx_req.details.tx_hash = tx_hash
ack = yield tx_req
ack = yield TxAckPrevMeta, tx_req
_clear_tx_request(tx_req)
return sanitize_tx_meta(ack.tx, coin)
def request_tx_extra_data( # type: ignore
tx_req: TxRequest, offset: int, size: int, tx_hash: bytes = None
) -> Awaitable[Any]:
) -> Awaitable[bytearray]:
assert tx_req.details is not None
tx_req.request_type = TXEXTRADATA
tx_req.details.extra_data_offset = offset
tx_req.details.extra_data_len = size
tx_req.details.tx_hash = tx_hash
ack = yield tx_req
ack = yield TxAckPrevExtraData, tx_req
_clear_tx_request(tx_req)
return ack.tx.extra_data
return ack.tx.extra_data_chunk
def request_tx_input(tx_req: TxRequest, i: int, coin: CoinInfo, tx_hash: bytes = None) -> Awaitable[Any]: # type: ignore
def request_tx_input(tx_req: TxRequest, i: int, coin: CoinInfo) -> Awaitable[TxAckInputType]: # type: ignore
assert tx_req.details is not None
tx_req.request_type = TXINPUT
tx_req.details.request_index = i
ack = yield TxAckInput, tx_req
_clear_tx_request(tx_req)
return sanitize_tx_input(ack.tx.input, coin)
def request_tx_prev_input(tx_req: TxRequest, i: int, coin: CoinInfo, tx_hash: bytes = None) -> Awaitable[TxAckPrevInputType]: # type: ignore
assert tx_req.details is not None
tx_req.request_type = TXINPUT
tx_req.details.request_index = i
tx_req.details.tx_hash = tx_hash
ack = yield tx_req
ack = yield TxAckPrevInput, tx_req
_clear_tx_request(tx_req)
return sanitize_tx_prev_input(ack.tx.input, coin)
def request_tx_output(tx_req: TxRequest, i: int, coin: CoinInfo) -> Awaitable[TxAckOutputType]: # type: ignore
assert tx_req.details is not None
tx_req.request_type = TXOUTPUT
tx_req.details.request_index = i
ack = yield TxAckOutput, tx_req
_clear_tx_request(tx_req)
return sanitize_tx_input(ack.tx, coin)
return sanitize_tx_output(ack.tx.output, coin)
def request_tx_output(tx_req: TxRequest, i: int, coin: CoinInfo, tx_hash: bytes = None) -> Awaitable[Any]: # type: ignore
def request_tx_prev_output(tx_req: TxRequest, i: int, coin: CoinInfo, tx_hash: bytes = None) -> Awaitable[TxAckPrevOutputType]: # type: ignore
assert tx_req.details is not None
tx_req.request_type = TXOUTPUT
tx_req.details.request_index = i
tx_req.details.tx_hash = tx_hash
ack = yield tx_req
ack = yield TxAckPrevOutput, tx_req
_clear_tx_request(tx_req)
if tx_hash is None:
return sanitize_tx_output(ack.tx, coin)
else:
return sanitize_tx_binoutput(ack.tx, coin)
# return sanitize_tx_prev_output(ack.tx, coin) # no sanitize is required
return ack.tx.output
def request_tx_finish(tx_req: TxRequest) -> Awaitable[Any]: # type: ignore
def request_tx_finish(tx_req: TxRequest) -> Awaitable[None]: # type: ignore
tx_req.request_type = TXFINISHED
yield tx_req
yield None, tx_req
_clear_tx_request(tx_req)
def _clear_tx_request(tx_req: TxRequest) -> None:
assert tx_req.details is not None
assert tx_req.serialized is not None
assert tx_req.serialized.serialized_tx is not None
tx_req.request_type = None
tx_req.details.request_index = None
tx_req.details.tx_hash = None
@ -196,7 +226,8 @@ def _clear_tx_request(tx_req: TxRequest) -> None:
tx_req.details.extra_data_offset = None
tx_req.serialized.signature = None
tx_req.serialized.signature_index = None
tx_req.serialized.serialized_tx[:] = bytes()
# mypy thinks serialized_tx is `bytes`, which doesn't support indexed assignment
tx_req.serialized.serialized_tx[:] = bytes() # type: ignore
# Data sanitizers
@ -204,11 +235,6 @@ def _clear_tx_request(tx_req: TxRequest) -> None:
def sanitize_sign_tx(tx: SignTx, coin: CoinInfo) -> SignTx:
tx.version = tx.version if tx.version is not None else 1
tx.lock_time = tx.lock_time if tx.lock_time is not None else 0
tx.inputs_count = tx.inputs_count if tx.inputs_count is not None else 0
tx.outputs_count = tx.outputs_count if tx.outputs_count is not None else 0
tx.coin_name = tx.coin_name if tx.coin_name is not None else "Bitcoin"
if coin.decred or coin.overwintered:
tx.expiry = tx.expiry if tx.expiry is not None else 0
elif tx.expiry:
@ -230,14 +256,8 @@ def sanitize_sign_tx(tx: SignTx, coin: CoinInfo) -> SignTx:
return tx
def sanitize_tx_meta(tx: TransactionType, coin: CoinInfo) -> TransactionType:
tx.version = tx.version if tx.version is not None else 1
tx.lock_time = tx.lock_time if tx.lock_time is not None else 0
tx.inputs_cnt = tx.inputs_cnt if tx.inputs_cnt is not None else 0
tx.outputs_cnt = tx.outputs_cnt if tx.outputs_cnt is not None else 0
if coin.extra_data:
tx.extra_data_len = tx.extra_data_len if tx.extra_data_len is not None else 0
elif tx.extra_data_len:
def sanitize_tx_meta(tx: TxAckPrevTxType, coin: CoinInfo) -> TxAckPrevTxType:
if not coin.extra_data and tx.extra_data_len:
raise wire.DataError("Extra data not enabled on this coin.")
if coin.decred or coin.overwintered:
tx.expiry = tx.expiry if tx.expiry is not None else 0
@ -255,34 +275,36 @@ def sanitize_tx_meta(tx: TransactionType, coin: CoinInfo) -> TransactionType:
return tx
def sanitize_tx_input(tx: TransactionType, coin: CoinInfo) -> TxInputType:
txi = tx.inputs[0]
if txi.amount is None:
txi.amount = 0
if txi.script_type is None:
txi.script_type = InputScriptType.SPENDADDRESS
if txi.sequence is None:
txi.sequence = 0xFFFFFFFF
if txi.prev_index is None:
raise wire.DataError("Missing prev_index field.")
if txi.prev_hash is None or len(txi.prev_hash) != TX_HASH_SIZE:
def sanitize_tx_input(txi: TxAckInputType, coin: CoinInfo) -> TxAckInputType:
if len(txi.prev_hash) != TX_HASH_SIZE:
raise wire.DataError("Provided prev_hash is invalid.")
if txi.multisig and txi.script_type not in common.MULTISIG_INPUT_SCRIPT_TYPES:
raise wire.DataError("Multisig field provided but not expected.")
elif not txi.multisig and txi.script_type == InputScriptType.SPENDMULTISIG:
raise wire.DataError("Multisig details required.")
if txi.address_n and txi.script_type not in common.INTERNAL_INPUT_SCRIPT_TYPES:
raise wire.DataError("Input's address_n provided but not expected.")
if not coin.decred and txi.decred_tree is not None:
raise wire.DataError("Decred details provided but Decred coin not specified.")
if txi.script_type in common.SEGWIT_INPUT_SCRIPT_TYPES or txi.witness is not None:
if not coin.segwit:
raise wire.DataError("Segwit not enabled on this coin")
raise wire.DataError("Segwit not enabled on this coin.")
if txi.commitment_data and not txi.ownership_proof:
raise wire.DataError("commitment_data field provided but not expected.")
return txi
def sanitize_tx_output(tx: TransactionType, coin: CoinInfo) -> TxOutputType:
txo = tx.outputs[0]
def sanitize_tx_prev_input(
txi: TxAckPrevInputType, coin: CoinInfo
) -> TxAckPrevInputType:
if len(txi.prev_hash) != TX_HASH_SIZE:
raise wire.DataError("Provided prev_hash is invalid.")
if not coin.decred and txi.decred_tree is not None:
raise wire.DataError("Decred details provided but Decred coin not specified.")
return txi
def sanitize_tx_output(txo: TxAckOutputType, coin: CoinInfo) -> TxAckOutputType:
if txo.multisig and txo.script_type not in common.MULTISIG_OUTPUT_SCRIPT_TYPES:
raise wire.DataError("Multisig field provided but not expected.")
if txo.address_n and txo.script_type not in common.CHANGE_OUTPUT_SCRIPT_TYPES:
@ -307,12 +329,3 @@ def sanitize_tx_output(tx: TransactionType, coin: CoinInfo) -> TxOutputType:
if not txo.address_n and not txo.address:
raise wire.DataError("Missing address")
return txo
def sanitize_tx_binoutput(tx: TransactionType, coin: CoinInfo) -> TxOutputBinType:
txo_bin = tx.bin_outputs[0]
if txo_bin.amount is None:
raise wire.DataError("Missing amount field.")
if txo_bin.script_pubkey is None:
raise wire.DataError("Missing script_pubkey field.")
return txo_bin

@ -3,12 +3,10 @@ from ubinascii import hexlify
from trezor import ui
from trezor.messages import ButtonRequestType, OutputScriptType
from trezor.messages.TxOutputType import TxOutputType
from trezor.strings import format_amount
from trezor.ui.text import Text
from trezor.utils import chunks
from apps.common import coininfo
from apps.common.confirm import require_confirm, require_hold_to_confirm
from .. import addresses
@ -17,11 +15,14 @@ from . import omni
if False:
from typing import Iterator
from trezor import wire
from trezor.messages.TxAckOutputType import TxAckOutputType
from apps.common.coininfo import CoinInfo
_LOCKTIME_TIMESTAMP_MIN_VALUE = const(500000000)
def format_coin_amount(amount: int, coin: coininfo.CoinInfo) -> str:
def format_coin_amount(amount: int, coin: CoinInfo) -> str:
return "%s %s" % (format_amount(amount, coin.decimals), coin.coin_shortcut)
@ -34,10 +35,11 @@ def split_op_return(data: str) -> Iterator[str]:
async def confirm_output(
ctx: wire.Context, output: TxOutputType, coin: coininfo.CoinInfo
ctx: wire.Context, output: TxAckOutputType, coin: CoinInfo
) -> None:
if output.script_type == OutputScriptType.PAYTOOPRETURN:
data = output.op_return_data
assert data is not None
if omni.is_valid(data):
# OMNI transaction
text = Text("OMNI transaction", ui.ICON_SEND, ui.GREEN)
@ -51,6 +53,7 @@ async def confirm_output(
text.mono(*split_op_return(hex_data))
else:
address = output.address
assert address is not None
address_short = addresses.address_short(coin, address)
text = Text("Confirm sending", ui.ICON_SEND, ui.GREEN)
text.normal(format_coin_amount(output.amount, coin) + " to")
@ -59,7 +62,7 @@ async def confirm_output(
async def confirm_joint_total(
ctx: wire.Context, spending: int, total: int, coin: coininfo.CoinInfo
ctx: wire.Context, spending: int, total: int, coin: CoinInfo
) -> None:
text = Text("Joint transaction", ui.ICON_SEND, ui.GREEN)
text.normal("You are contributing:")
@ -70,7 +73,7 @@ async def confirm_joint_total(
async def confirm_total(
ctx: wire.Context, spending: int, fee: int, coin: coininfo.CoinInfo
ctx: wire.Context, spending: int, fee: int, coin: CoinInfo
) -> None:
text = Text("Confirm transaction", ui.ICON_SEND, ui.GREEN)
text.normal("Total amount:")
@ -80,9 +83,7 @@ async def confirm_total(
await require_hold_to_confirm(ctx, text, ButtonRequestType.SignTx)
async def confirm_feeoverthreshold(
ctx: wire.Context, fee: int, coin: coininfo.CoinInfo
) -> None:
async def confirm_feeoverthreshold(ctx: wire.Context, fee: int, coin: CoinInfo) -> None:
text = Text("High fee", ui.ICON_SEND, ui.GREEN)
text.normal("The fee of")
text.bold(format_coin_amount(fee, coin))

@ -1,16 +1,23 @@
from trezor import wire
from trezor.messages.TxInputType import TxInputType
from trezor.messages.TxOutputType import TxOutputType
from trezor.utils import ensure
from .. import multisig
from ..common import BIP32_WALLET_DEPTH
if False:
from typing import Any, Union
from typing import Any, Union, Generic, TypeVar
from trezor.messages.TxAckInputType import TxAckInputType
from trezor.messages.TxAckOutputType import TxAckOutputType
class MatchChecker:
T = TypeVar("T")
else:
# mypy cheat: Generic[T] will be `object` which is a valid parent type
Generic = [object] # type: ignore
T = 0 # type: ignore
class MatchChecker(Generic[T]):
"""
MatchCheckers are used to identify the change-output in a transaction. An output is
a change-output if it has a certain matching attribute with all inputs.
@ -36,16 +43,16 @@ class MatchChecker:
UNDEFINED = object()
def __init__(self) -> None:
self.attribute = self.UNDEFINED # type: Any
self.attribute = self.UNDEFINED # type: Union[object, T]
self.read_only = False # Failsafe to ensure that add_input() is not accidentally called after output_matches().
def attribute_from_tx(self, txio: Union[TxInputType, TxOutputType]) -> Any:
def attribute_from_tx(self, txio: Union[TxAckInputType, TxAckOutputType]) -> T:
# Return the attribute from the txio, which is to be used for matching.
# If the txio is invalid for matching, then return an object which
# evaluates as a boolean False.
raise NotImplementedError
def add_input(self, txi: TxInputType) -> None:
def add_input(self, txi: TxAckInputType) -> None:
ensure(not self.read_only)
if self.attribute is self.MISMATCH:
@ -59,7 +66,7 @@ class MatchChecker:
elif self.attribute != added_attribute:
self.attribute = self.MISMATCH
def check_input(self, txi: TxInputType) -> None:
def check_input(self, txi: TxAckInputType) -> None:
if self.attribute is self.MISMATCH:
return # There was already a mismatch when adding inputs, ignore it now.
@ -68,7 +75,7 @@ class MatchChecker:
if self.attribute != self.attribute_from_tx(txi):
raise wire.ProcessError("Transaction has changed during signing")
def output_matches(self, txo: TxOutputType) -> bool:
def output_matches(self, txo: TxAckOutputType) -> bool:
self.read_only = True
if self.attribute is self.MISMATCH:
@ -78,14 +85,14 @@ class MatchChecker:
class WalletPathChecker(MatchChecker):
def attribute_from_tx(self, txio: Union[TxInputType, TxOutputType]) -> Any:
def attribute_from_tx(self, txio: Union[TxAckInputType, TxAckOutputType]) -> Any:
if len(txio.address_n) < BIP32_WALLET_DEPTH:
return None
return txio.address_n[:-BIP32_WALLET_DEPTH]
class MultisigFingerprintChecker(MatchChecker):
def attribute_from_tx(self, txio: Union[TxInputType, TxOutputType]) -> Any:
def attribute_from_tx(self, txio: Union[TxAckInputType, TxAckOutputType]) -> Any:
if not txio.multisig:
return None
return multisig.multisig_fingerprint(txio.multisig)

@ -2,9 +2,6 @@ from ustruct import unpack
from trezor.strings import format_amount
if False:
from typing import Optional
currencies = {
1: ("OMNI", True),
2: ("tOMNI", True),
@ -17,9 +14,9 @@ def is_valid(data: bytes) -> bool:
return len(data) >= 8 and data[:4] == b"omni"
def parse(data: bytes) -> Optional[str]:
def parse(data: bytes) -> str:
if not is_valid(data):
return None
raise ValueError # tried to parse data that fails validation
tx_version, tx_type = unpack(">HH", data[4:8])
if tx_version == 0 and tx_type == 0 and len(data) == 20: # OMNI simple send
currency, amount = unpack(">IQ", data[8:20])

@ -8,7 +8,9 @@
from micropython import const
from trezor.messages import InputScriptType
from trezor.messages.TxInputType import TxInputType
if False:
from trezor.messages.TxAckInputType import TxAckInputType
# transaction header size: 4 byte version
_TXSIZE_HEADER = const(4)
@ -50,7 +52,7 @@ class TxWeightCalculator:
self.counter += self.ser_length_size(self.inputs_count)
self.segwit = True
def add_input(self, i: TxInputType) -> None:
def add_input(self, i: TxAckInputType) -> None:
if i.multisig:
multisig_script_size = _TXSIZE_MULTISIGSCRIPT + len(i.multisig.pubkeys) * (

@ -5,8 +5,8 @@ from trezor import wire
from trezor.crypto.hashlib import blake2b
from trezor.messages import InputScriptType
from trezor.messages.SignTx import SignTx
from trezor.messages.TransactionType import TransactionType
from trezor.messages.TxInputType import TxInputType
from trezor.messages.TxAckInputType import TxAckInputType
from trezor.messages.TxAckPrevTxType import TxAckPrevTxType
from trezor.utils import HashWriter, ensure
from apps.common.coininfo import CoinInfo
@ -64,7 +64,7 @@ class Zcashlike(Bitcoinlike):
async def get_tx_digest(
self,
i: int,
txi: TxInputType,
txi: TxAckInputType,
public_keys: List[bytes],
threshold: int,
script_pubkey: bytes,
@ -72,17 +72,20 @@ class Zcashlike(Bitcoinlike):
return self.hash143_preimage_hash(txi, public_keys, threshold)
def write_tx_header(
self, w: Writer, tx: Union[SignTx, TransactionType], witness_marker: bool
self, w: Writer, tx: Union[SignTx, TxAckPrevTxType], witness_marker: bool
) -> None:
if tx.version < 3:
# pre-overwinter
write_uint32(w, tx.version)
else:
if tx.version_group_id is None:
raise wire.DataError("Version group ID is missing")
# nVersion | fOverwintered
write_uint32(w, tx.version | OVERWINTERED)
write_uint32(w, tx.version_group_id) # nVersionGroupId
def write_tx_footer(self, w: Writer, tx: Union[SignTx, TransactionType]) -> None:
def write_tx_footer(self, w: Writer, tx: Union[SignTx, TxAckPrevTxType]) -> None:
assert tx.expiry is not None # checked in sanitize_*
write_uint32(w, tx.lock_time)
if tx.version >= 3:
write_uint32(w, tx.expiry) # expiryHeight
@ -96,7 +99,7 @@ class Zcashlike(Bitcoinlike):
self.h_outputs = HashWriter(blake2b(outlen=32, personal=b"ZcashOutputsHash"))
def hash143_preimage_hash(
self, txi: TxInputType, public_keys: List[bytes], threshold: int
self, txi: TxAckInputType, public_keys: List[bytes], threshold: int
) -> bytes:
h_preimage = HashWriter(
blake2b(
@ -105,6 +108,9 @@ class Zcashlike(Bitcoinlike):
)
)
assert self.tx.version_group_id is not None
assert self.tx.expiry is not None
# 1. nVersion | fOverwintered
write_uint32(h_preimage, self.tx.version | OVERWINTERED)
# 2. nVersionGroupId
@ -150,7 +156,7 @@ class Zcashlike(Bitcoinlike):
def derive_script_code(
txi: TxInputType, public_keys: List[bytes], threshold: int, coin: CoinInfo
txi: TxAckInputType, public_keys: List[bytes], threshold: int, coin: CoinInfo
) -> bytearray:
if len(public_keys) > 1:
return output_script_multisig(public_keys, threshold)

@ -18,18 +18,26 @@ from .scripts import (
)
if False:
from typing import List, Tuple
from typing import List, Optional, Tuple
from apps.common.coininfo import CoinInfo
class SignatureVerifier:
def __init__(
self, script_pubkey: bytes, script_sig: bytes, witness: bytes, coin: CoinInfo,
self,
script_pubkey: bytes,
script_sig: Optional[bytes],
witness: Optional[bytes],
coin: CoinInfo,
):
self.threshold = 1
self.public_keys = [] # type: List[bytes]
self.signatures = [] # type: List[Tuple[bytes, int]]
if not script_sig:
if not witness:
raise wire.DataError("Signature data not provided")
if len(script_pubkey) == 22: # P2WPKH
public_key, signature, hash_type = parse_witness_p2wpkh(witness)
pubkey_hash = ecdsa_hash_pubkey(public_key, coin)

@ -14,8 +14,12 @@ from .addresses import (
address_to_cashaddr,
)
if False:
from trezor.messages.VerifyMessage import VerifyMessage
from trezor.messages.TxInputType import EnumTypeInputScriptType
async def verify_message(ctx, msg):
async def verify_message(ctx: wire.Context, msg: VerifyMessage) -> Success:
message = msg.message
address = msg.address
signature = msg.signature
@ -24,15 +28,17 @@ async def verify_message(ctx, msg):
digest = message_digest(coin, message)
script_type = None
recid = signature[0]
if recid >= 27 and recid <= 34:
script_type = SPENDADDRESS # p2pkh
# p2pkh
script_type = SPENDADDRESS # type: EnumTypeInputScriptType
elif recid >= 35 and recid <= 38:
script_type = SPENDP2SHWITNESS # segwit-in-p2sh
# segwit-in-p2sh
script_type = SPENDP2SHWITNESS
signature = bytes([signature[0] - 4]) + signature[1:]
elif recid >= 39 and recid <= 42:
script_type = SPENDWITNESS # native segwit
# native segwit
script_type = SPENDWITNESS
signature = bytes([signature[0] - 8]) + signature[1:]
else:
raise wire.ProcessError("Invalid signature")

@ -1,9 +1,6 @@
from micropython import const
from trezor.crypto.hashlib import sha256
from trezor.messages.TxInputType import TxInputType
from trezor.messages.TxOutputBinType import TxOutputBinType
from trezor.messages.TxOutputType import TxOutputType
from trezor.utils import ensure
from apps.common.writers import ( # noqa: F401
@ -20,9 +17,15 @@ from apps.common.writers import ( # noqa: F401
if False:
from typing import Union
from apps.common.writers import Writer
from trezor.messages.TxAckInputType import TxAckInputType
from trezor.messages.TxAckOutputType import TxAckOutputType
from trezor.messages.TxAckPrevInputType import TxAckPrevInputType
from trezor.messages.TxAckPrevOutputType import TxAckPrevOutputType
from trezor.utils import HashWriter
from apps.common.writers import Writer
write_uint16 = write_uint16_le
write_uint32 = write_uint32_le
write_uint64 = write_uint64_le
@ -35,14 +38,16 @@ def write_bytes_prefixed(w: Writer, b: bytes) -> None:
write_bytes_unchecked(w, b)
def write_tx_input(w: Writer, i: TxInputType, script: bytes) -> None:
def write_tx_input(
w: Writer, i: Union[TxAckInputType, TxAckPrevInputType], script: bytes,
) -> None:
write_bytes_reversed(w, i.prev_hash, TX_HASH_SIZE)
write_uint32(w, i.prev_index)
write_bytes_prefixed(w, script)
write_uint32(w, i.sequence)
def write_tx_input_check(w: Writer, i: TxInputType) -> None:
def write_tx_input_check(w: Writer, i: TxAckInputType) -> None:
write_bytes_fixed(w, i.prev_hash, TX_HASH_SIZE)
write_uint32(w, i.prev_index)
write_uint32(w, i.script_type)
@ -54,7 +59,7 @@ def write_tx_input_check(w: Writer, i: TxInputType) -> None:
def write_tx_output(
w: Writer, o: Union[TxOutputType, TxOutputBinType], script_pubkey: bytes
w: Writer, o: Union[TxAckOutputType, TxAckPrevOutputType], script_pubkey: bytes
) -> None:
write_uint64(w, o.amount)
write_bytes_prefixed(w, script_pubkey)

Loading…
Cancel
Save