refactor(core/bitcoin): Change scripts to use writer semantics.

pull/1723/head
Andrew Kozlik 3 years ago committed by Andrew Kozlik
parent 8538f55edc
commit 27e6f35f78

@ -0,0 +1 @@
Memory optimization of BTC signing and CBOR decoding.

@ -15,6 +15,7 @@ from .multisig import (
) )
from .readers import read_bytes_prefixed, read_op_push from .readers import read_bytes_prefixed, read_op_push
from .writers import ( from .writers import (
op_push_length,
write_bytes_fixed, write_bytes_fixed,
write_bytes_prefixed, write_bytes_prefixed,
write_bytes_unchecked, write_bytes_unchecked,
@ -29,40 +30,44 @@ if False:
from .writers import Writer from .writers import Writer
def input_derive_script( def write_input_script_prefixed(
w: Writer,
script_type: InputScriptType, script_type: InputScriptType,
multisig: MultisigRedeemScriptType | None, multisig: MultisigRedeemScriptType | None,
coin: CoinInfo, coin: CoinInfo,
hash_type: int, hash_type: int,
pubkey: bytes, pubkey: bytes,
signature: bytes, signature: bytes,
) -> bytes: ) -> None:
if script_type == InputScriptType.SPENDADDRESS: if script_type == InputScriptType.SPENDADDRESS:
# p2pkh or p2sh # p2pkh or p2sh
return input_script_p2pkh_or_p2sh(pubkey, signature, hash_type) write_input_script_p2pkh_or_p2sh_prefixed(w, pubkey, signature, hash_type)
elif script_type == InputScriptType.SPENDP2SHWITNESS:
if script_type == InputScriptType.SPENDP2SHWITNESS:
# p2wpkh or p2wsh using p2sh # p2wpkh or p2wsh using p2sh
if multisig is not None: if multisig is not None:
# p2wsh in p2sh # p2wsh in p2sh
pubkeys = multisig_get_pubkeys(multisig) pubkeys = multisig_get_pubkeys(multisig)
witness_script_hasher = utils.HashWriter(sha256()) witness_script_h = utils.HashWriter(sha256())
write_output_script_multisig(witness_script_hasher, pubkeys, multisig.m) write_output_script_multisig(witness_script_h, pubkeys, multisig.m)
witness_script_hash = witness_script_hasher.get_digest() write_input_script_p2wsh_in_p2sh(
return input_script_p2wsh_in_p2sh(witness_script_hash) w, witness_script_h.get_digest(), prefixed=True
)
# p2wpkh in p2sh else:
return input_script_p2wpkh_in_p2sh(common.ecdsa_hash_pubkey(pubkey, coin)) # p2wpkh in p2sh
write_input_script_p2wpkh_in_p2sh(
w, common.ecdsa_hash_pubkey(pubkey, coin), prefixed=True
)
elif script_type == InputScriptType.SPENDWITNESS: elif script_type == InputScriptType.SPENDWITNESS:
# native p2wpkh or p2wsh # native p2wpkh or p2wsh
return input_script_native_p2wpkh_or_p2wsh() script_sig = input_script_native_p2wpkh_or_p2wsh()
write_bytes_prefixed(w, script_sig)
elif script_type == InputScriptType.SPENDMULTISIG: elif script_type == InputScriptType.SPENDMULTISIG:
# p2sh multisig # p2sh multisig
assert multisig is not None # checked in sanitize_tx_input assert multisig is not None # checked in sanitize_tx_input
signature_index = multisig_pubkey_index(multisig, pubkey) signature_index = multisig_pubkey_index(multisig, pubkey)
return input_script_multisig( write_input_script_multisig_prefixed(
multisig, signature, signature_index, hash_type, coin w, multisig, signature, signature_index, hash_type, coin
) )
else: else:
raise wire.ProcessError("Invalid script type") raise wire.ProcessError("Invalid script type")
@ -110,11 +115,12 @@ def output_derive_script(address: str, coin: CoinInfo) -> bytes:
# see https://github.com/bitcoin/bips/blob/master/bip-0143.mediawiki#specification # see https://github.com/bitcoin/bips/blob/master/bip-0143.mediawiki#specification
# item 5 for details # item 5 for details
def bip143_derive_script_code( def write_bip143_script_code_prefixed(
txi: TxInput, public_keys: list[bytes], threshold: int, coin: CoinInfo w: Writer, txi: TxInput, public_keys: list[bytes], threshold: int, coin: CoinInfo
) -> bytearray: ) -> None:
if len(public_keys) > 1: if len(public_keys) > 1:
return output_script_multisig(public_keys, threshold) write_output_script_multisig(w, public_keys, threshold, prefixed=True)
return
p2pkh = ( p2pkh = (
txi.script_type == InputScriptType.SPENDWITNESS txi.script_type == InputScriptType.SPENDWITNESS
@ -122,11 +128,13 @@ def bip143_derive_script_code(
or txi.script_type == InputScriptType.SPENDADDRESS or txi.script_type == InputScriptType.SPENDADDRESS
or txi.script_type == InputScriptType.EXTERNAL or txi.script_type == InputScriptType.EXTERNAL
) )
if p2pkh: if p2pkh:
# for p2wpkh in p2sh or native p2wpkh # for p2wpkh in p2sh or native p2wpkh
# the scriptCode is a classic p2pkh # the scriptCode is a classic p2pkh
return output_script_p2pkh(common.ecdsa_hash_pubkey(public_keys[0], coin)) write_output_script_p2pkh(
w, common.ecdsa_hash_pubkey(public_keys[0], coin), prefixed=True
)
else: else:
raise wire.DataError("Unknown input script type for bip143 script code") raise wire.DataError("Unknown input script type for bip143 script code")
@ -136,13 +144,12 @@ def bip143_derive_script_code(
# https://github.com/bitcoin/bips/blob/master/bip-0016.mediawiki # https://github.com/bitcoin/bips/blob/master/bip-0016.mediawiki
def input_script_p2pkh_or_p2sh( def write_input_script_p2pkh_or_p2sh_prefixed(
pubkey: bytes, signature: bytes, hash_type: int w: Writer, pubkey: bytes, signature: bytes, hash_type: int
) -> bytearray: ) -> None:
w = utils.empty_bytearray(5 + len(signature) + 1 + 5 + len(pubkey)) write_bitcoin_varint(w, 1 + len(signature) + 1 + 1 + len(pubkey))
append_signature(w, signature, hash_type) append_signature(w, signature, hash_type)
append_pubkey(w, pubkey) append_pubkey(w, pubkey)
return w
def parse_input_script_p2pkh(script_sig: bytes) -> tuple[bytes, bytes, int]: def parse_input_script_p2pkh(script_sig: bytes) -> tuple[bytes, bytes, int]:
@ -162,15 +169,22 @@ def parse_input_script_p2pkh(script_sig: bytes) -> tuple[bytes, bytes, int]:
return pubkey, signature, hash_type return pubkey, signature, hash_type
def write_output_script_p2pkh(
w: Writer, pubkeyhash: bytes, prefixed: bool = False
) -> None:
if prefixed:
write_bitcoin_varint(w, 25)
w.append(0x76) # OP_DUP
w.append(0xA9) # OP_HASH160
w.append(0x14) # OP_DATA_20
write_bytes_fixed(w, pubkeyhash, 20)
w.append(0x88) # OP_EQUALVERIFY
w.append(0xAC) # OP_CHECKSIG
def output_script_p2pkh(pubkeyhash: bytes) -> bytearray: def output_script_p2pkh(pubkeyhash: bytes) -> bytearray:
utils.ensure(len(pubkeyhash) == 20) s = utils.empty_bytearray(25)
s = bytearray(25) write_output_script_p2pkh(s, pubkeyhash)
s[0] = 0x76 # OP_DUP
s[1] = 0xA9 # OP_HASH_160
s[2] = 0x14 # pushing 20 bytes
s[3:23] = pubkeyhash
s[23] = 0x88 # OP_EQUALVERIFY
s[24] = 0xAC # OP_CHECKSIG
return s return s
@ -224,17 +238,18 @@ def output_script_native_p2wpkh_or_p2wsh(witprog: bytes) -> bytearray:
# Uses normal P2SH output scripts. # Uses normal P2SH output scripts.
def input_script_p2wpkh_in_p2sh(pubkeyhash: bytes) -> bytearray: def write_input_script_p2wpkh_in_p2sh(
w: Writer, pubkeyhash: bytes, prefixed: bool = False
) -> None:
# 16 00 14 <pubkeyhash> # 16 00 14 <pubkeyhash>
# Signature is moved to the witness. # Signature is moved to the witness.
utils.ensure(len(pubkeyhash) == 20) if prefixed:
write_bitcoin_varint(w, 23)
w = utils.empty_bytearray(3 + len(pubkeyhash))
w.append(0x16) # length of the data w.append(0x16) # length of the data
w.append(0x00) # witness version byte w.append(0x00) # witness version byte
w.append(0x14) # P2WPKH witness program (pub key hash length) w.append(0x14) # P2WPKH witness program (pub key hash length)
write_bytes_fixed(w, pubkeyhash, 20) # pub key hash write_bytes_fixed(w, pubkeyhash, 20) # pub key hash
return w
# SegWit: P2WSH nested in P2SH # SegWit: P2WSH nested in P2SH
@ -245,31 +260,30 @@ def input_script_p2wpkh_in_p2sh(pubkeyhash: bytes) -> bytearray:
# Uses normal P2SH output scripts. # Uses normal P2SH output scripts.
def input_script_p2wsh_in_p2sh(script_hash: bytes) -> bytearray: def write_input_script_p2wsh_in_p2sh(
w: Writer, script_hash: bytes, prefixed: bool = False
) -> None:
# 22 00 20 <redeem script hash> # 22 00 20 <redeem script hash>
# Signature is moved to the witness. # Signature is moved to the witness.
if prefixed:
write_bitcoin_varint(w, 35)
if len(script_hash) != 32:
raise wire.DataError("Redeem script hash should be 32 bytes long")
w = utils.empty_bytearray(3 + len(script_hash))
w.append(0x22) # length of the data w.append(0x22) # length of the data
w.append(0x00) # witness version byte w.append(0x00) # witness version byte
w.append(0x20) # P2WSH witness program (redeem script hash length) w.append(0x20) # P2WSH witness program (redeem script hash length)
write_bytes_fixed(w, script_hash, 32) write_bytes_fixed(w, script_hash, 32)
return w
# SegWit: Witness getters # SegWit: Witness getters
# === # ===
def witness_p2wpkh(signature: bytes, pubkey: bytes, hash_type: int) -> bytearray: def write_witness_p2wpkh(
w = utils.empty_bytearray(1 + 5 + len(signature) + 1 + 5 + len(pubkey)) w: Writer, signature: bytes, pubkey: bytes, hash_type: int
) -> None:
write_bitcoin_varint(w, 0x02) # num of segwit items, in P2WPKH it's always 2 write_bitcoin_varint(w, 0x02) # num of segwit items, in P2WPKH it's always 2
write_signature_prefixed(w, signature, hash_type) write_signature_prefixed(w, signature, hash_type)
write_bytes_prefixed(w, pubkey) write_bytes_prefixed(w, pubkey)
return w
def parse_witness_p2wpkh(witness: bytes) -> tuple[bytes, bytes, int]: def parse_witness_p2wpkh(witness: bytes) -> tuple[bytes, bytes, int]:
@ -293,53 +307,39 @@ def parse_witness_p2wpkh(witness: bytes) -> tuple[bytes, bytes, int]:
return pubkey, signature, hash_type return pubkey, signature, hash_type
def witness_multisig( def write_witness_multisig(
w: Writer,
multisig: MultisigRedeemScriptType, multisig: MultisigRedeemScriptType,
signature: bytes, signature: bytes,
signature_index: int, signature_index: int,
hash_type: int, hash_type: int,
) -> bytearray: ) -> None:
# get other signatures, stretch with empty bytes to the number of the pubkeys # get other signatures, stretch with empty bytes to the number of the pubkeys
signatures = multisig.signatures + [b""] * ( signatures = multisig.signatures + [b""] * (
multisig_get_pubkey_count(multisig) - len(multisig.signatures) multisig_get_pubkey_count(multisig) - len(multisig.signatures)
) )
# fill in our signature # fill in our signature
if signatures[signature_index]: if signatures[signature_index]:
raise wire.DataError("Invalid multisig parameters") raise wire.DataError("Invalid multisig parameters")
signatures[signature_index] = signature signatures[signature_index] = signature
# filter empty
signatures = [s for s in signatures if s]
# witness program + signatures + redeem script # witness program + signatures + redeem script
num_of_witness_items = 1 + len(signatures) + 1 num_of_witness_items = 1 + sum(1 for s in signatures if s) + 1
# length of the redeem script
pubkeys = multisig_get_pubkeys(multisig)
redeem_script_length = output_script_multisig_length(pubkeys, multisig.m)
# length of the result
total_length = 1 + 1 # number of items, OP_FALSE
for s in signatures:
total_length += 1 + len(s) + 1 # length, signature, hash_type
total_length += 1 + redeem_script_length # length, script
w = utils.empty_bytearray(total_length)
write_bitcoin_varint(w, num_of_witness_items) write_bitcoin_varint(w, num_of_witness_items)
# Starts with OP_FALSE because of an old OP_CHECKMULTISIG bug, which # Starts with OP_FALSE because of an old OP_CHECKMULTISIG bug, which
# consumes one additional item on the stack: # consumes one additional item on the stack:
# https://bitcoin.org/en/developer-guide#standard-transactions # https://bitcoin.org/en/developer-guide#standard-transactions
write_bitcoin_varint(w, 0) write_bitcoin_varint(w, 0)
for s in signatures: for s in signatures:
write_signature_prefixed(w, s, hash_type) # size of the witness included if s:
write_signature_prefixed(w, s, hash_type) # size of the witness included
# redeem script # redeem script
write_bitcoin_varint(w, redeem_script_length) pubkeys = multisig_get_pubkeys(multisig)
write_output_script_multisig(w, pubkeys, multisig.m) write_output_script_multisig(w, pubkeys, multisig.m, prefixed=True)
return w
def parse_witness_multisig(witness: bytes) -> tuple[bytes, list[tuple[bytes, int]]]: def parse_witness_multisig(witness: bytes) -> tuple[bytes, list[tuple[bytes, int]]]:
@ -375,13 +375,14 @@ def parse_witness_multisig(witness: bytes) -> tuple[bytes, list[tuple[bytes, int
# Used either as P2SH, P2WSH, or P2WSH nested in P2SH. # Used either as P2SH, P2WSH, or P2WSH nested in P2SH.
def input_script_multisig( def write_input_script_multisig_prefixed(
w: Writer,
multisig: MultisigRedeemScriptType, multisig: MultisigRedeemScriptType,
signature: bytes, signature: bytes,
signature_index: int, signature_index: int,
hash_type: int, hash_type: int,
coin: CoinInfo, coin: CoinInfo,
) -> bytearray: ) -> None:
signatures = multisig.signatures # other signatures signatures = multisig.signatures # other signatures
if len(signatures[signature_index]) > 0: if len(signatures[signature_index]) > 0:
raise wire.DataError("Invalid multisig parameters") raise wire.DataError("Invalid multisig parameters")
@ -394,10 +395,10 @@ def input_script_multisig(
# length of the result # length of the result
total_length = 1 # OP_FALSE total_length = 1 # OP_FALSE
for s in signatures: for s in signatures:
total_length += 1 + len(s) + 1 # length, signature, hash_type if s:
total_length += 1 + redeem_script_length # length, script total_length += 1 + len(s) + 1 # length, signature, hash_type
total_length += op_push_length(redeem_script_length) + redeem_script_length
w = utils.empty_bytearray(total_length) write_bitcoin_varint(w, total_length)
# Starts with OP_FALSE because of an old OP_CHECKMULTISIG bug, which # Starts with OP_FALSE because of an old OP_CHECKMULTISIG bug, which
# consumes one additional item on the stack: # consumes one additional item on the stack:
@ -405,15 +406,13 @@ def input_script_multisig(
w.append(0x00) w.append(0x00)
for s in signatures: for s in signatures:
if len(s): if s:
append_signature(w, s, hash_type) append_signature(w, s, hash_type)
# redeem script # redeem script
write_op_push(w, redeem_script_length) write_op_push(w, redeem_script_length)
write_output_script_multisig(w, pubkeys, multisig.m) write_output_script_multisig(w, pubkeys, multisig.m)
return w
def parse_input_script_multisig( def parse_input_script_multisig(
script_sig: bytes, script_sig: bytes,
@ -448,7 +447,12 @@ def output_script_multisig(pubkeys: list[bytes], m: int) -> bytearray:
return w return w
def write_output_script_multisig(w: Writer, pubkeys: list[bytes], m: int) -> None: def write_output_script_multisig(
w: Writer,
pubkeys: list[bytes],
m: int,
prefixed: bool = False,
) -> None:
n = len(pubkeys) n = len(pubkeys)
if n < 1 or n > 15 or m < 1 or m > 15 or m > n: if n < 1 or n > 15 or m < 1 or m > 15 or m > n:
raise wire.DataError("Invalid multisig parameters") raise wire.DataError("Invalid multisig parameters")
@ -456,6 +460,9 @@ def write_output_script_multisig(w: Writer, pubkeys: list[bytes], m: int) -> Non
if len(pubkey) != 33: if len(pubkey) != 33:
raise wire.DataError("Invalid multisig parameters") raise wire.DataError("Invalid multisig parameters")
if prefixed:
write_bitcoin_varint(w, output_script_multisig_length(pubkeys, m))
w.append(0x50 + m) # numbers 1 to 16 are pushed as 0x50 + value w.append(0x50 + m) # numbers 1 to 16 are pushed as 0x50 + value
for p in pubkeys: for p in pubkeys:
append_pubkey(w, p) append_pubkey(w, p)
@ -525,24 +532,22 @@ def write_bip322_signature_proof(
public_key: bytes, public_key: bytes,
signature: bytes, signature: bytes,
) -> None: ) -> None:
script_sig = input_derive_script( write_input_script_prefixed(
script_type, multisig, coin, common.SIGHASH_ALL, public_key, signature w, script_type, multisig, coin, common.SIGHASH_ALL, public_key, signature
) )
if script_type in common.SEGWIT_INPUT_SCRIPT_TYPES: if script_type in common.SEGWIT_INPUT_SCRIPT_TYPES:
if multisig: if multisig:
# find the place of our signature based on the public key # find the place of our signature based on the public key
signature_index = multisig_pubkey_index(multisig, public_key) signature_index = multisig_pubkey_index(multisig, public_key)
witness = witness_multisig( write_witness_multisig(
multisig, signature, signature_index, common.SIGHASH_ALL w, multisig, signature, signature_index, common.SIGHASH_ALL
) )
else: else:
witness = witness_p2wpkh(signature, public_key, common.SIGHASH_ALL) write_witness_p2wpkh(w, signature, public_key, common.SIGHASH_ALL)
else: else:
# Zero entries in witness stack. # Zero entries in witness stack.
witness = bytearray(b"\x00") w.append(0x00)
write_bytes_prefixed(w, script_sig)
w.extend(witness)
def read_bip322_signature_proof(r: utils.BufferReader) -> tuple[bytes, bytes]: def read_bip322_signature_proof(r: utils.BufferReader) -> tuple[bytes, bytes]:

@ -8,47 +8,53 @@ from apps.common.writers import write_bytes_fixed, write_uint64_le
from . import scripts from . import scripts
from .multisig import multisig_get_pubkeys, multisig_pubkey_index from .multisig import multisig_get_pubkeys, multisig_pubkey_index
from .scripts import ( # noqa: F401 from .scripts import ( # noqa: F401
output_script_multisig,
output_script_p2pkh,
output_script_paytoopreturn, output_script_paytoopreturn,
write_output_script_multisig,
write_output_script_p2pkh,
) )
from .writers import write_op_push from .writers import op_push_length, write_bitcoin_varint, write_op_push
if False: if False:
from trezor.messages import MultisigRedeemScriptType from trezor.messages import MultisigRedeemScriptType
from apps.common.coininfo import CoinInfo from apps.common.coininfo import CoinInfo
from .writers import Writer
def input_derive_script(
def write_input_script_prefixed(
w: Writer,
script_type: InputScriptType, script_type: InputScriptType,
multisig: MultisigRedeemScriptType | None, multisig: MultisigRedeemScriptType | None,
coin: CoinInfo, coin: CoinInfo,
hash_type: int, hash_type: int,
pubkey: bytes, pubkey: bytes,
signature: bytes, signature: bytes,
) -> bytes: ) -> None:
if script_type == InputScriptType.SPENDADDRESS: if script_type == InputScriptType.SPENDADDRESS:
# p2pkh or p2sh # p2pkh or p2sh
return scripts.input_script_p2pkh_or_p2sh(pubkey, signature, hash_type) scripts.write_input_script_p2pkh_or_p2sh_prefixed(
w, pubkey, signature, hash_type
)
elif script_type == InputScriptType.SPENDMULTISIG: elif script_type == InputScriptType.SPENDMULTISIG:
# p2sh multisig # p2sh multisig
assert multisig is not None # checked in sanitize_tx_input assert multisig is not None # checked in sanitize_tx_input
signature_index = multisig_pubkey_index(multisig, pubkey) signature_index = multisig_pubkey_index(multisig, pubkey)
return input_script_multisig( return write_input_script_multisig_prefixed(
multisig, signature, signature_index, hash_type, coin w, multisig, signature, signature_index, hash_type, coin
) )
else: else:
raise wire.ProcessError("Invalid script type") raise wire.ProcessError("Invalid script type")
def input_script_multisig( def write_input_script_multisig_prefixed(
w: Writer,
multisig: MultisigRedeemScriptType, multisig: MultisigRedeemScriptType,
signature: bytes, signature: bytes,
signature_index: int, signature_index: int,
hash_type: int, hash_type: int,
coin: CoinInfo, coin: CoinInfo,
) -> bytearray: ) -> None:
signatures = multisig.signatures # other signatures signatures = multisig.signatures # other signatures
if len(signatures[signature_index]) > 0: if len(signatures[signature_index]) > 0:
raise wire.DataError("Invalid multisig parameters") raise wire.DataError("Invalid multisig parameters")
@ -61,21 +67,19 @@ def input_script_multisig(
# length of the result # length of the result
total_length = 0 total_length = 0
for s in signatures: for s in signatures:
total_length += 1 + len(s) + 1 # length, signature, hash_type if s:
total_length += 1 + redeem_script_length # length, script total_length += 1 + len(s) + 1 # length, signature, hash_type
total_length += op_push_length(redeem_script_length) + redeem_script_length
w = utils.empty_bytearray(total_length) write_bitcoin_varint(w, total_length)
for s in signatures: for s in signatures:
if len(s): if s:
scripts.append_signature(w, s, hash_type) scripts.append_signature(w, s, hash_type)
# redeem script # redeem script
write_op_push(w, redeem_script_length) write_op_push(w, redeem_script_length)
scripts.write_output_script_multisig(w, pubkeys, multisig.m) scripts.write_output_script_multisig(w, pubkeys, multisig.m)
return w
# A ticket purchase submission for an address hash. # A ticket purchase submission for an address hash.
def output_script_sstxsubmissionpkh(addr: str) -> bytearray: def output_script_sstxsubmissionpkh(addr: str) -> bytearray:
@ -86,12 +90,7 @@ def output_script_sstxsubmissionpkh(addr: str) -> bytearray:
w = utils.empty_bytearray(26) w = utils.empty_bytearray(26)
w.append(0xBA) # OP_SSTX w.append(0xBA) # OP_SSTX
w.append(0x76) # OP_DUP scripts.write_output_script_p2pkh(w, raw_address[2:])
w.append(0xA9) # OP_HASH160
w.append(0x14) # OP_DATA_20
write_bytes_fixed(w, raw_address[2:], 20)
w.append(0x88) # OP_EQUALVERIFY
w.append(0xAC) # OP_CHECKSIG
return w return w
@ -104,44 +103,27 @@ def output_script_sstxchange(addr: str) -> bytearray:
w = utils.empty_bytearray(26) w = utils.empty_bytearray(26)
w.append(0xBD) # OP_SSTXCHANGE w.append(0xBD) # OP_SSTXCHANGE
w.append(0x76) # OP_DUP scripts.write_output_script_p2pkh(w, raw_address[2:])
w.append(0xA9) # OP_HASH160
w.append(0x14) # OP_DATA_20
write_bytes_fixed(w, raw_address[2:], 20)
w.append(0x88) # OP_EQUALVERIFY
w.append(0xAC) # OP_CHECKSIG
return w return w
# Spend from a stake revocation. # Spend from a stake revocation.
def output_script_ssrtx(pkh: bytes) -> bytearray: def write_output_script_ssrtx_prefixed(w: Writer, pkh: bytes) -> None:
utils.ensure(len(pkh) == 20) utils.ensure(len(pkh) == 20)
s = bytearray(26) write_bitcoin_varint(w, 26)
s[0] = 0xBC # OP_SSRTX w.append(0xBC) # OP_SSRTX
s[1] = 0x76 # OP_DUP scripts.write_output_script_p2pkh(w, pkh)
s[2] = 0xA9 # OP_HASH160
s[3] = 0x14 # OP_DATA_20
s[4:24] = pkh
s[24] = 0x88 # OP_EQUALVERIFY
s[25] = 0xAC # OP_CHECKSIG
return s
# Spend from a stake generation. # Spend from a stake generation.
def output_script_ssgen(pkh: bytes) -> bytearray: def write_output_script_ssgen_prefixed(w: Writer, pkh: bytes) -> None:
utils.ensure(len(pkh) == 20) utils.ensure(len(pkh) == 20)
s = bytearray(26) write_bitcoin_varint(w, 26)
s[0] = 0xBB # OP_SSGEN w.append(0xBB) # OP_SSGEN
s[1] = 0x76 # OP_DUP scripts.write_output_script_p2pkh(w, pkh)
s[2] = 0xA9 # OP_HASH160
s[3] = 0x14 # OP_DATA_20
s[4:24] = pkh # Stake commitment OPRETURN.
s[24] = 0x88 # OP_EQUALVERIFY
s[25] = 0xAC # OP_CHECKSIG
return s
# Retrieve pkh bytes from a stake commitment OPRETURN.
def sstxcommitment_pkh(pkh: bytes, amount: int) -> bytes: def sstxcommitment_pkh(pkh: bytes, amount: int) -> bytes:
w = utils.empty_bytearray(30) w = utils.empty_bytearray(30)
write_bytes_fixed(w, pkh, 20) write_bytes_fixed(w, pkh, 20)

@ -459,8 +459,7 @@ class Bitcoin:
node = self.keychain.derive(txi.address_n) node = self.keychain.derive(txi.address_n)
key_sign_pub = node.public_key() key_sign_pub = node.public_key()
script_sig = self.input_derive_script(txi, key_sign_pub, b"") self.write_tx_input_derived(self.serialized_tx, txi, key_sign_pub, b"")
self.write_tx_input(self.serialized_tx, txi, script_sig)
def sign_bip143_input(self, txi: TxInput) -> tuple[bytes, bytes]: def sign_bip143_input(self, txi: TxInput) -> tuple[bytes, bytes]:
self.tx_info.check_input(txi) self.tx_info.check_input(txi)
@ -500,14 +499,16 @@ class Bitcoin:
if txi.multisig: if txi.multisig:
# find out place of our signature based on the pubkey # find out place of our signature based on the pubkey
signature_index = multisig.multisig_pubkey_index(txi.multisig, public_key) signature_index = multisig.multisig_pubkey_index(txi.multisig, public_key)
self.serialized_tx.extend( scripts.write_witness_multisig(
scripts.witness_multisig( self.serialized_tx,
txi.multisig, signature, signature_index, self.get_hash_type(txi) txi.multisig,
) signature,
signature_index,
self.get_hash_type(txi),
) )
else: else:
self.serialized_tx.extend( scripts.write_witness_p2wpkh(
scripts.witness_p2wpkh(signature, public_key, self.get_hash_type(txi)) self.serialized_tx, signature, public_key, self.get_hash_type(txi)
) )
async def get_legacy_tx_digest( async def get_legacy_tx_digest(
@ -585,8 +586,9 @@ class Bitcoin:
signature = ecdsa_sign(node, tx_digest) signature = ecdsa_sign(node, tx_digest)
# serialize input with correct signature # serialize input with correct signature
script_sig = self.input_derive_script(txi, node.public_key(), signature) self.write_tx_input_derived(
self.write_tx_input(self.serialized_tx, txi, script_sig) self.serialized_tx, txi, node.public_key(), signature
)
self.set_serialized_signature(i, signature) self.set_serialized_signature(i, signature)
async def serialize_output(self, i: int) -> None: async def serialize_output(self, i: int) -> None:
@ -659,6 +661,26 @@ class Bitcoin:
# the fork ID value. # the fork ID value.
return self.get_sighash_type(txi) & 0xFF return self.get_sighash_type(txi) & 0xFF
def write_tx_input_derived(
self,
w: writers.Writer,
txi: TxInput,
pubkey: bytes,
signature: bytes,
) -> None:
writers.write_bytes_reversed(w, txi.prev_hash, writers.TX_HASH_SIZE)
writers.write_uint32(w, txi.prev_index)
scripts.write_input_script_prefixed(
w,
txi.script_type,
txi.multisig,
self.coin,
self.get_hash_type(txi),
pubkey,
signature,
)
writers.write_uint32(w, txi.sequence)
@staticmethod @staticmethod
def write_tx_input( def write_tx_input(
w: writers.Writer, w: writers.Writer,
@ -726,18 +748,3 @@ class Bitcoin:
assert txo.address is not None # checked in sanitize_tx_output assert txo.address is not None # checked in sanitize_tx_output
return scripts.output_derive_script(txo.address, self.coin) return scripts.output_derive_script(txo.address, self.coin)
# Tx Inputs
# ===
def input_derive_script(
self, txi: TxInput, pubkey: bytes, signature: bytes
) -> bytes:
return scripts.input_derive_script(
txi.script_type,
txi.multisig,
self.coin,
self.get_hash_type(txi),
pubkey,
signature,
)

@ -29,8 +29,7 @@ class Bitcoinlike(Bitcoin):
multisig.multisig_pubkey_index(txi.multisig, public_key) multisig.multisig_pubkey_index(txi.multisig, public_key)
# serialize input with correct signature # serialize input with correct signature
script_sig = self.input_derive_script(txi, public_key, signature) self.write_tx_input_derived(self.serialized_tx, txi, public_key, signature)
self.write_tx_input(self.serialized_tx, txi, script_sig)
self.set_serialized_signature(i_sign, signature) self.set_serialized_signature(i_sign, signature)
async def sign_nonsegwit_input(self, i_sign: int) -> None: async def sign_nonsegwit_input(self, i_sign: int) -> None:

@ -147,27 +147,6 @@ class Decred(Bitcoin):
key_sign = self.keychain.derive(txi_sign.address_n) key_sign = self.keychain.derive(txi_sign.address_n)
key_sign_pub = key_sign.public_key() key_sign_pub = key_sign.public_key()
if txi_sign.decred_staking_spend == DecredStakingSpendType.SSRTX:
prev_pkscript = scripts_decred.output_script_ssrtx(
ecdsa_hash_pubkey(key_sign_pub, self.coin)
)
elif txi_sign.decred_staking_spend == DecredStakingSpendType.SSGen:
prev_pkscript = scripts_decred.output_script_ssgen(
ecdsa_hash_pubkey(key_sign_pub, self.coin)
)
elif txi_sign.script_type == InputScriptType.SPENDMULTISIG:
assert txi_sign.multisig is not None
prev_pkscript = scripts_decred.output_script_multisig(
multisig.multisig_get_pubkeys(txi_sign.multisig),
txi_sign.multisig.m,
)
elif txi_sign.script_type == InputScriptType.SPENDADDRESS:
prev_pkscript = scripts_decred.output_script_p2pkh(
ecdsa_hash_pubkey(key_sign_pub, self.coin)
)
else:
raise wire.DataError("Unsupported input script type")
h_witness = self.create_hash_writer() h_witness = self.create_hash_writer()
writers.write_uint32( writers.write_uint32(
h_witness, self.tx_info.tx.version | DECRED_SERIALIZE_WITNESS_SIGNING h_witness, self.tx_info.tx.version | DECRED_SERIALIZE_WITNESS_SIGNING
@ -176,7 +155,30 @@ class Decred(Bitcoin):
for ii in range(self.tx_info.tx.inputs_count): for ii in range(self.tx_info.tx.inputs_count):
if ii == i_sign: if ii == i_sign:
writers.write_bytes_prefixed(h_witness, prev_pkscript) if txi_sign.decred_staking_spend == DecredStakingSpendType.SSRTX:
scripts_decred.write_output_script_ssrtx_prefixed(
h_witness, ecdsa_hash_pubkey(key_sign_pub, self.coin)
)
elif txi_sign.decred_staking_spend == DecredStakingSpendType.SSGen:
scripts_decred.write_output_script_ssgen_prefixed(
h_witness, ecdsa_hash_pubkey(key_sign_pub, self.coin)
)
elif txi_sign.script_type == InputScriptType.SPENDMULTISIG:
assert txi_sign.multisig is not None
scripts_decred.write_output_script_multisig(
h_witness,
multisig.multisig_get_pubkeys(txi_sign.multisig),
txi_sign.multisig.m,
prefixed=True,
)
elif txi_sign.script_type == InputScriptType.SPENDADDRESS:
scripts_decred.write_output_script_p2pkh(
h_witness,
ecdsa_hash_pubkey(key_sign_pub, self.coin),
prefixed=True,
)
else:
raise wire.DataError("Unsupported input script type")
else: else:
write_bitcoin_varint(h_witness, 0) write_bitcoin_varint(h_witness, 0)
@ -193,8 +195,9 @@ class Decred(Bitcoin):
signature = ecdsa_sign(key_sign, sig_hash) signature = ecdsa_sign(key_sign, sig_hash)
# serialize input with correct signature # serialize input with correct signature
script_sig = self.input_derive_script(txi_sign, key_sign_pub, signature) self.write_tx_input_witness(
self.write_tx_input_witness(self.serialized_tx, txi_sign, script_sig) self.serialized_tx, txi_sign, key_sign_pub, signature
)
self.set_serialized_signature(i_sign, signature) self.set_serialized_signature(i_sign, signature)
async def step5_serialize_outputs(self) -> None: async def step5_serialize_outputs(self) -> None:
@ -306,17 +309,13 @@ class Decred(Bitcoin):
writers.write_uint32(w, tx.expiry) writers.write_uint32(w, tx.expiry)
def write_tx_input_witness( def write_tx_input_witness(
self, w: writers.Writer, i: TxInput, script_sig: bytes self, w: writers.Writer, txi: TxInput, pubkey: bytes, signature: bytes
) -> None: ) -> None:
writers.write_uint64(w, i.amount) writers.write_uint64(w, txi.amount)
writers.write_uint32(w, 0) # block height fraud proof writers.write_uint32(w, 0) # block height fraud proof
writers.write_uint32(w, 0xFFFF_FFFF) # block index fraud proof writers.write_uint32(w, 0xFFFF_FFFF) # block index fraud proof
writers.write_bytes_prefixed(w, script_sig) scripts_decred.write_input_script_prefixed(
w,
def input_derive_script(
self, txi: TxInput, pubkey: bytes, signature: bytes
) -> bytes:
return scripts_decred.input_derive_script(
txi.script_type, txi.script_type,
txi.multisig, txi.multisig,
self.coin, self.coin,

@ -76,10 +76,9 @@ class Bip143Hash:
writers.write_uint32(h_preimage, txi.prev_index) writers.write_uint32(h_preimage, txi.prev_index)
# scriptCode # scriptCode
script_code = scripts.bip143_derive_script_code( scripts.write_bip143_script_code_prefixed(
txi, public_keys, threshold, coin h_preimage, txi, public_keys, threshold, coin
) )
writers.write_bytes_prefixed(h_preimage, script_code)
# amount # amount
writers.write_uint64(h_preimage, txi.amount) writers.write_uint64(h_preimage, txi.amount)

@ -11,8 +11,6 @@ from .matchcheck import MultisigFingerprintChecker, WalletPathChecker
if False: if False:
from typing import Protocol from typing import Protocol
from trezor.messages import ( from trezor.messages import (
PrevInput,
PrevOutput,
PrevTx, PrevTx,
SignTx, SignTx,
TxInput, TxInput,
@ -39,22 +37,6 @@ if False:
) -> None: ) -> None:
... ...
@staticmethod
def write_tx_input(
w: writers.Writer,
txi: TxInput | PrevInput,
script: bytes,
) -> None:
...
@staticmethod
def write_tx_output(
w: writers.Writer,
txo: TxOutput | PrevOutput,
script_pubkey: bytes,
) -> None:
...
async def write_prev_tx_footer( async def write_prev_tx_footer(
self, w: writers.Writer, tx: PrevTx, prev_hash: bytes self, w: writers.Writer, tx: PrevTx, prev_hash: bytes
) -> None: ) -> None:
@ -167,7 +149,7 @@ class OriginalTxInfo(TxInfoBase):
def add_input(self, txi: TxInput) -> None: def add_input(self, txi: TxInput) -> None:
super().add_input(txi) super().add_input(txi)
self.signer.write_tx_input(self.h_tx, txi, txi.script_sig or bytes()) writers.write_tx_input(self.h_tx, txi, txi.script_sig or bytes())
# For verification use the first original input that specifies address_n. # For verification use the first original input that specifies address_n.
if not self.verification_input and txi.address_n: if not self.verification_input and txi.address_n:
@ -180,7 +162,7 @@ class OriginalTxInfo(TxInfoBase):
if self.index == 0: if self.index == 0:
writers.write_bitcoin_varint(self.h_tx, self.tx.outputs_count) writers.write_bitcoin_varint(self.h_tx, self.tx.outputs_count)
self.signer.write_tx_output(self.h_tx, txo, script_pubkey) writers.write_tx_output(self.h_tx, txo, script_pubkey)
async def finalize_tx_hash(self) -> None: async def finalize_tx_hash(self) -> None:
await self.signer.write_prev_tx_footer(self.h_tx, self.tx, self.orig_hash) await self.signer.write_prev_tx_footer(self.h_tx, self.tx, self.orig_hash)

@ -3,7 +3,6 @@ from micropython import const
from trezor import wire from trezor import wire
from trezor.crypto.hashlib import blake2b from trezor.crypto.hashlib import blake2b
from trezor.enums import InputScriptType
from trezor.messages import PrevTx, SignTx, TxInput, TxOutput from trezor.messages import PrevTx, SignTx, TxInput, TxOutput
from trezor.utils import HashWriter, ensure from trezor.utils import HashWriter, ensure
@ -11,13 +10,11 @@ from apps.common.coininfo import CoinInfo
from apps.common.keychain import Keychain from apps.common.keychain import Keychain
from apps.common.writers import write_bitcoin_varint from apps.common.writers import write_bitcoin_varint
from ..common import ecdsa_hash_pubkey from ..scripts import write_bip143_script_code_prefixed
from ..scripts import output_script_multisig, output_script_p2pkh
from ..writers import ( from ..writers import (
TX_HASH_SIZE, TX_HASH_SIZE,
get_tx_hash, get_tx_hash,
write_bytes_fixed, write_bytes_fixed,
write_bytes_prefixed,
write_bytes_reversed, write_bytes_reversed,
write_tx_output, write_tx_output,
write_uint32, write_uint32,
@ -97,8 +94,7 @@ class Zip243Hash:
write_bytes_reversed(h_preimage, txi.prev_hash, TX_HASH_SIZE) write_bytes_reversed(h_preimage, txi.prev_hash, TX_HASH_SIZE)
write_uint32(h_preimage, txi.prev_index) write_uint32(h_preimage, txi.prev_index)
# 13b. scriptCode # 13b. scriptCode
script_code = derive_script_code(txi, public_keys, threshold, coin) write_bip143_script_code_prefixed(h_preimage, txi, public_keys, threshold, coin)
write_bytes_prefixed(h_preimage, script_code)
# 13c. value # 13c. value
write_uint64(h_preimage, txi.amount) write_uint64(h_preimage, txi.amount)
# 13d. nSequence # 13d. nSequence
@ -174,17 +170,3 @@ class Zcashlike(Bitcoinlike):
write_uint32(w, tx.lock_time) write_uint32(w, tx.lock_time)
if tx.version >= 3: if tx.version >= 3:
write_uint32(w, tx.expiry) # expiryHeight write_uint32(w, tx.expiry) # expiryHeight
def derive_script_code(
txi: TxInput, public_keys: list[bytes], threshold: int, coin: CoinInfo
) -> bytearray:
if len(public_keys) > 1:
return output_script_multisig(public_keys, threshold)
p2pkh = txi.script_type in (InputScriptType.SPENDADDRESS, InputScriptType.EXTERNAL)
if p2pkh:
return output_script_p2pkh(ecdsa_hash_pubkey(public_keys[0], coin))
else:
raise wire.DataError("Unknown input script type for zip143 script code")

@ -1,12 +1,10 @@
from trezor import wire from trezor import utils, wire
from trezor.crypto import der from trezor.crypto import der
from trezor.crypto.curve import secp256k1 from trezor.crypto.curve import secp256k1
from trezor.crypto.hashlib import sha256 from trezor.crypto.hashlib import sha256
from .common import ecdsa_hash_pubkey from .common import ecdsa_hash_pubkey
from .scripts import ( from .scripts import (
input_script_p2wpkh_in_p2sh,
input_script_p2wsh_in_p2sh,
output_script_native_p2wpkh_or_p2wsh, output_script_native_p2wpkh_or_p2wsh,
output_script_p2pkh, output_script_p2pkh,
output_script_p2sh, output_script_p2sh,
@ -15,6 +13,8 @@ from .scripts import (
parse_output_script_multisig, parse_output_script_multisig,
parse_witness_multisig, parse_witness_multisig,
parse_witness_p2wpkh, parse_witness_p2wpkh,
write_input_script_p2wpkh_in_p2sh,
write_input_script_p2wsh_in_p2sh,
) )
if False: if False:
@ -56,7 +56,9 @@ class SignatureVerifier:
if len(script_sig) == 23: # P2WPKH nested in BIP16 P2SH if len(script_sig) == 23: # P2WPKH nested in BIP16 P2SH
public_key, signature, hash_type = parse_witness_p2wpkh(witness) public_key, signature, hash_type = parse_witness_p2wpkh(witness)
pubkey_hash = ecdsa_hash_pubkey(public_key, coin) pubkey_hash = ecdsa_hash_pubkey(public_key, coin)
if input_script_p2wpkh_in_p2sh(pubkey_hash) != script_sig: w = utils.empty_bytearray(23)
write_input_script_p2wpkh_in_p2sh(w, pubkey_hash)
if w != script_sig:
raise wire.DataError("Invalid public key hash") raise wire.DataError("Invalid public key hash")
script_hash = coin.script_hash(script_sig[1:]) script_hash = coin.script_hash(script_sig[1:])
if output_script_p2sh(script_hash) != script_pubkey: if output_script_p2sh(script_hash) != script_pubkey:
@ -66,7 +68,9 @@ class SignatureVerifier:
elif len(script_sig) == 35: # P2WSH nested in BIP16 P2SH elif len(script_sig) == 35: # P2WSH nested in BIP16 P2SH
script, self.signatures = parse_witness_multisig(witness) script, self.signatures = parse_witness_multisig(witness)
script_hash = sha256(script).digest() script_hash = sha256(script).digest()
if input_script_p2wsh_in_p2sh(script_hash) != script_sig: w = utils.empty_bytearray(35)
write_input_script_p2wsh_in_p2sh(w, script_hash)
if w != script_sig:
raise wire.DataError("Invalid script hash") raise wire.DataError("Invalid script hash")
script_hash = coin.script_hash(script_sig[1:]) script_hash = coin.script_hash(script_sig[1:])
if output_script_p2sh(script_hash) != script_pubkey: if output_script_p2sh(script_hash) != script_pubkey:

@ -79,6 +79,18 @@ def write_op_push(w: Writer, n: int) -> None:
w.append((n >> 24) & 0xFF) w.append((n >> 24) & 0xFF)
def op_push_length(n: int) -> int:
ensure(n >= 0 and n <= 0xFFFF_FFFF)
if n < 0x4C:
return 1
elif n < 0xFF:
return 2
elif n < 0xFFFF:
return 3
else:
return 4
def get_tx_hash(w: HashWriter, double: bool = False, reverse: bool = False) -> bytes: def get_tx_hash(w: HashWriter, double: bool = False, reverse: bool = False) -> bytes:
d = w.get_digest() d = w.get_digest()
if double: if double:

Loading…
Cancel
Save