1
0
mirror of https://github.com/trezor/trezor-firmware.git synced 2024-12-22 22:38:08 +00:00

Merge pull request #85 from trezor/tsusanka/wip-segwit

Segwit support for sign_tx and get_address
This commit is contained in:
Jan Pochyla 2017-11-24 15:01:46 +01:00 committed by GitHub
commit c5cd844bef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 2027 additions and 446 deletions

View File

@ -28,13 +28,12 @@ def strip(address_type, raw_address):
def split(coin, raw_address):
l = None
for f in ['', '_p2sh', '_p2wpkh', '_p2wsh']:
at = getattr(coin, 'address_type' + f)
for f in ('address_type',
'address_type_p2sh',
'address_type_p2wpkh',
'address_type_p2wsh'):
at = getattr(coin, f)
if at is not None and check(at, raw_address):
l = length(coin.address_type)
break
if l is not None:
return raw_address[:l], raw_address[l:]
else:
raise ValueError('Invalid addressXXX')
l = length(at)
return raw_address[:l], raw_address[l:]
raise ValueError('Invalid address')

View File

@ -1,127 +1,257 @@
from trezor.messages.CoinType import CoinType
# the following list is generated using tools/coins-gen.py
# the following list is generated using tools/codegen/gen_coins.py
# do not edit manually!
COINS = [
CoinType(
coin_name='Bitcoin',
coin_shortcut='BTC',
coin_label='Bitcoin',
address_type=0,
address_type_p2sh=5,
maxfee_kb=500000,
minfee_kb=1000,
signed_message_header='Bitcoin Signed Message:\n',
xpub_magic=0x0488b21e,
hash_genesis_block='000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f',
xprv_magic=0x0488ade4,
xpub_magic=0x0488b21e,
bech32_prefix='bc',
bip44=0,
segwit=False,
segwit=True,
forkid=None,
default_fee_b={'Low': 10, 'Economy': 70, 'Normal': 140, 'High': 200},
dust_limit=546,
blocktime_minutes=10,
firmware='stable',
address_prefix='bitcoin:',
min_address_length=27,
max_address_length=34,
bitcore=['https://btc-bitcore3.trezor.io', 'https://btc-bitcore1.trezor.io'],
),
CoinType(
coin_name='Testnet',
coin_shortcut='TEST',
coin_label='Testnet',
address_type=111,
address_type_p2sh=196,
maxfee_kb=10000000,
minfee_kb=1000,
signed_message_header='Bitcoin Signed Message:\n',
xpub_magic=0x043587cf,
hash_genesis_block='000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943',
xprv_magic=0x04358394,
xpub_magic=0x043587cf,
bech32_prefix='tb',
bip44=1,
segwit=True,
forkid=None,
default_fee_b={'Normal': 10},
dust_limit=546,
blocktime_minutes=10,
firmware='stable',
address_prefix='bitcoin:',
min_address_length=27,
max_address_length=34,
bitcore=['https://testnet-bitcore3.trezor.io', 'https://testnet-bitcore4.trezor.io'],
),
CoinType(
coin_name='Bcash',
coin_shortcut='BCH',
coin_label='Bitcoin Cash',
address_type=0,
address_type_p2sh=5,
maxfee_kb=500000,
minfee_kb=1000,
signed_message_header='Bitcoin Signed Message:\n',
xpub_magic=0x0488b21e,
hash_genesis_block='000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f',
xprv_magic=0x0488ade4,
xpub_magic=0x0488b21e,
bech32_prefix=None,
bip44=145,
segwit=False,
forkid=0,
default_fee_b={'Low': 10, 'Economy': 70, 'Normal': 140, 'High': 200},
dust_limit=546,
blocktime_minutes=10,
firmware='stable',
address_prefix='bitcoincash:',
min_address_length=27,
max_address_length=34,
bitcore=['https://bch-bitcore2.trezor.io'],
),
CoinType(
coin_name='Bcash Testnet',
coin_shortcut='TBCH',
coin_label='Bitcoin Cash Testnet',
address_type=111,
address_type_p2sh=196,
maxfee_kb=10000000,
minfee_kb=1000,
signed_message_header='Bitcoin Signed Message:\n',
xpub_magic=0x043587cf,
hash_genesis_block='000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943',
xprv_magic=0x04358394,
xpub_magic=0x043587cf,
bech32_prefix=None,
bip44=1,
segwit=False,
forkid=0,
default_fee_b={'Normal': 10},
dust_limit=546,
blocktime_minutes=10,
firmware='debug',
address_prefix='bitcoincash:',
min_address_length=27,
max_address_length=34,
bitcore=[],
),
CoinType(
coin_name='Namecoin',
coin_shortcut='NMC',
coin_label='Namecoin',
address_type=52,
address_type_p2sh=5,
maxfee_kb=10000000,
minfee_kb=1000,
signed_message_header='Namecoin Signed Message:\n',
xpub_magic=0x019da462,
hash_genesis_block='000000000062b72c5e2ceb45fbc8587e807c155b0da735e6483dfba2f0a9c770',
xprv_magic=0x019d9cfe,
xpub_magic=0x019da462,
bech32_prefix=None,
bip44=7,
segwit=False,
forkid=None,
default_fee_b={'Normal': 10},
dust_limit=2940,
blocktime_minutes=10,
firmware='stable',
address_prefix='namecoin:',
min_address_length=27,
max_address_length=34,
bitcore=[],
),
CoinType(
coin_name='Litecoin',
coin_shortcut='LTC',
coin_label='Litecoin',
address_type=48,
address_type_p2sh=50,
maxfee_kb=40000000,
minfee_kb=100000,
signed_message_header='Litecoin Signed Message:\n',
xpub_magic=0x019da462,
hash_genesis_block='12a765e31ffd4059bada1e25190f6e98c99d9714d334efa41a195a7e7e04bfe2',
xprv_magic=0x019d9cfe,
xpub_magic=0x019da462,
bech32_prefix='ltc',
bip44=2,
segwit=True,
forkid=None,
default_fee_b={'Normal': 1000},
dust_limit=54600,
blocktime_minutes=2.5,
firmware='stable',
address_prefix='litecoin:',
min_address_length=27,
max_address_length=34,
bitcore=['https://ltc-bitcore3.trezor.io'],
),
CoinType(
coin_name='Dogecoin',
coin_shortcut='DOGE',
coin_label='Dogecoin',
address_type=30,
address_type_p2sh=22,
maxfee_kb=1000000000,
minfee_kb=1000,
signed_message_header='Dogecoin Signed Message:\n',
xpub_magic=0x02facafd,
hash_genesis_block='1a91e3dace36e2be3bf030a65679fe821aa1d6ef92e7c9902eb318182c355691',
xprv_magic=0x02fac398,
xpub_magic=0x02facafd,
bech32_prefix=None,
bip44=3,
segwit=False,
forkid=None,
default_fee_b={'Normal': 10},
dust_limit=10000000,
blocktime_minutes=1,
firmware='stable',
address_prefix='dogecoin:',
min_address_length=27,
max_address_length=34,
bitcore=[],
),
CoinType(
coin_name='Dash',
coin_shortcut='DASH',
coin_label='Dash',
address_type=76,
address_type_p2sh=16,
maxfee_kb=100000,
minfee_kb=10000,
signed_message_header='DarkCoin Signed Message:\n',
xpub_magic=0x02fe52cc,
hash_genesis_block='00000ffd590b1485b3caadc19b22e6379c733355108f107a430458cdf3407ab6',
xprv_magic=0x02fe52f8,
xpub_magic=0x02fe52cc,
bech32_prefix=None,
bip44=5,
segwit=False,
forkid=None,
default_fee_b={'Normal': 10},
dust_limit=5460,
blocktime_minutes=2.5,
firmware='stable',
address_prefix='dash:',
min_address_length=27,
max_address_length=34,
bitcore=['https://dash-bitcore1.trezor.io', 'https://dash-bitcore3.trezor.io'],
),
CoinType(
coin_name='Zcash',
coin_shortcut='ZEC',
coin_label='Zcash',
address_type=7352,
address_type_p2sh=7357,
maxfee_kb=1000000,
minfee_kb=1000,
signed_message_header='Zcash Signed Message:\n',
xpub_magic=0x0488b21e,
hash_genesis_block='00040fe8ec8471911baa1db1266ea15dd06b4a8a5c453883c000b031973dce08',
xprv_magic=0x0488ade4,
xpub_magic=0x0488b21e,
bech32_prefix=None,
bip44=133,
segwit=False,
forkid=None,
default_fee_b={'Normal': 10},
dust_limit=546,
blocktime_minutes=2.5,
firmware='stable',
address_prefix='zcash:',
min_address_length=35,
max_address_length=95,
bitcore=['https://zec-bitcore1.trezor.io/'],
),
CoinType(
coin_name='Zcash Testnet',
coin_shortcut='TAZ',
coin_label='Zcash Testnet',
address_type=7461,
address_type_p2sh=7354,
maxfee_kb=10000000,
minfee_kb=1000,
signed_message_header='Zcash Signed Message:\n',
xpub_magic=0x043587cf,
hash_genesis_block='05a60a92d99d85997cce3b87616c089f6124d7342af37106edc76126334a2c38',
xprv_magic=0x04358394,
xpub_magic=0x043587cf,
bech32_prefix=None,
bip44=1,
segwit=False,
forkid=None,
default_fee_b={'Normal': 10},
dust_limit=546,
blocktime_minutes=2.5,
firmware='debug',
address_prefix='zcash:',
min_address_length=35,
max_address_length=95,
bitcore=[],
),
]

View File

@ -8,17 +8,19 @@ async def layout_get_address(ctx, msg):
from trezor.messages.FailureType import ProcessError
from ..common import coins
from ..common import seed
from ..wallet.sign_tx import addresses
if msg.multisig:
raise wire.FailureError(ProcessError, 'GetAddress.multisig is unsupported')
address_n = msg.address_n or ()
coin_name = msg.coin_name or 'Bitcoin'
coin = coins.by_name(coin_name)
node = await seed.get_root(ctx)
node.derive_path(address_n)
coin = coins.by_name(coin_name)
address = node.address(coin.address_type)
address = addresses.get_address(msg.script_type, coin, node)
if msg.show_display:
await _show_address(ctx, address)

View File

@ -20,6 +20,8 @@ async def sign_tx(ctx, msg):
req = signer.send(res)
except signing.SigningError as e:
raise wire.FailureError(*e.args)
except signing.AddressError as e:
raise wire.FailureError(*e.args)
if req.__qualname__ == 'TxRequest':
if req.request_type == TXFINISHED:
break

View File

@ -0,0 +1,84 @@
from micropython import const
from trezor.crypto.hashlib import sha256, ripemd160
from trezor.crypto import base58, bech32
from trezor.utils import ensure
from trezor.messages.CoinType import CoinType
from trezor.messages import FailureType
from trezor.messages import InputScriptType
# supported witness version for bech32 addresses
_BECH32_WITVER = const(0x00)
class AddressError(Exception):
pass
def get_address(script_type: InputScriptType, coin: CoinType, node) -> str:
if script_type == InputScriptType.SPENDADDRESS: # p2pkh
return node.address(coin.address_type)
elif script_type == InputScriptType.SPENDWITNESS: # native p2wpkh
if not coin.segwit or not coin.bech32_prefix:
raise AddressError(FailureType.ProcessError,
'Segwit not enabled on this coin')
return address_p2wpkh(node.public_key(), coin.bech32_prefix)
elif script_type == InputScriptType.SPENDP2SHWITNESS: # p2wpkh using p2sh
if not coin.segwit or not coin.address_type_p2sh:
raise AddressError(FailureType.ProcessError,
'Segwit not enabled on this coin')
return address_p2wpkh_in_p2sh(node.public_key(), coin.address_type_p2sh)
else:
raise AddressError(FailureType.ProcessError,
'Invalid script type')
def address_p2wpkh_in_p2sh(pubkey: bytes, addrtype: int) -> str:
s = bytearray(21)
s[0] = addrtype
s[1:21] = address_p2wpkh_in_p2sh_raw(pubkey)
return base58.encode_check(bytes(s))
def address_p2wpkh_in_p2sh_raw(pubkey: bytes) -> bytes:
s = bytearray(22)
s[0] = 0x00 # OP_0
s[1] = 0x14 # pushing 20 bytes
s[2:22] = ecdsa_hash_pubkey(pubkey)
h = sha256(s).digest()
h = ripemd160(h).digest()
return h
def address_p2wpkh(pubkey: bytes, hrp: str) -> str:
pubkeyhash = ecdsa_hash_pubkey(pubkey)
address = bech32.encode(hrp, _BECH32_WITVER, pubkeyhash)
if address is None:
raise AddressError(FailureType.ProcessError,
'Invalid address')
return address
def decode_bech32_address(prefix: str, address: str) -> bytes:
witver, raw = bech32.decode(prefix, address)
if witver != _BECH32_WITVER:
raise AddressError(FailureType.ProcessError,
'Invalid address witness program')
return bytes(raw)
def ecdsa_hash_pubkey(pubkey: bytes) -> bytes:
if pubkey[0] == 0x04:
ensure(len(pubkey) == 65) # uncompressed format
elif pubkey[0] == 0x00:
ensure(len(pubkey) == 1) # point at infinity
else:
ensure(len(pubkey) == 33) # compresssed format
h = sha256(pubkey).digest()
h = ripemd160(h).digest()
return h

View File

@ -0,0 +1,122 @@
from trezor.messages.CoinType import CoinType
from trezor.messages.TxOutputType import TxOutputType
from trezor.messages.TxOutputBinType import TxOutputBinType
from trezor.messages.TxInputType import TxInputType
from trezor.messages.SignTx import SignTx
from trezor.messages.TxRequest import TxRequest
from trezor.messages.TransactionType import TransactionType
from trezor.messages.RequestType import TXINPUT, TXOUTPUT, TXMETA, TXFINISHED
from trezor.messages import InputScriptType
# Machine instructions
# ===
class UiConfirmOutput:
def __init__(self, output: TxOutputType, coin: CoinType):
self.output = output
self.coin = coin
class UiConfirmTotal:
def __init__(self, spending: int, fee: int, coin: CoinType):
self.spending = spending
self.fee = fee
self.coin = coin
class UiConfirmFeeOverThreshold:
def __init__(self, fee: int, coin: CoinType):
self.fee = fee
self.coin = coin
def confirm_output(output: TxOutputType, coin: CoinType):
return (yield UiConfirmOutput(output, coin))
def confirm_total(spending: int, fee: int, coin: CoinType):
return (yield UiConfirmTotal(spending, fee, coin))
def confirm_feeoverthreshold(fee: int, coin: CoinType):
return (yield UiConfirmFeeOverThreshold(fee, coin))
def request_tx_meta(tx_req: TxRequest, tx_hash: bytes=None):
tx_req.request_type = TXMETA
tx_req.details.tx_hash = tx_hash
tx_req.details.request_index = None
ack = yield tx_req
tx_req.serialized = None
return sanitize_tx_meta(ack.tx)
def request_tx_input(tx_req: TxRequest, i: int, tx_hash: bytes=None):
tx_req.request_type = TXINPUT
tx_req.details.request_index = i
tx_req.details.tx_hash = tx_hash
ack = yield tx_req
tx_req.serialized = None
return sanitize_tx_input(ack.tx)
def request_tx_output(tx_req: TxRequest, i: int, tx_hash: bytes=None):
tx_req.request_type = TXOUTPUT
tx_req.details.request_index = i
tx_req.details.tx_hash = tx_hash
ack = yield tx_req
tx_req.serialized = None
if tx_hash is None:
return sanitize_tx_output(ack.tx)
else:
return sanitize_tx_binoutput(ack.tx)
def request_tx_finish(tx_req: TxRequest):
tx_req.request_type = TXFINISHED
tx_req.details = None
yield tx_req
tx_req.serialized = None
# Data sanitizers
# ===
def sanitize_sign_tx(tx: SignTx) -> SignTx:
tx.version = tx.version if tx.version is not None else 1
tx.lock_time = tx.lock_time if tx.lock_time is not None else 0
tx.inputs_count = tx.inputs_count if tx.inputs_count is not None else 0
tx.outputs_count = tx.outputs_count if tx.outputs_count is not None else 0
tx.coin_name = tx.coin_name if tx.coin_name is not None else 'Bitcoin'
return tx
def sanitize_tx_meta(tx: TransactionType) -> TransactionType:
tx.version = tx.version if tx.version is not None else 1
tx.lock_time = tx.lock_time if tx.lock_time is not None else 0
tx.inputs_cnt = tx.inputs_cnt if tx.inputs_cnt is not None else 0
tx.outputs_cnt = tx.outputs_cnt if tx.outputs_cnt is not None else 0
return tx
def sanitize_tx_input(tx: TransactionType) -> TxInputType:
txi = tx.inputs[0]
if txi.script_type is None:
txi.script_type = InputScriptType.SPENDADDRESS
if txi.sequence is None:
txi.sequence = 0xffffffff
return txi
def sanitize_tx_output(tx: TransactionType) -> TxOutputType:
return tx.outputs[0]
def sanitize_tx_binoutput(tx: TransactionType) -> TxOutputBinType:
return tx.bin_outputs[0]

View File

@ -2,6 +2,7 @@ from trezor import ui
from trezor.utils import chunks
from trezor.ui.text import Text
from trezor.messages import ButtonRequestType
from trezor.messages import OutputScriptType
from apps.common.confirm import confirm
from apps.common.confirm import hold_to_confirm
@ -15,11 +16,14 @@ def split_address(address):
async def confirm_output(ctx, output, coin):
# TODO: handle OP_RETURN correctly
if output.script_type == OutputScriptType.PAYTOOPRETURN:
address = 'OP_RETURN' # TODO: handle OP_RETURN correctly
else:
address = output.address
content = Text('Confirm output', ui.ICON_RESET,
ui.BOLD, format_amount(output.amount, coin),
ui.NORMAL, 'to',
ui.MONO, *split_address(output.address))
ui.MONO, *split_address(address))
return await confirm(ctx, content, ButtonRequestType.ConfirmOutput)

View File

@ -0,0 +1,120 @@
from apps.wallet.sign_tx.writers import *
# TX Scripts
# ===
# -------------------------- First gen --------------------------
# =============== P2PK ===============
# obsolete
# =============== P2PKH ===============
def input_script_p2pkh_or_p2sh(pubkey: bytes, signature: bytes) -> bytearray:
w = bytearray_with_cap(5 + len(signature) + 1 + 5 + len(pubkey))
append_signature_and_pubkey(w, pubkey, signature)
return w
def output_script_p2pkh(pubkeyhash: bytes) -> bytearray:
s = bytearray(25)
s[0] = 0x76 # OP_DUP
s[1] = 0xA9 # OP_HASH_160
s[2] = 0x14 # pushing 20 bytes
s[3:23] = pubkeyhash
s[23] = 0x88 # OP_EQUALVERIFY
s[24] = 0xAC # OP_CHECKSIG
return s
# =============== P2SH ===============
# see https://github.com/bitcoin/bips/blob/master/bip-0016.mediawiki
# input script (scriptSig) is the same as input_script_p2pkh_or_p2sh
# output script (scriptPubKey) is A9 14 <scripthash> 87
def output_script_p2sh(scripthash: bytes) -> bytearray:
s = bytearray(23)
s[0] = 0xA9 # OP_HASH_160
s[1] = 0x14 # pushing 20 bytes
s[2:22] = scripthash
s[22] = 0x87 # OP_EQUAL
return s
# -------------------------- SegWit --------------------------
# =============== Native P2WPKH ===============
# see https://github.com/bitcoin/bips/blob/master/bip-0141.mediawiki#p2wpkh
# P2WPKH (Pay-to-Witness-Public-Key-Hash) is the segwit native P2PKH
# not backwards compatible
# input script is completely replaced by the witness and therefore empty
def input_script_native_p2wpkh_or_p2wsh() -> bytearray:
return bytearray(0)
# output script is either:
# 00 14 <20-byte-key-hash>
# 00 20 <32-byte-script-hash>
def output_script_native_p2wpkh_or_p2wsh(witprog: bytes) -> bytearray:
w = bytearray_with_cap(3 + len(witprog))
w.append(0x00) # witness version byte
w.append(len(witprog)) # pub key hash length is 20 (P2WPKH) or 32 (P2WSH) bytes
write_bytes(w, witprog) # pub key hash
return w
# =============== Native P2WPKH nested in P2SH ===============
# P2WPKH is nested in P2SH to be backwards compatible
# see https://github.com/bitcoin/bips/blob/master/bip-0141.mediawiki#witness-program
# input script (scriptSig) is 16 00 14 <pubkeyhash>
# signature is moved to the witness
def input_script_p2wpkh_in_p2sh(pubkeyhash: bytes) -> bytearray:
w = bytearray_with_cap(3 + len(pubkeyhash))
w.append(0x16) # 0x16 - length of the redeemScript
w.append(0x00) # witness version byte
w.append(0x14) # P2WPKH witness program (pub key hash length)
write_bytes(w, pubkeyhash) # pub key hash
return w
# output script (scriptPubKey) is A9 14 <scripthash> 87
# which is same as the output_script_p2sh
# =============== Native P2WSH ===============
# see https://github.com/bitcoin/bips/blob/master/bip-0141.mediawiki#p2wsh
# P2WSH (Pay-to-Witness-Script-Hash) is segwit native P2SH
# not backwards compatible
# input script is completely replaced by the witness and therefore empty
# same as input_script_native_p2wpkh_or_p2wsh
# output script consists of 00 20 <32-byte-key-hash>
# same as output_script_native_p2wpkh_or_p2wsh (only different length)
# -------------------------- Others --------------------------
# === OP_RETURN script
def output_script_paytoopreturn(data: bytes) -> bytearray:
w = bytearray_with_cap(1 + 5 + len(data))
w.append(0x6A) # OP_RETURN
write_op_push(w, len(data))
w.extend(data)
return w
# === helpers
def append_signature_and_pubkey(w: bytearray, pubkey: bytes, signature: bytes) -> bytearray:
write_op_push(w, len(signature) + 1)
write_bytes(w, signature)
w.append(0x01) # SIGHASH_ALL
write_op_push(w, len(pubkey))
write_bytes(w, pubkey)
return w

View File

@ -0,0 +1,76 @@
from trezor.crypto.hashlib import sha256
from trezor.messages.SignTx import SignTx
from trezor.messages import InputScriptType, FailureType
from apps.wallet.sign_tx.writers import *
class Bip143Error(ValueError):
pass
class Bip143:
def __init__(self):
self.h_prevouts = HashWriter(sha256)
self.h_sequence = HashWriter(sha256)
self.h_outputs = HashWriter(sha256)
def add_prevouts(self, txi: TxInputType):
write_bytes_rev(self.h_prevouts, txi.prev_hash)
write_uint32(self.h_prevouts, txi.prev_index)
def get_prevouts_hash(self) -> bytes:
return get_tx_hash(self.h_prevouts, True)
def add_sequence(self, txi: TxInputType):
write_uint32(self.h_sequence, txi.sequence)
def get_sequence_hash(self) -> bytes:
return get_tx_hash(self.h_sequence, True)
def add_output(self, txo_bin: TxOutputBinType):
write_tx_output(self.h_outputs, txo_bin)
def get_outputs_hash(self) -> bytes:
return get_tx_hash(self.h_outputs, True)
def preimage_hash(self, tx: SignTx, txi: TxInputType, pubkeyhash) -> bytes:
h_preimage = HashWriter(sha256)
write_uint32(h_preimage, tx.version) # nVersion
write_bytes(h_preimage, bytearray(self.get_prevouts_hash())) # hashPrevouts
write_bytes(h_preimage, bytearray(self.get_sequence_hash())) # hashSequence
write_bytes_rev(h_preimage, txi.prev_hash) # outpoint
write_uint32(h_preimage, txi.prev_index) # outpoint
script_code = self.derive_script_code(txi, pubkeyhash)
write_varint(h_preimage, len(script_code)) # scriptCode length
write_bytes(h_preimage, script_code) # scriptCode
write_uint64(h_preimage, txi.amount) # amount
write_uint32(h_preimage, txi.sequence) # nSequence
write_bytes(h_preimage, bytearray(self.get_outputs_hash())) # hashOutputs
write_uint32(h_preimage, tx.lock_time) # nLockTime
write_uint32(h_preimage, 0x00000001) # nHashType - only SIGHASH_ALL currently
return get_tx_hash(h_preimage, True)
# this not redeemScript nor scriptPubKey
# for P2WPKH this is always 0x1976a914{20-byte-pubkey-hash}88ac
def derive_script_code(self, txi: TxInputType, pubkeyhash: bytes) -> bytearray:
# p2wpkh in p2sh or native p2wpkh
is_segwit = (txi.script_type == InputScriptType.SPENDWITNESS or
txi.script_type == InputScriptType.SPENDP2SHWITNESS)
if is_segwit:
s = bytearray(25)
s[0] = 0x76 # OP_DUP
s[1] = 0xA9 # OP_HASH_160
s[2] = 0x14 # pushing 20 bytes
s[3:23] = pubkeyhash
s[23] = 0x88 # OP_EQUALVERIFY
s[24] = 0xAC # OP_CHECKSIG
return s
else:
raise Bip143Error(FailureType.DataError,
'Unknown input script type for bip143 script code')

View File

@ -1,197 +1,117 @@
from trezor.crypto.hashlib import sha256, ripemd160
from micropython import const
from trezor.crypto.hashlib import sha256
from trezor.crypto.curve import secp256k1
from trezor.crypto import base58, der
from trezor.utils import ensure
from trezor.messages.CoinType import CoinType
from trezor.messages.SignTx import SignTx
from trezor.messages.TxOutputType import TxOutputType
from trezor.messages.TxOutputBinType import TxOutputBinType
from trezor.messages.TxInputType import TxInputType
from trezor.messages.TxRequest import TxRequest
from trezor.messages.TransactionType import TransactionType
from trezor.messages.RequestType import TXINPUT, TXOUTPUT, TXMETA, TXFINISHED
from trezor.messages.TxRequestSerializedType import TxRequestSerializedType
from trezor.messages.TxRequestDetailsType import TxRequestDetailsType
from trezor.messages import OutputScriptType, InputScriptType, FailureType
from trezor.messages import OutputScriptType
from apps.common import address_type
from apps.common import coins
from apps.wallet.sign_tx.addresses import *
from apps.wallet.sign_tx.helpers import *
from apps.wallet.sign_tx.segwit_bip143 import *
from apps.wallet.sign_tx.scripts import *
from apps.wallet.sign_tx.writers import *
# the number of bip32 levels used in a wallet (chain and address)
_BIP32_WALLET_DEPTH = const(2)
# Machine instructions
# ===
# the chain id used for change
_BIP32_CHANGE_CHAIN = const(1)
# the maximum allowed change address. this should be large enough for normal
# use and still allow to quickly brute-force the correct bip32 path
_BIP32_MAX_LAST_ELEMENT = const(1000000)
class SigningError(ValueError):
pass
class UiConfirmOutput:
def __init__(self, output: TxOutputType, coin: CoinType):
self.output = output
self.coin = coin
class UiConfirmTotal:
def __init__(self, spending: int, fee: int, coin: CoinType):
self.spending = spending
self.fee = fee
self.coin = coin
class UiConfirmFeeOverThreshold:
def __init__(self, fee: int, coin: CoinType):
self.fee = fee
self.coin = coin
def confirm_output(output: TxOutputType, coin: CoinType):
return (yield UiConfirmOutput(output, coin))
def confirm_total(spending: int, fee: int, coin: CoinType):
return (yield UiConfirmTotal(spending, fee, coin))
def confirm_feeoverthreshold(fee: int, coin: CoinType):
return (yield UiConfirmFeeOverThreshold(fee, coin))
def request_tx_meta(tx_req: TxRequest, tx_hash: bytes=None):
tx_req.request_type = TXMETA
tx_req.details.tx_hash = tx_hash
tx_req.details.request_index = None
ack = yield tx_req
tx_req.serialized = None
return sanitize_tx_meta(ack.tx)
def request_tx_input(tx_req: TxRequest, i: int, tx_hash: bytes=None):
tx_req.request_type = TXINPUT
tx_req.details.request_index = i
tx_req.details.tx_hash = tx_hash
ack = yield tx_req
tx_req.serialized = None
return sanitize_tx_input(ack.tx)
def request_tx_output(tx_req: TxRequest, i: int, tx_hash: bytes=None):
tx_req.request_type = TXOUTPUT
tx_req.details.request_index = i
tx_req.details.tx_hash = tx_hash
ack = yield tx_req
tx_req.serialized = None
if tx_hash is None:
return sanitize_tx_output(ack.tx)
else:
return sanitize_tx_binoutput(ack.tx)
def request_tx_finish(tx_req: TxRequest):
tx_req.request_type = TXFINISHED
tx_req.details = None
yield tx_req
tx_req.serialized = None
# Data sanitizers
# ===
def sanitize_sign_tx(tx: SignTx) -> SignTx:
tx.version = tx.version if tx.version is not None else 1
tx.lock_time = tx.lock_time if tx.lock_time is not None else 0
tx.inputs_count = tx.inputs_count if tx.inputs_count is not None else 0
tx.outputs_count = tx.outputs_count if tx.outputs_count is not None else 0
tx.coin_name = tx.coin_name if tx.coin_name is not None else 'Bitcoin'
return tx
def sanitize_tx_meta(tx: TransactionType) -> TransactionType:
tx.version = tx.version if tx.version is not None else 1
tx.lock_time = tx.lock_time if tx.lock_time is not None else 0
tx.inputs_cnt = tx.inputs_cnt if tx.inputs_cnt is not None else 0
tx.outputs_cnt = tx.outputs_cnt if tx.outputs_cnt is not None else 0
return tx
def sanitize_tx_input(tx: TransactionType) -> TxInputType:
txi = tx.inputs[0]
txi.script_type = (
txi.script_type if txi.script_type is not None else InputScriptType.SPENDADDRESS)
return txi
def sanitize_tx_output(tx: TransactionType) -> TxOutputType:
return tx.outputs[0]
def sanitize_tx_binoutput(tx: TransactionType) -> TxOutputBinType:
return tx.bin_outputs[0]
# Transaction signing
# ===
# see https://github.com/trezor/trezor-mcu/blob/master/firmware/signing.c#L84
# for pseudo code overview
# ===
async def sign_tx(tx: SignTx, root):
# Phase 1
# - check inputs, previous transactions, and outputs
# - ask for confirmations
# - check fee
async def check_tx_fee(tx: SignTx, root):
tx = sanitize_sign_tx(tx)
coin = coins.by_name(tx.coin_name)
# Phase 1
# - check inputs, previous transactions, and outputs
# - ask for confirmations
# - check fee
total_in = 0 # sum of input amounts
total_out = 0 # sum of output amounts
change_out = 0 # change output amount
# h_first is used to make sure the inputs and outputs streamed in Phase 1
# are the same as in Phase 2. it is thus not required to fully hash the
# tx, as the SignTx info is streamed only once
h_first = HashWriter(sha256) # not a real tx hash
bip143 = Bip143()
txo_bin = TxOutputBinType()
tx_req = TxRequest()
tx_req.details = TxRequestDetailsType()
total_in = 0 # sum of input amounts
total_out = 0 # sum of output amounts
change_out = 0 # change output amount
wallet_path = [] # common prefix of input paths
segwit = {} # dict of booleans stating if input is segwit
for i in range(tx.inputs_count):
# STAGE_REQUEST_1_INPUT
txi = await request_tx_input(tx_req, i)
wallet_path = input_extract_wallet_path(txi, wallet_path)
write_tx_input_check(h_first, txi)
total_in += await get_prevtx_output_value(
tx_req, txi.prev_hash, txi.prev_index)
bip143.add_prevouts(txi)
bip143.add_sequence(txi)
is_segwit = (txi.script_type == InputScriptType.SPENDWITNESS or
txi.script_type == InputScriptType.SPENDP2SHWITNESS)
if is_segwit:
if not coin.segwit:
raise SigningError(FailureType.DataError,
'Segwit not enabled on this coin')
if not txi.amount:
raise SigningError(FailureType.DataError,
'Segwit input without amount')
segwit[i] = True
total_in += txi.amount
elif txi.script_type == InputScriptType.SPENDADDRESS:
segwit[i] = False
total_in += await get_prevtx_output_value(
tx_req, txi.prev_hash, txi.prev_index)
else:
raise SigningError(FailureType.DataError,
'Wrong input script type')
for o in range(tx.outputs_count):
# STAGE_REQUEST_3_OUTPUT
txo = await request_tx_output(tx_req, o)
if output_is_change(txo):
txo_bin.amount = txo.amount
txo_bin.script_pubkey = output_derive_script(txo, coin, root)
if output_is_change(txo, wallet_path):
if change_out != 0:
raise SigningError(FailureType.ProcessError,
'Only one change output is valid')
change_out = txo.amount
else:
if not await confirm_output(txo, coin):
raise SigningError(FailureType.ActionCancelled,
'Output cancelled')
txo_bin.amount = txo.amount
txo_bin.script_pubkey = output_derive_script(txo, coin, root)
elif not await confirm_output(txo, coin):
raise SigningError(FailureType.ActionCancelled,
'Output cancelled')
write_tx_output(h_first, txo_bin)
bip143.add_output(txo_bin)
total_out += txo_bin.amount
fee = total_in - total_out
if fee < 0:
raise SigningError(FailureType.NotEnoughFunds,
'Not enough funds')
if fee > coin.maxfee_kb * ((estimate_tx_size(tx.inputs_count, tx.outputs_count) + 999) // 1000):
tx_size_b = estimate_tx_size(tx.inputs_count, tx.outputs_count)
if fee > coin.maxfee_kb * ((tx_size_b + 999) // 1000):
if not await confirm_feeoverthreshold(fee, coin):
raise SigningError(FailureType.ActionCancelled,
'Signing cancelled')
@ -200,75 +120,115 @@ async def sign_tx(tx: SignTx, root):
raise SigningError(FailureType.ActionCancelled,
'Total cancelled')
return h_first, bip143, segwit, total_in, wallet_path
async def sign_tx(tx: SignTx, root):
tx = sanitize_sign_tx(tx)
# Phase 1
h_first, bip143, segwit, authorized_in, wallet_path = await check_tx_fee(tx, root)
# Phase 2
# - sign inputs
# - check that nothing changed
coin = coins.by_name(tx.coin_name)
tx_ser = TxRequestSerializedType()
for i_sign in range(tx.inputs_count):
# hash of what we are signing with this input
h_sign = HashWriter(sha256)
# same as h_first, checked at the end of this iteration
h_second = HashWriter(sha256)
txo_bin = TxOutputBinType()
tx_req = TxRequest()
tx_req.details = TxRequestDetailsType()
tx_req.serialized = None
for i_sign in range(tx.inputs_count):
txi_sign = None
key_sign = None
key_sign_pub = None
write_uint32(h_sign, tx.version)
if segwit[i_sign]:
# STAGE_REQUEST_SEGWIT_INPUT
txi_sign = await request_tx_input(tx_req, i_sign)
write_varint(h_sign, tx.inputs_count)
is_segwit = (txi_sign.script_type == InputScriptType.SPENDWITNESS or
txi_sign.script_type == InputScriptType.SPENDP2SHWITNESS)
if not is_segwit:
raise SigningError(FailureType.ProcessError,
'Transaction has changed during signing')
input_check_wallet_path(txi_sign, wallet_path)
for i in range(tx.inputs_count):
# STAGE_REQUEST_4_INPUT
txi = await request_tx_input(tx_req, i)
write_tx_input_check(h_second, txi)
if i == i_sign:
txi_sign = txi
key_sign = node_derive(root, txi.address_n)
key_sign_pub = key_sign.public_key()
txi.script_sig = input_derive_script(txi, key_sign_pub)
else:
txi.script_sig = bytes()
write_tx_input(h_sign, txi)
key_sign = node_derive(root, txi_sign.address_n)
key_sign_pub = key_sign.public_key()
txi_sign.script_sig = input_derive_script(txi_sign, key_sign_pub)
w_txi = bytearray_with_cap(
7 + len(txi_sign.prev_hash) + 4 + len(txi_sign.script_sig) + 4)
if i_sign == 0: # serializing first input => prepend headers
write_bytes(w_txi, get_tx_header(tx, True))
write_tx_input(w_txi, txi_sign)
tx_ser.serialized_tx = w_txi
tx_req.serialized = tx_ser
write_varint(h_sign, tx.outputs_count)
else:
# hash of what we are signing with this input
h_sign = HashWriter(sha256)
# same as h_first, checked before signing the digest
h_second = HashWriter(sha256)
for o in range(tx.outputs_count):
# STAGE_REQUEST_4_OUTPUT
txo = await request_tx_output(tx_req, o)
txo_bin.amount = txo.amount
txo_bin.script_pubkey = output_derive_script(txo, coin, root)
write_tx_output(h_second, txo_bin)
write_tx_output(h_sign, txo_bin)
write_uint32(h_sign, tx.version)
write_varint(h_sign, tx.inputs_count)
write_uint32(h_sign, tx.lock_time)
for i in range(tx.inputs_count):
# STAGE_REQUEST_4_INPUT
txi = await request_tx_input(tx_req, i)
input_check_wallet_path(txi, wallet_path)
write_tx_input_check(h_second, txi)
if i == i_sign:
txi_sign = txi
key_sign = node_derive(root, txi.address_n)
key_sign_pub = key_sign.public_key()
txi_sign.script_sig = output_script_p2pkh(
ecdsa_hash_pubkey(key_sign_pub))
else:
txi.script_sig = bytes()
write_tx_input(h_sign, txi)
write_uint32(h_sign, 0x00000001) # SIGHASH_ALL hash_type
write_varint(h_sign, tx.outputs_count)
# check the control digests
if get_tx_hash(h_first, False) != get_tx_hash(h_second, False):
raise SigningError(FailureType.ProcessError,
'Transaction has changed during signing')
for o in range(tx.outputs_count):
# STAGE_REQUEST_4_OUTPUT
txo = await request_tx_output(tx_req, o)
txo_bin.amount = txo.amount
txo_bin.script_pubkey = output_derive_script(txo, coin, root)
write_tx_output(h_second, txo_bin)
write_tx_output(h_sign, txo_bin)
# compute the signature from the tx digest
signature = ecdsa_sign(key_sign, get_tx_hash(h_sign, True))
tx_ser.signature_index = i_sign
tx_ser.signature = signature
write_uint32(h_sign, tx.lock_time)
# serialize input with correct signature
txi_sign.script_sig = input_derive_script(
txi_sign, key_sign_pub, signature)
w_txi_sign = bytearray_with_cap(
len(txi_sign.prev_hash) + 4 + 5 + len(txi_sign.script_sig) + 4)
if i_sign == 0: # serializing first input => prepend tx version and inputs count
write_uint32(w_txi_sign, tx.version)
write_varint(w_txi_sign, tx.inputs_count)
write_tx_input(w_txi_sign, txi_sign)
tx_ser.serialized_tx = w_txi_sign
write_uint32(h_sign, 0x00000001) # SIGHASH_ALL hash_type
tx_req.serialized = tx_ser
# check the control digests
if get_tx_hash(h_first, False) != get_tx_hash(h_second, False):
raise SigningError(FailureType.ProcessError,
'Transaction has changed during signing')
# compute the signature from the tx digest
signature = ecdsa_sign(key_sign, get_tx_hash(h_sign, True))
tx_ser.signature_index = i_sign
tx_ser.signature = signature
# serialize input with correct signature
txi_sign.script_sig = input_derive_script(
txi_sign, key_sign_pub, signature)
w_txi_sign = bytearray_with_cap(
5 + len(txi_sign.prev_hash) + 4 + len(txi_sign.script_sig) + 4)
if i_sign == 0: # serializing first input => prepend headers
write_bytes(w_txi_sign, get_tx_header(tx))
write_tx_input(w_txi_sign, txi_sign)
tx_ser.serialized_tx = w_txi_sign
tx_req.serialized = tx_ser
for o in range(tx.outputs_count):
# STAGE_REQUEST_5_OUTPUT
@ -282,14 +242,47 @@ async def sign_tx(tx: SignTx, root):
if o == 0: # serializing first output => prepend outputs count
write_varint(w_txo_bin, tx.outputs_count)
write_tx_output(w_txo_bin, txo_bin)
if o == tx.outputs_count - 1: # serializing last output => append tx lock_time
write_uint32(w_txo_bin, tx.lock_time)
tx_ser.signature_index = None
tx_ser.signature = None
tx_ser.serialized_tx = w_txo_bin
tx_req.serialized = tx_ser
any_segwit = True in segwit.values()
for i in range(tx.inputs_count):
if segwit[i]:
# STAGE_REQUEST_SEGWIT_WITNESS
txi = await request_tx_input(tx_req, i)
input_check_wallet_path(txi, wallet_path)
is_segwit = (txi.script_type == InputScriptType.SPENDWITNESS or
txi.script_type == InputScriptType.SPENDP2SHWITNESS)
if not is_segwit or txi.amount > authorized_in:
raise SigningError(FailureType.ProcessError,
'Transaction has changed during signing')
authorized_in -= txi.amount
key_sign = node_derive(root, txi.address_n)
key_sign_pub = key_sign.public_key()
bip143_hash = bip143.preimage_hash(tx, txi, ecdsa_hash_pubkey(key_sign_pub))
signature = ecdsa_sign(key_sign, bip143_hash)
witness = get_p2wpkh_witness(signature, key_sign_pub)
tx_ser.serialized_tx = witness
tx_ser.signature_index = i
tx_ser.signature = signature
elif any_segwit:
tx_ser.serialized_tx = bytearray(1) # empty witness for non-segwit inputs
tx_ser.signature_index = None
tx_ser.signature = None
tx_req.serialized = tx_ser
write_uint32(tx_ser.serialized_tx, tx.lock_time)
await request_tx_finish(tx_req)
@ -327,66 +320,88 @@ async def get_prevtx_output_value(tx_req: TxRequest, prev_hash: bytes, prev_inde
return total_out
def get_tx_hash(w, double: bool, reverse: bool=False) -> bytes:
d = w.getvalue()
if double:
d = sha256(d).digest()
if reverse:
d = bytes(reversed(d))
return d
def estimate_tx_size(inputs, outputs):
def estimate_tx_size(inputs: int, outputs: int) -> int:
return 10 + inputs * 149 + outputs * 35
# TX Helpers
# ===
def get_tx_header(tx: SignTx, segwit=False):
w_txi = bytearray()
write_uint32(w_txi, tx.version)
if segwit:
write_varint(w_txi, 0x00) # segwit witness marker
write_varint(w_txi, 0x01) # segwit witness flag
write_varint(w_txi, tx.inputs_count)
return w_txi
def get_p2wpkh_witness(signature: bytes, pubkey: bytes):
w = bytearray_with_cap(1 + 5 + len(signature) + 1 + 5 + len(pubkey))
write_varint(w, 0x02) # num of segwit items, in P2WPKH it's always 2
append_signature_and_pubkey(w, pubkey, signature)
return w
# TX Outputs
# ===
def output_derive_script(o: TxOutputType, coin: CoinType, root) -> bytes:
if o.script_type == OutputScriptType.PAYTOADDRESS:
ra = output_paytoaddress_extract_raw_address(o, coin, root)
ra = address_type.strip(coin.address_type, ra)
return script_paytoaddress_new(ra)
elif o.script_type == OutputScriptType.PAYTOSCRIPTHASH:
ra = output_paytoaddress_extract_raw_address(o, coin, root, p2sh=True)
ra = address_type.strip(coin.address_type_p2sh, ra)
return script_paytoscripthash_new(ra)
elif o.script_type == OutputScriptType.PAYTOOPRETURN:
if o.amount == 0:
return script_paytoopreturn_new(o.op_return_data)
else:
raise SigningError(FailureType.SyntaxError,
if o.script_type == OutputScriptType.PAYTOOPRETURN:
if o.amount != 0:
raise SigningError(FailureType.DataError,
'OP_RETURN output with non-zero amount')
return output_script_paytoopreturn(o.op_return_data)
if o.address_n: # change output
if o.address:
raise SigningError(FailureType.DataError, 'Address in change output')
o.address = get_address_for_change(o, coin, root)
else:
raise SigningError(FailureType.SyntaxError,
'Invalid output script type')
if not o.address:
raise SigningError(FailureType.DataError, 'Missing address')
if coin.bech32_prefix and o.address.startswith(coin.bech32_prefix): # p2wpkh or p2wsh
witprog = decode_bech32_address(coin.bech32_prefix, o.address)
return output_script_native_p2wpkh_or_p2wsh(witprog)
raw_address = base58.decode_check(o.address)
if address_type.check(coin.address_type, raw_address): # p2pkh
pubkeyhash = address_type.strip(coin.address_type, raw_address)
return output_script_p2pkh(pubkeyhash)
elif address_type.check(coin.address_type_p2sh, raw_address): # p2sh
scripthash = address_type.strip(coin.address_type_p2sh, raw_address)
return output_script_p2sh(scripthash)
raise SigningError(FailureType.DataError, 'Invalid address type')
def output_paytoaddress_extract_raw_address(
o: TxOutputType, coin: CoinType, root, p2sh: bool=False) -> bytes:
addr_type = coin.address_type_p2sh if p2sh else coin.address_type
# TODO: dont encode/decode more then necessary
if o.address_n is not None:
node = node_derive(root, o.address_n)
address = node.address(addr_type)
return base58.decode_check(address)
if o.address:
raw = base58.decode_check(o.address)
if not address_type.check(addr_type, raw):
raise SigningError(FailureType.SyntaxError,
'Invalid address type')
return raw
raise SigningError(FailureType.SyntaxError,
'Missing address')
def get_address_for_change(o: TxOutputType, coin: CoinType, root):
if o.script_type == OutputScriptType.PAYTOADDRESS:
input_script_type = InputScriptType.SPENDADDRESS
elif o.script_type == OutputScriptType.PAYTOMULTISIG:
input_script_type = InputScriptType.SPENDMULTISIG
elif o.script_type == OutputScriptType.PAYTOWITNESS:
input_script_type = InputScriptType.SPENDWITNESS
elif o.script_type == OutputScriptType.PAYTOP2SHWITNESS:
input_script_type = InputScriptType.SPENDP2SHWITNESS
else:
raise SigningError(FailureType.DataError, 'Invalid script type')
return get_address(input_script_type, coin, node_derive(root, o.address_n))
def output_is_change(o: TxOutputType) -> bool:
return bool(o.address_n)
def output_is_change(o: TxOutputType, wallet_path: list) -> bool:
address_n = o.address_n
return (address_n is not None and wallet_path is not None
and wallet_path == address_n[:-_BIP32_WALLET_DEPTH]
and address_n[-2] == _BIP32_CHANGE_CHAIN
and address_n[-1] <= _BIP32_MAX_LAST_ELEMENT)
# Tx Inputs
@ -395,14 +410,35 @@ def output_is_change(o: TxOutputType) -> bool:
def input_derive_script(i: TxInputType, pubkey: bytes, signature: bytes=None) -> bytes:
if i.script_type == InputScriptType.SPENDADDRESS:
if signature is None:
return script_paytoaddress_new(ecdsa_hash_pubkey(pubkey))
else:
return script_spendaddress_new(pubkey, signature)
return input_script_p2pkh_or_p2sh(pubkey, signature) # p2pkh or p2sh
if i.script_type == InputScriptType.SPENDP2SHWITNESS: # p2wpkh using p2sh
return input_script_p2wpkh_in_p2sh(ecdsa_hash_pubkey(pubkey))
elif i.script_type == InputScriptType.SPENDWITNESS: # native p2wpkh or p2wsh
return input_script_native_p2wpkh_or_p2wsh()
else:
raise SigningError(FailureType.SyntaxError,
'Unknown input script type')
raise SigningError(FailureType.ProcessError, 'Invalid script type')
def input_extract_wallet_path(txi: TxInputType, wallet_path: list) -> list:
if wallet_path is None:
return None # there was a mismatch in previous inputs
address_n = txi.address_n[:-_BIP32_WALLET_DEPTH]
if not address_n:
return None # input path is too short
if not wallet_path:
return address_n # this is the first input
if wallet_path == address_n:
return address_n # paths match
return None # paths don't match
def input_check_wallet_path(txi: TxInputType, wallet_path: list) -> list:
if wallet_path is None:
return # there was a mismatch in Phase 1, ignore it now
address_n = txi.address_n[:-_BIP32_WALLET_DEPTH]
if wallet_path != address_n:
raise SigningError(FailureType.ProcessError,
'Transaction has changed during signing')
def node_derive(root, address_n: list):
@ -411,178 +447,7 @@ def node_derive(root, address_n: list):
return node
def ecdsa_hash_pubkey(pubkey: bytes) -> bytes:
if pubkey[0] == 0x04:
ensure(len(pubkey) == 65) # uncompressed format
elif pubkey[0] == 0x00:
ensure(len(pubkey) == 1) # point at infinity
else:
ensure(len(pubkey) == 33) # compresssed format
h = sha256(pubkey).digest()
h = ripemd160(h).digest()
return h
def ecdsa_sign(node, digest: bytes) -> bytes:
sig = secp256k1.sign(node.private_key(), digest)
sigder = der.encode_seq((sig[1:33], sig[33:65]))
return sigder
# TX Scripts
# ===
def script_paytoaddress_new(pubkeyhash: bytes) -> bytearray:
s = bytearray(25)
s[0] = 0x76 # OP_DUP
s[1] = 0xA9 # OP_HASH_160
s[2] = 0x14 # pushing 20 bytes
s[3:23] = pubkeyhash
s[23] = 0x88 # OP_EQUALVERIFY
s[24] = 0xAC # OP_CHECKSIG
return s
def script_paytoscripthash_new(scripthash: bytes) -> bytearray:
s = bytearray(23)
s[0] = 0xA9 # OP_HASH_160
s[1] = 0x14 # pushing 20 bytes
s[2:22] = scripthash
s[22] = 0x87 # OP_EQUAL
return s
def script_paytoopreturn_new(data: bytes) -> bytearray:
w = bytearray_with_cap(1 + 5 + len(data))
w.append(0x6A) # OP_RETURN
write_op_push(w, len(data))
w.extend(data)
return w
def script_spendaddress_new(pubkey: bytes, signature: bytes) -> bytearray:
w = bytearray_with_cap(5 + len(signature) + 1 + 5 + len(pubkey))
write_op_push(w, len(signature) + 1)
write_bytes(w, signature)
w.append(0x01)
write_op_push(w, len(pubkey))
write_bytes(w, pubkey)
return w
# TX Serialization
# ===
_DEFAULT_SEQUENCE = 4294967295
def write_tx_input(w, i: TxInputType):
i_sequence = i.sequence if i.sequence is not None else _DEFAULT_SEQUENCE
write_bytes_rev(w, i.prev_hash)
write_uint32(w, i.prev_index)
write_varint(w, len(i.script_sig))
write_bytes(w, i.script_sig)
write_uint32(w, i_sequence)
def write_tx_input_check(w, i: TxInputType):
i_sequence = i.sequence if i.sequence is not None else _DEFAULT_SEQUENCE
write_bytes(w, i.prev_hash)
write_uint32(w, i.prev_index)
write_uint32(w, len(i.address_n))
for n in i.address_n:
write_uint32(w, n)
write_uint32(w, i_sequence)
def write_tx_output(w, o: TxOutputBinType):
write_uint64(w, o.amount)
write_varint(w, len(o.script_pubkey))
write_bytes(w, o.script_pubkey)
def write_op_push(w, n: int):
if n < 0x4C:
w.append(n & 0xFF)
elif n < 0xFF:
w.append(0x4C)
w.append(n & 0xFF)
elif n < 0xFFFF:
w.append(0x4D)
w.append(n & 0xFF)
w.append((n >> 8) & 0xFF)
else:
w.append(0x4E)
w.append(n & 0xFF)
w.append((n >> 8) & 0xFF)
w.append((n >> 16) & 0xFF)
w.append((n >> 24) & 0xFF)
# Buffer IO & Serialization
# ===
def write_varint(w, n: int):
if n < 253:
w.append(n & 0xFF)
elif n < 65536:
w.append(253)
w.append(n & 0xFF)
w.append((n >> 8) & 0xFF)
else:
w.append(254)
w.append(n & 0xFF)
w.append((n >> 8) & 0xFF)
w.append((n >> 16) & 0xFF)
w.append((n >> 24) & 0xFF)
def write_uint32(w, n: int):
w.append(n & 0xFF)
w.append((n >> 8) & 0xFF)
w.append((n >> 16) & 0xFF)
w.append((n >> 24) & 0xFF)
def write_uint64(w, n: int):
w.append(n & 0xFF)
w.append((n >> 8) & 0xFF)
w.append((n >> 16) & 0xFF)
w.append((n >> 24) & 0xFF)
w.append((n >> 32) & 0xFF)
w.append((n >> 40) & 0xFF)
w.append((n >> 48) & 0xFF)
w.append((n >> 56) & 0xFF)
def write_bytes(w, buf: bytearray):
w.extend(buf)
def write_bytes_rev(w, buf: bytearray):
w.extend(bytearray(reversed(buf)))
def bytearray_with_cap(cap: int) -> bytearray:
b = bytearray(cap)
b[:] = bytes()
return b
class HashWriter:
def __init__(self, hashfunc):
self.ctx = hashfunc()
self.buf = bytearray(1) # used in append()
def extend(self, buf: bytearray):
self.ctx.update(buf)
def append(self, b: int):
self.buf[0] = b
self.ctx.update(self.buf)
def getvalue(self) -> bytes:
return self.ctx.digest()

View File

@ -0,0 +1,131 @@
from trezor.crypto.hashlib import sha256
from apps.wallet.sign_tx.writers import *
# TX Serialization
# ===
def write_tx_input(w, i: TxInputType):
write_bytes_rev(w, i.prev_hash)
write_uint32(w, i.prev_index)
write_varint(w, len(i.script_sig))
write_bytes(w, i.script_sig)
write_uint32(w, i.sequence)
def write_tx_input_check(w, i: TxInputType):
write_bytes(w, i.prev_hash)
write_uint32(w, i.prev_index)
write_uint32(w, i.script_type)
write_uint32(w, len(i.address_n))
for n in i.address_n:
write_uint32(w, n)
write_uint32(w, i.sequence)
write_uint32(w, i.amount or 0)
def write_tx_output(w, o: TxOutputBinType):
write_uint64(w, o.amount)
write_varint(w, len(o.script_pubkey))
write_bytes(w, o.script_pubkey)
def write_op_push(w, n: int):
if n < 0x4C:
w.append(n & 0xFF)
elif n < 0xFF:
w.append(0x4C)
w.append(n & 0xFF)
elif n < 0xFFFF:
w.append(0x4D)
w.append(n & 0xFF)
w.append((n >> 8) & 0xFF)
else:
w.append(0x4E)
w.append(n & 0xFF)
w.append((n >> 8) & 0xFF)
w.append((n >> 16) & 0xFF)
w.append((n >> 24) & 0xFF)
# Buffer IO & Serialization
# ===
def write_varint(w, n: int):
if n < 253:
w.append(n & 0xFF)
elif n < 65536:
w.append(253)
w.append(n & 0xFF)
w.append((n >> 8) & 0xFF)
else:
w.append(254)
w.append(n & 0xFF)
w.append((n >> 8) & 0xFF)
w.append((n >> 16) & 0xFF)
w.append((n >> 24) & 0xFF)
def write_uint32(w, n: int):
w.append(n & 0xFF)
w.append((n >> 8) & 0xFF)
w.append((n >> 16) & 0xFF)
w.append((n >> 24) & 0xFF)
def write_uint64(w, n: int):
w.append(n & 0xFF)
w.append((n >> 8) & 0xFF)
w.append((n >> 16) & 0xFF)
w.append((n >> 24) & 0xFF)
w.append((n >> 32) & 0xFF)
w.append((n >> 40) & 0xFF)
w.append((n >> 48) & 0xFF)
w.append((n >> 56) & 0xFF)
def write_bytes(w, buf: bytearray):
w.extend(buf)
def write_bytes_rev(w, buf: bytearray):
w.extend(bytearray(reversed(buf)))
def bytearray_with_cap(cap: int) -> bytearray:
b = bytearray(cap)
b[:] = bytes()
return b
# Hashes
# ===
def get_tx_hash(w, double: bool, reverse: bool=False) -> bytes:
d = w.getvalue()
if double:
d = sha256(d).digest()
if reverse:
d = bytes(reversed(d))
return d
class HashWriter:
def __init__(self, hashfunc):
self.ctx = hashfunc()
self.buf = bytearray(1) # used in append()
def extend(self, buf: bytearray):
self.ctx.update(buf)
def append(self, b: int):
self.buf[0] = b
self.ctx.update(self.buf)
def getvalue(self) -> bytes:
return self.ctx.digest()

123
src/trezor/crypto/bech32.py Normal file
View File

@ -0,0 +1,123 @@
# Copyright (c) 2017 Pieter Wuille
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""Reference implementation for Bech32 and segwit addresses."""
CHARSET = "qpzry9x8gf2tvdw0s3jn54khce6mua7l"
def bech32_polymod(values):
"""Internal function that computes the Bech32 checksum."""
generator = [0x3b6a57b2, 0x26508e6d, 0x1ea119fa, 0x3d4233dd, 0x2a1462b3]
chk = 1
for value in values:
top = chk >> 25
chk = (chk & 0x1ffffff) << 5 ^ value
for i in range(5):
chk ^= generator[i] if ((top >> i) & 1) else 0
return chk
def bech32_hrp_expand(hrp):
"""Expand the HRP into values for checksum computation."""
return [ord(x) >> 5 for x in hrp] + [0] + [ord(x) & 31 for x in hrp]
def bech32_verify_checksum(hrp, data):
"""Verify a checksum given HRP and converted data characters."""
return bech32_polymod(bech32_hrp_expand(hrp) + data) == 1
def bech32_create_checksum(hrp, data):
"""Compute the checksum values given HRP and data."""
values = bech32_hrp_expand(hrp) + data
polymod = bech32_polymod(values + [0, 0, 0, 0, 0, 0]) ^ 1
return [(polymod >> 5 * (5 - i)) & 31 for i in range(6)]
def bech32_encode(hrp, data):
"""Compute a Bech32 string given HRP and data values."""
combined = data + bech32_create_checksum(hrp, data)
return hrp + '1' + ''.join([CHARSET[d] for d in combined])
def bech32_decode(bech):
"""Validate a Bech32 string, and determine HRP and data."""
if ((any(ord(x) < 33 or ord(x) > 126 for x in bech)) or
(bech.lower() != bech and bech.upper() != bech)):
return (None, None)
bech = bech.lower()
pos = bech.rfind('1')
if pos < 1 or pos + 7 > len(bech) or len(bech) > 90:
return (None, None)
if not all(x in CHARSET for x in bech[pos+1:]):
return (None, None)
hrp = bech[:pos]
data = [CHARSET.find(x) for x in bech[pos+1:]]
if not bech32_verify_checksum(hrp, data):
return (None, None)
return (hrp, data[:-6])
def convertbits(data, frombits, tobits, pad=True):
"""General power-of-2 base conversion."""
acc = 0
bits = 0
ret = []
maxv = (1 << tobits) - 1
max_acc = (1 << (frombits + tobits - 1)) - 1
for value in data:
if value < 0 or (value >> frombits):
return None
acc = ((acc << frombits) | value) & max_acc
bits += frombits
while bits >= tobits:
bits -= tobits
ret.append((acc >> bits) & maxv)
if pad:
if bits:
ret.append((acc << (tobits - bits)) & maxv)
elif bits >= frombits or ((acc << (tobits - bits)) & maxv):
return None
return ret
def decode(hrp, addr):
"""Decode a segwit address."""
hrpgot, data = bech32_decode(addr)
if hrpgot != hrp:
return (None, None)
decoded = convertbits(data[1:], 5, 8, False)
if decoded is None or len(decoded) < 2 or len(decoded) > 40:
return (None, None)
if data[0] > 16:
return (None, None)
if data[0] == 0 and len(decoded) != 20 and len(decoded) != 32:
return (None, None)
return (data[0], decoded)
def encode(hrp, witver, witprog):
"""Encode a segwit address."""
ret = bech32_encode(hrp, [witver] + convertbits(witprog, 8, 5))
if decode(hrp, ret) == (None, None):
return None
return ret

View File

@ -17,8 +17,6 @@ pytest \
--ignore test_msg_ethereum_signmessage.py \
--ignore test_msg_ethereum_signtx.py \
--ignore test_msg_ethereum_verifymessage.py \
--ignore test_msg_getaddress_segwit_native.py \
--ignore test_msg_getaddress_segwit.py \
--ignore test_msg_getaddress_show.py \
--ignore test_msg_loaddevice_xprv.py \
--ignore test_msg_nem_getaddress.py \
@ -30,8 +28,6 @@ pytest \
--ignore test_msg_signmessage_segwit_native.py \
--ignore test_msg_signmessage_segwit.py \
--ignore test_msg_signtx_bch.py \
--ignore test_msg_signtx_segwit_native.py \
--ignore test_msg_signtx_segwit.py \
--ignore test_msg_signtx_zcash.py \
--ignore test_msg_verifymessage_segwit_native.py \
--ignore test_msg_verifymessage_segwit.py \

View File

@ -0,0 +1,48 @@
from common import *
from apps.wallet.sign_tx.signing import *
from apps.common import coins
from trezor.crypto import bip32, bip39
class TestSegwitAddress(unittest.TestCase):
# pylint: disable=C0301
def test_p2wpkh_in_p2sh_address(self):
coin = coins.by_name('Testnet')
address = address_p2wpkh_in_p2sh(
unhexlify('03a1af804ac108a8a51782198c2d034b28bf90c8803f5a53f76276fa69a4eae77f'),
coin.address_type_p2sh
)
self.assertEqual(address, '2Mww8dCYPUpKHofjgcXcBCEGmniw9CoaiD2')
def test_p2wpkh_in_p2sh_raw_address(self):
raw = address_p2wpkh_in_p2sh_raw(
unhexlify('03a1af804ac108a8a51782198c2d034b28bf90c8803f5a53f76276fa69a4eae77f')
)
self.assertEqual(raw, unhexlify('336caa13e08b96080a32b5d818d59b4ab3b36742'))
def test_p2wpkh_in_p2sh_node_derive_address(self):
coin = coins.by_name('Testnet')
seed = bip39.seed(' '.join(['all'] * 12), '')
root = bip32.from_seed(seed, 'secp256k1')
node = node_derive(root, [49 | 0x80000000, 1 | 0x80000000, 0 | 0x80000000, 1, 0])
address = address_p2wpkh_in_p2sh(node.public_key(), coin.address_type_p2sh)
self.assertEqual(address, '2N1LGaGg836mqSQqiuUBLfcyGBhyZbremDX')
node = node_derive(root, [49 | 0x80000000, 1 | 0x80000000, 0 | 0x80000000, 1, 1])
address = address_p2wpkh_in_p2sh(node.public_key(), coin.address_type_p2sh)
self.assertEqual(address, '2NFWLCJQBSpz1oUJwwLpX8ECifFWGznBVqs')
node = node_derive(root, [49 | 0x80000000, 1 | 0x80000000, 0 | 0x80000000, 0, 0])
address = address_p2wpkh_in_p2sh(node.public_key(), coin.address_type_p2sh)
self.assertEqual(address, '2N4Q5FhU2497BryFfUgbqkAJE87aKHUhXMp')
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,96 @@
from common import *
from apps.wallet.sign_tx.signing import *
from apps.common import coins
from trezor.messages.SignTx import SignTx
from trezor.messages.TxInputType import TxInputType
from trezor.messages.TxOutputType import TxOutputType
from trezor.messages import OutputScriptType
from trezor.crypto import bip32, bip39
class TestSegwitBip143NativeP2WPKH(unittest.TestCase):
# pylint: disable=C0301
tx = SignTx(coin_name='Bitcoin', version=1, lock_time=0x00000011, inputs_count=2, outputs_count=2)
inp1 = TxInputType(address_n=[0],
# Trezor expects hash in reversed format
prev_hash=unhexlify('9f96ade4b41d5433f4eda31e1738ec2b36f6e7d1420d94a6af99801a88f7f7ff'),
prev_index=0,
amount=625000000, # 6.25 btc
script_type=InputScriptType.SPENDWITNESS,
sequence=0xffffffee)
inp2 = TxInputType(address_n=[1],
# Trezor expects hash in reversed format
prev_hash=unhexlify('8ac60eb9575db5b2d987e29f301b5b819ea83a5c6579d282d189cc04b8e151ef'),
prev_index=1,
amount=600000000, # 6 btc
script_type=InputScriptType.SPENDWITNESS,
sequence=0xffffffff)
out1 = TxOutputType(address='1Cu32FVupVCgHkMMRJdYJugxwo2Aprgk7H', # derived
amount=0x0000000006b22c20,
script_type=OutputScriptType.PAYTOADDRESS,
address_n=None)
out2 = TxOutputType(address='16TZ8J6Q5iZKBWizWzFAYnrsaox5Z5aBRV', # derived
amount=0x000000000d519390,
script_type=OutputScriptType.PAYTOADDRESS,
address_n=None)
def test_prevouts(self):
bip143 = Bip143()
bip143.add_prevouts(self.inp1)
bip143.add_prevouts(self.inp2)
self.assertEqual(hexlify(bip143.get_prevouts_hash()), b'96b827c8483d4e9b96712b6713a7b68d6e8003a781feba36c31143470b4efd37')
def test_sequence(self):
bip143 = Bip143()
bip143.add_sequence(self.inp1)
bip143.add_sequence(self.inp2)
self.assertEqual(hexlify(bip143.get_sequence_hash()), b'52b0a642eea2fb7ae638c36f6252b6750293dbe574a806984b8e4d8548339a3b')
def test_outputs(self):
seed = bip39.seed('alcohol woman abuse must during monitor noble actual mixed trade anger aisle', '')
root = bip32.from_seed(seed, 'secp256k1')
coin = coins.by_name(self.tx.coin_name)
bip143 = Bip143()
for txo in [self.out1, self.out2]:
txo_bin = TxOutputBinType()
txo_bin.amount = txo.amount
txo_bin.script_pubkey = output_derive_script(txo, coin, root)
bip143.add_output(txo_bin)
self.assertEqual(hexlify(bip143.get_outputs_hash()),
b'863ef3e1a92afbfdb97f31ad0fc7683ee943e9abcf2501590ff8f6551f47e5e5')
def test_preimage_testdata(self):
seed = bip39.seed('alcohol woman abuse must during monitor noble actual mixed trade anger aisle', '')
root = bip32.from_seed(seed, 'secp256k1')
coin = coins.by_name(self.tx.coin_name)
bip143 = Bip143()
bip143.add_prevouts(self.inp1)
bip143.add_prevouts(self.inp2)
bip143.add_sequence(self.inp1)
bip143.add_sequence(self.inp2)
for txo in [self.out1, self.out2]:
txo_bin = TxOutputBinType()
txo_bin.amount = txo.amount
txo_bin.script_pubkey = output_derive_script(txo, coin, root)
bip143.add_output(txo_bin)
# test data public key hash
# only for input 2 - input 1 is not segwit
result = bip143.preimage_hash(self.tx, self.inp2, unhexlify('1d0f172a0ecb48aee1be1f2687d2963ae33f71a1'))
self.assertEqual(hexlify(result), b'c37af31116d1b27caf68aae9e3ac82f1477929014d5b917657d0eb49478cb670')
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,83 @@
from common import *
from apps.wallet.sign_tx.signing import *
from apps.common import coins
from trezor.messages.SignTx import SignTx
from trezor.messages.TxInputType import TxInputType
from trezor.messages.TxOutputType import TxOutputType
from trezor.messages import OutputScriptType
from trezor.crypto import bip32, bip39
class TestSegwitBip143(unittest.TestCase):
# pylint: disable=C0301
tx = SignTx(coin_name='Bitcoin', version=1, lock_time=0x00000492, inputs_count=1, outputs_count=2)
inp1 = TxInputType(address_n=[0],
# Trezor expects hash in reversed format
prev_hash=unhexlify('77541aeb3c4dac9260b68f74f44c973081a9d4cb2ebe8038b2d70faa201b6bdb'),
prev_index=1,
amount=1000000000, # 10 btc
script_type=InputScriptType.SPENDP2SHWITNESS, # todo is this correct?
sequence=0xfffffffe)
out1 = TxOutputType(address='1Fyxts6r24DpEieygQiNnWxUdb18ANa5p7',
amount=0x000000000bebb4b8,
script_type=OutputScriptType.PAYTOADDRESS,
address_n=None)
out2 = TxOutputType(address='1Q5YjKVj5yQWHBBsyEBamkfph3cA6G9KK8',
amount=0x000000002faf0800,
script_type=OutputScriptType.PAYTOADDRESS,
address_n=None)
def test_bip143_prevouts(self):
bip143 = Bip143()
bip143.add_prevouts(self.inp1)
self.assertEqual(hexlify(bip143.get_prevouts_hash()), b'b0287b4a252ac05af83d2dcef00ba313af78a3e9c329afa216eb3aa2a7b4613a')
def test_bip143_sequence(self):
bip143 = Bip143()
bip143.add_sequence(self.inp1)
self.assertEqual(hexlify(bip143.get_sequence_hash()), b'18606b350cd8bf565266bc352f0caddcf01e8fa789dd8a15386327cf8cabe198')
def test_bip143_outputs(self):
seed = bip39.seed('alcohol woman abuse must during monitor noble actual mixed trade anger aisle', '')
root = bip32.from_seed(seed, 'secp256k1')
coin = coins.by_name(self.tx.coin_name)
bip143 = Bip143()
for txo in [self.out1, self.out2]:
txo_bin = TxOutputBinType()
txo_bin.amount = txo.amount
txo_bin.script_pubkey = output_derive_script(txo, coin, root)
bip143.add_output(txo_bin)
self.assertEqual(hexlify(bip143.get_outputs_hash()),
b'de984f44532e2173ca0d64314fcefe6d30da6f8cf27bafa706da61df8a226c83')
def test_bip143_preimage_testdata(self):
seed = bip39.seed('alcohol woman abuse must during monitor noble actual mixed trade anger aisle', '')
root = bip32.from_seed(seed, 'secp256k1')
coin = coins.by_name(self.tx.coin_name)
bip143 = Bip143()
bip143.add_prevouts(self.inp1)
bip143.add_sequence(self.inp1)
for txo in [self.out1, self.out2]:
txo_bin = TxOutputBinType()
txo_bin.amount = txo.amount
txo_bin.script_pubkey = output_derive_script(txo, coin, root)
bip143.add_output(txo_bin)
# test data public key hash
result = bip143.preimage_hash(self.tx, self.inp1, unhexlify('79091972186c449eb1ded22b78e40d009bdf0089'))
self.assertEqual(hexlify(result), b'64f3b0f4dd2bb3aa1ce8566d220cc74dda9df97d8490cc81d89d735c92e59fb6')
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,216 @@
from common import *
from trezor.utils import chunks
from trezor.crypto import bip32, bip39
from trezor.messages.SignTx import SignTx
from trezor.messages.TxInputType import TxInputType
from trezor.messages.TxOutputType import TxOutputType
from trezor.messages.TxRequest import TxRequest
from trezor.messages.TxAck import TxAck
from trezor.messages.TransactionType import TransactionType
from trezor.messages.RequestType import TXINPUT, TXOUTPUT, TXMETA, TXFINISHED
from trezor.messages.TxRequestDetailsType import TxRequestDetailsType
from trezor.messages.TxRequestSerializedType import TxRequestSerializedType
from trezor.messages import InputScriptType
from trezor.messages import OutputScriptType
from apps.common import coins
from apps.wallet.sign_tx import signing
class TestSignSegwitTxNativeP2WPKH(unittest.TestCase):
# pylint: disable=C0301
def test_send_native_p2wpkh(self):
coin = coins.by_name('Testnet')
seed = bip39.seed(' '.join(['all'] * 12), '')
root = bip32.from_seed(seed, 'secp256k1')
inp1 = TxInputType(
# 49'/1'/0'/0/0" - tb1qqzv60m9ajw8drqulta4ld4gfx0rdh82un5s65s
address_n=[49 | 0x80000000, 1 | 0x80000000, 0 | 0x80000000, 0, 0],
amount=12300000,
prev_hash=unhexlify('09144602765ce3dd8f4329445b20e3684e948709c5cdcaf12da3bb079c99448a'),
prev_index=0,
script_type=InputScriptType.SPENDWITNESS,
sequence=0xffffffff,
)
out1 = TxOutputType(
address='2N4Q5FhU2497BryFfUgbqkAJE87aKHUhXMp',
amount=5000000,
script_type=OutputScriptType.PAYTOADDRESS,
address_n=None, # @todo ask honza about sanitizing
)
out2 = TxOutputType(
address='tb1q694ccp5qcc0udmfwgp692u2s2hjpq5h407urtu',
script_type=OutputScriptType.PAYTOADDRESS,
amount=12300000 - 11000 - 5000000,
address_n=None,
)
tx = SignTx(coin_name='Testnet', version=None, lock_time=None, inputs_count=1, outputs_count=2)
messages = [
None,
# check fee
TxRequest(request_type=TXINPUT, details=TxRequestDetailsType(request_index=0, tx_hash=None)),
TxAck(tx=TransactionType(inputs=[inp1])),
TxRequest(request_type=TXOUTPUT, details=TxRequestDetailsType(request_index=0, tx_hash=None), serialized=None),
TxAck(tx=TransactionType(outputs=[out1])),
signing.UiConfirmOutput(out1, coin),
True,
TxRequest(request_type=TXOUTPUT, details=TxRequestDetailsType(request_index=1, tx_hash=None), serialized=None),
TxAck(tx=TransactionType(outputs=[out2])),
signing.UiConfirmOutput(out2, coin),
True,
signing.UiConfirmTotal(12300000 - 11000, 11000, coin),
True,
# sign tx
TxRequest(request_type=TXINPUT, details=TxRequestDetailsType(request_index=0, tx_hash=None), serialized=None),
TxAck(tx=TransactionType(inputs=[inp1])),
TxRequest(request_type=TXOUTPUT, details=TxRequestDetailsType(request_index=0, tx_hash=None), serialized=TxRequestSerializedType(
# returned serialized inp1
serialized_tx=unhexlify('010000000001018a44999c07bba32df1cacdc50987944e68e3205b4429438fdde35c76024614090000000000ffffffff'),
)),
TxAck(tx=TransactionType(outputs=[out1])),
TxRequest(request_type=TXOUTPUT, details=TxRequestDetailsType(request_index=1, tx_hash=None), serialized=TxRequestSerializedType(
# returned serialized out1
serialized_tx=unhexlify('02404b4c000000000017a9147a55d61848e77ca266e79a39bfc85c580a6426c987'),
signature_index=None,
signature=None,
)),
TxAck(tx=TransactionType(outputs=[out2])),
# segwit
TxRequest(request_type=TXINPUT, details=TxRequestDetailsType(request_index=0, tx_hash=None), serialized=TxRequestSerializedType(
# returned serialized out2
serialized_tx=unhexlify('a8386f0000000000160014d16b8c0680c61fc6ed2e407455715055e41052f5'),
signature_index=None,
signature=None,
)),
TxAck(tx=TransactionType(inputs=[inp1])),
TxRequest(request_type=TXFINISHED, details=None, serialized=TxRequestSerializedType(
serialized_tx=unhexlify('02483045022100a7ca8f097525f9044e64376dc0a0f5d4aeb8d15d66808ba97979a0475b06b66502200597c8ebcef63e047f9aeef1a8001d3560470cf896c12f6990eec4faec599b950121033add1f0e8e3c3136f7428dd4a4de1057380bd311f5b0856e2269170b4ffa65bf00000000'),
signature_index=0,
signature=unhexlify('3045022100a7ca8f097525f9044e64376dc0a0f5d4aeb8d15d66808ba97979a0475b06b66502200597c8ebcef63e047f9aeef1a8001d3560470cf896c12f6990eec4faec599b95'),
)),
]
signer = signing.sign_tx(tx, root)
for request, response in chunks(messages, 2):
self.assertEqualEx(signer.send(request), response)
with self.assertRaises(StopIteration):
signer.send(None)
def test_send_native_p2wpkh_change(self):
coin = coins.by_name('Testnet')
seed = bip39.seed(' '.join(['all'] * 12), '')
root = bip32.from_seed(seed, 'secp256k1')
inp1 = TxInputType(
# 49'/1'/0'/0/0" - tb1qqzv60m9ajw8drqulta4ld4gfx0rdh82un5s65s
address_n=[49 | 0x80000000, 1 | 0x80000000, 0 | 0x80000000, 0, 0],
amount=12300000,
prev_hash=unhexlify('09144602765ce3dd8f4329445b20e3684e948709c5cdcaf12da3bb079c99448a'),
prev_index=0,
script_type=InputScriptType.SPENDWITNESS,
sequence=0xffffffff,
)
out1 = TxOutputType(
address='2N4Q5FhU2497BryFfUgbqkAJE87aKHUhXMp',
amount=5000000,
script_type=OutputScriptType.PAYTOADDRESS,
address_n=None, # @todo ask honza about sanitizing
)
out2 = TxOutputType(
address=None,
address_n=[49 | 0x80000000, 1 | 0x80000000, 0 | 0x80000000, 1, 0],
script_type=OutputScriptType.PAYTOWITNESS,
amount=12300000 - 11000 - 5000000,
)
tx = SignTx(coin_name='Testnet', version=None, lock_time=None, inputs_count=1, outputs_count=2)
messages = [
None,
# check fee
TxRequest(request_type=TXINPUT, details=TxRequestDetailsType(request_index=0, tx_hash=None)),
TxAck(tx=TransactionType(inputs=[inp1])),
TxRequest(request_type=TXOUTPUT, details=TxRequestDetailsType(request_index=0, tx_hash=None), serialized=None),
TxAck(tx=TransactionType(outputs=[out1])),
signing.UiConfirmOutput(out1, coin),
True,
TxRequest(request_type=TXOUTPUT, details=TxRequestDetailsType(request_index=1, tx_hash=None), serialized=None),
TxAck(tx=TransactionType(outputs=[out2])),
signing.UiConfirmTotal(5000000, 11000, coin),
True,
# sign tx
TxRequest(request_type=TXINPUT, details=TxRequestDetailsType(request_index=0, tx_hash=None), serialized=None),
TxAck(tx=TransactionType(inputs=[inp1])),
TxRequest(request_type=TXOUTPUT, details=TxRequestDetailsType(request_index=0, tx_hash=None), serialized=TxRequestSerializedType(
# returned serialized inp1
serialized_tx=unhexlify('010000000001018a44999c07bba32df1cacdc50987944e68e3205b4429438fdde35c76024614090000000000ffffffff'),
)),
# the out has to be cloned not to send the same object which was modified
TxAck(tx=TransactionType(outputs=[TxOutputType(**out1.__dict__)])),
TxRequest(request_type=TXOUTPUT, details=TxRequestDetailsType(request_index=1, tx_hash=None), serialized=TxRequestSerializedType(
# returned serialized out1
serialized_tx=unhexlify('02404b4c000000000017a9147a55d61848e77ca266e79a39bfc85c580a6426c987'),
signature_index=None,
signature=None,
)),
TxAck(tx=TransactionType(outputs=[TxOutputType(**out2.__dict__)])),
# segwit
TxRequest(request_type=TXINPUT, details=TxRequestDetailsType(request_index=0, tx_hash=None), serialized=TxRequestSerializedType(
# returned serialized out2
serialized_tx=unhexlify('a8386f0000000000160014d16b8c0680c61fc6ed2e407455715055e41052f5'),
signature_index=None,
signature=None,
)),
TxAck(tx=TransactionType(inputs=[inp1])),
TxRequest(request_type=TXFINISHED, details=None, serialized=TxRequestSerializedType(
serialized_tx=unhexlify('02483045022100a7ca8f097525f9044e64376dc0a0f5d4aeb8d15d66808ba97979a0475b06b66502200597c8ebcef63e047f9aeef1a8001d3560470cf896c12f6990eec4faec599b950121033add1f0e8e3c3136f7428dd4a4de1057380bd311f5b0856e2269170b4ffa65bf00000000'),
signature_index=0,
signature=unhexlify('3045022100a7ca8f097525f9044e64376dc0a0f5d4aeb8d15d66808ba97979a0475b06b66502200597c8ebcef63e047f9aeef1a8001d3560470cf896c12f6990eec4faec599b95'),
)),
]
signer = signing.sign_tx(tx, root)
for request, response in chunks(messages, 2):
self.assertEqualEx(signer.send(request), response)
with self.assertRaises(StopIteration):
signer.send(None)
def assertEqualEx(self, a, b):
# hack to avoid adding __eq__ to signing.Ui* classes
if ((isinstance(a, signing.UiConfirmOutput) and isinstance(b, signing.UiConfirmOutput)) or
(isinstance(a, signing.UiConfirmTotal) and isinstance(b, signing.UiConfirmTotal))):
return self.assertEqual(a.__dict__, b.__dict__)
else:
return self.assertEqual(a, b)
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,337 @@
from common import *
from trezor.utils import chunks
from trezor.crypto import bip32, bip39
from trezor.messages.SignTx import SignTx
from trezor.messages.TxInputType import TxInputType
from trezor.messages.TxOutputType import TxOutputType
from trezor.messages.TxRequest import TxRequest
from trezor.messages.TxAck import TxAck
from trezor.messages.TransactionType import TransactionType
from trezor.messages.RequestType import TXINPUT, TXOUTPUT, TXMETA, TXFINISHED
from trezor.messages.TxRequestDetailsType import TxRequestDetailsType
from trezor.messages.TxRequestSerializedType import TxRequestSerializedType
from trezor.messages import InputScriptType
from trezor.messages import OutputScriptType
from apps.common import coins
from apps.wallet.sign_tx import signing
class TestSignSegwitTxP2WPKHInP2SH(unittest.TestCase):
# pylint: disable=C0301
def test_send_p2wpkh_in_p2sh(self):
coin = coins.by_name('Testnet')
seed = bip39.seed(' '.join(['all'] * 12), '')
root = bip32.from_seed(seed, 'secp256k1')
inp1 = TxInputType(
# 49'/1'/0'/1/0" - 2N1LGaGg836mqSQqiuUBLfcyGBhyZbremDX
address_n=[49 | 0x80000000, 1 | 0x80000000, 0 | 0x80000000, 1, 0],
amount=123456789,
prev_hash=unhexlify('20912f98ea3ed849042efed0fdac8cb4fc301961c5988cba56902d8ffb61c337'),
prev_index=0,
script_type=InputScriptType.SPENDP2SHWITNESS,
sequence=0xffffffff,
)
out1 = TxOutputType(
address='mhRx1CeVfaayqRwq5zgRQmD7W5aWBfD5mC',
amount=12300000,
script_type=OutputScriptType.PAYTOADDRESS,
address_n=None,
)
out2 = TxOutputType(
address='2N1LGaGg836mqSQqiuUBLfcyGBhyZbremDX',
script_type=OutputScriptType.PAYTOADDRESS,
amount=123456789 - 11000 - 12300000,
address_n=None,
)
tx = SignTx(coin_name='Testnet', version=None, lock_time=None, inputs_count=1, outputs_count=2)
messages = [
None,
# check fee
TxRequest(request_type=TXINPUT, details=TxRequestDetailsType(request_index=0, tx_hash=None)),
TxAck(tx=TransactionType(inputs=[inp1])),
TxRequest(request_type=TXOUTPUT, details=TxRequestDetailsType(request_index=0, tx_hash=None), serialized=None),
TxAck(tx=TransactionType(outputs=[out1])),
signing.UiConfirmOutput(out1, coin),
True,
TxRequest(request_type=TXOUTPUT, details=TxRequestDetailsType(request_index=1, tx_hash=None), serialized=None),
TxAck(tx=TransactionType(outputs=[out2])),
signing.UiConfirmOutput(out2, coin),
True,
signing.UiConfirmTotal(123445789, 11000, coin),
True,
# sign tx
TxRequest(request_type=TXINPUT, details=TxRequestDetailsType(request_index=0, tx_hash=None), serialized=None),
TxAck(tx=TransactionType(inputs=[inp1])),
TxRequest(request_type=TXOUTPUT, details=TxRequestDetailsType(request_index=0, tx_hash=None), serialized=TxRequestSerializedType(
# returned serialized inp1
serialized_tx=unhexlify('0100000000010137c361fb8f2d9056ba8c98c5611930fcb48cacfdd0fe2e0449d83eea982f91200000000017160014d16b8c0680c61fc6ed2e407455715055e41052f5ffffffff'),
)),
TxAck(tx=TransactionType(outputs=[out1])),
TxRequest(request_type=TXOUTPUT, details=TxRequestDetailsType(request_index=1, tx_hash=None), serialized=TxRequestSerializedType(
# returned serialized out1
serialized_tx=unhexlify('02e0aebb00000000001976a91414fdede0ddc3be652a0ce1afbc1b509a55b6b94888ac'),
signature_index=None,
signature=None,
)),
TxAck(tx=TransactionType(outputs=[out2])),
# segwit
TxRequest(request_type=TXINPUT, details=TxRequestDetailsType(request_index=0, tx_hash=None), serialized=TxRequestSerializedType(
# returned serialized out2
serialized_tx=unhexlify('3df39f060000000017a91458b53ea7f832e8f096e896b8713a8c6df0e892ca87'),
signature_index=None,
signature=None,
)),
TxAck(tx=TransactionType(inputs=[inp1])),
TxRequest(request_type=TXFINISHED, details=None, serialized=TxRequestSerializedType(
serialized_tx=unhexlify('02483045022100ccd253bfdf8a5593cd7b6701370c531199f0f05a418cd547dfc7da3f21515f0f02203fa08a0753688871c220648f9edadbdb98af42e5d8269364a326572cf703895b012103e7bfe10708f715e8538c92d46ca50db6f657bbc455b7494e6a0303ccdb868b7900000000'),
signature_index=0,
signature=unhexlify('3045022100ccd253bfdf8a5593cd7b6701370c531199f0f05a418cd547dfc7da3f21515f0f02203fa08a0753688871c220648f9edadbdb98af42e5d8269364a326572cf703895b'),
)),
]
signer = signing.sign_tx(tx, root)
for request, response in chunks(messages, 2):
self.assertEqualEx(signer.send(request), response)
with self.assertRaises(StopIteration):
signer.send(None)
def test_send_p2wpkh_in_p2sh_change(self):
coin = coins.by_name('Testnet')
seed = bip39.seed(' '.join(['all'] * 12), '')
root = bip32.from_seed(seed, 'secp256k1')
inp1 = TxInputType(
# 49'/1'/0'/1/0" - 2N1LGaGg836mqSQqiuUBLfcyGBhyZbremDX
address_n=[49 | 0x80000000, 1 | 0x80000000, 0 | 0x80000000, 1, 0],
amount=123456789,
prev_hash=unhexlify('20912f98ea3ed849042efed0fdac8cb4fc301961c5988cba56902d8ffb61c337'),
prev_index=0,
script_type=InputScriptType.SPENDP2SHWITNESS,
sequence=0xffffffff,
)
out1 = TxOutputType(
address='mhRx1CeVfaayqRwq5zgRQmD7W5aWBfD5mC',
amount=12300000,
script_type=OutputScriptType.PAYTOADDRESS,
address_n=None,
)
out2 = TxOutputType(
address_n=[49 | 0x80000000, 1 | 0x80000000, 0 | 0x80000000, 1, 0],
script_type=OutputScriptType.PAYTOP2SHWITNESS,
amount=123456789 - 11000 - 12300000,
address=None, # todo ask about sanitizing
)
tx = SignTx(coin_name='Testnet', version=None, lock_time=None, inputs_count=1, outputs_count=2)
messages = [
None,
# check fee
TxRequest(request_type=TXINPUT, details=TxRequestDetailsType(request_index=0, tx_hash=None)),
TxAck(tx=TransactionType(inputs=[inp1])),
TxRequest(request_type=TXOUTPUT, details=TxRequestDetailsType(request_index=0, tx_hash=None),
serialized=None),
TxAck(tx=TransactionType(outputs=[out1])),
signing.UiConfirmOutput(out1, coin),
True,
TxRequest(request_type=TXOUTPUT, details=TxRequestDetailsType(request_index=1, tx_hash=None),
serialized=None),
TxAck(tx=TransactionType(outputs=[out2])),
signing.UiConfirmTotal(12300000, 11000, coin),
True,
# sign tx
TxRequest(request_type=TXINPUT, details=TxRequestDetailsType(request_index=0, tx_hash=None),
serialized=None),
TxAck(tx=TransactionType(inputs=[inp1])),
TxRequest(request_type=TXOUTPUT, details=TxRequestDetailsType(request_index=0, tx_hash=None),
serialized=TxRequestSerializedType(
# returned serialized inp1
serialized_tx=unhexlify(
'0100000000010137c361fb8f2d9056ba8c98c5611930fcb48cacfdd0fe2e0449d83eea982f91200000000017160014d16b8c0680c61fc6ed2e407455715055e41052f5ffffffff'),
)),
# the out has to be cloned not to send the same object which was modified
TxAck(tx=TransactionType(outputs=[TxOutputType(**out1.__dict__)])),
TxRequest(request_type=TXOUTPUT, details=TxRequestDetailsType(request_index=1, tx_hash=None),
serialized=TxRequestSerializedType(
# returned serialized out1
serialized_tx=unhexlify(
'02e0aebb00000000001976a91414fdede0ddc3be652a0ce1afbc1b509a55b6b94888ac'),
signature_index=None,
signature=None,
)),
TxAck(tx=TransactionType(outputs=[TxOutputType(**out2.__dict__)])),
# segwit
TxRequest(request_type=TXINPUT, details=TxRequestDetailsType(request_index=0, tx_hash=None),
serialized=TxRequestSerializedType(
# returned serialized out2
serialized_tx=unhexlify(
'3df39f060000000017a91458b53ea7f832e8f096e896b8713a8c6df0e892ca87'),
signature_index=None,
signature=None,
)),
TxAck(tx=TransactionType(inputs=[inp1])),
TxRequest(request_type=TXFINISHED, details=None, serialized=TxRequestSerializedType(
serialized_tx=unhexlify(
'02483045022100ccd253bfdf8a5593cd7b6701370c531199f0f05a418cd547dfc7da3f21515f0f02203fa08a0753688871c220648f9edadbdb98af42e5d8269364a326572cf703895b012103e7bfe10708f715e8538c92d46ca50db6f657bbc455b7494e6a0303ccdb868b7900000000'),
signature_index=0,
signature=unhexlify('3045022100ccd253bfdf8a5593cd7b6701370c531199f0f05a418cd547dfc7da3f21515f0f02203fa08a0753688871c220648f9edadbdb98af42e5d8269364a326572cf703895b'),
)),
]
signer = signing.sign_tx(tx, root)
for request, response in chunks(messages, 2):
self.assertEqualEx(signer.send(request), response)
with self.assertRaises(StopIteration):
signer.send(None)
# see https://github.com/trezor/trezor-mcu/commit/6b615ce40567cc0da0b3b38ff668916aaae9dd4b#commitcomment-25505919
# for the rational behind this attack
def test_send_p2wpkh_in_p2sh_attack_amount(self):
coin = coins.by_name('Testnet')
seed = bip39.seed(' '.join(['all'] * 12), '')
root = bip32.from_seed(seed, 'secp256k1')
inp1 = TxInputType(
# 49'/1'/0'/1/0" - 2N1LGaGg836mqSQqiuUBLfcyGBhyZbremDX
address_n=[49 | 0x80000000, 1 | 0x80000000, 0 | 0x80000000, 1, 0],
amount=10,
prev_hash=unhexlify('20912f98ea3ed849042efed0fdac8cb4fc301961c5988cba56902d8ffb61c337'),
prev_index=0,
script_type=InputScriptType.SPENDP2SHWITNESS,
sequence=0xffffffff,
)
inpattack = TxInputType(
# 49'/1'/0'/1/0" - 2N1LGaGg836mqSQqiuUBLfcyGBhyZbremDX
address_n=[49 | 0x80000000, 1 | 0x80000000, 0 | 0x80000000, 1, 0],
amount=9, # modified!
prev_hash=unhexlify('20912f98ea3ed849042efed0fdac8cb4fc301961c5988cba56902d8ffb61c337'),
prev_index=0,
script_type=InputScriptType.SPENDP2SHWITNESS,
sequence=0xffffffff,
)
out1 = TxOutputType(
address='mhRx1CeVfaayqRwq5zgRQmD7W5aWBfD5mC',
amount=8,
script_type=OutputScriptType.PAYTOADDRESS,
address_n=None,
)
out2 = TxOutputType(
address_n=[49 | 0x80000000, 1 | 0x80000000, 0 | 0x80000000, 1, 0],
script_type=OutputScriptType.PAYTOP2SHWITNESS,
amount=1,
address=None,
)
tx = SignTx(coin_name='Testnet', version=None, lock_time=None, inputs_count=1, outputs_count=2)
messages = [
None,
# check fee
TxRequest(request_type=TXINPUT, details=TxRequestDetailsType(request_index=0, tx_hash=None)),
TxAck(tx=TransactionType(inputs=[inpattack])),
TxRequest(request_type=TXOUTPUT, details=TxRequestDetailsType(request_index=0, tx_hash=None),
serialized=None),
TxAck(tx=TransactionType(outputs=[out1])),
signing.UiConfirmOutput(out1, coin),
True,
TxRequest(request_type=TXOUTPUT, details=TxRequestDetailsType(request_index=1, tx_hash=None),
serialized=None),
TxAck(tx=TransactionType(outputs=[out2])),
signing.UiConfirmTotal(8, 0, coin),
True,
TxRequest(request_type=TXINPUT, details=TxRequestDetailsType(request_index=0, tx_hash=None),
serialized=None),
TxAck(tx=TransactionType(inputs=[inp1])),
TxRequest(request_type=TXOUTPUT, details=TxRequestDetailsType(request_index=0, tx_hash=None),
serialized=TxRequestSerializedType(
# returned serialized inpattack
serialized_tx=unhexlify(
'0100000000010137c361fb8f2d9056ba8c98c5611930fcb48cacfdd0fe2e0449d83eea982f91200000000017160014d16b8c0680c61fc6ed2e407455715055e41052f5ffffffff'),
)),
# the out has to be cloned not to send the same object which was modified
TxAck(tx=TransactionType(outputs=[TxOutputType(**out1.__dict__)])),
TxRequest(request_type=TXOUTPUT, details=TxRequestDetailsType(request_index=1, tx_hash=None),
serialized=TxRequestSerializedType(
# returned serialized out1
serialized_tx=unhexlify(
'0208000000000000001976a91414fdede0ddc3be652a0ce1afbc1b509a55b6b94888ac'),
signature_index=None,
signature=None,
)),
TxAck(tx=TransactionType(outputs=[TxOutputType(**out2.__dict__)])),
# segwit
TxRequest(request_type=TXINPUT, details=TxRequestDetailsType(request_index=0, tx_hash=None),
serialized=TxRequestSerializedType(
# returned serialized out2
serialized_tx=unhexlify(
'010000000000000017a91458b53ea7f832e8f096e896b8713a8c6df0e892ca87'),
signature_index=None,
signature=None,
)),
TxAck(tx=TransactionType(inputs=[inp1])),
TxRequest(request_type=TXFINISHED, details=None)
]
signer = signing.sign_tx(tx, root)
i = 0
messages_count = int(len(messages) / 2)
for request, response in chunks(messages, 2):
if i == messages_count - 1: # last message should throw SigningError
self.assertRaises(signing.SigningError, signer.send, request)
else:
self.assertEqualEx(signer.send(request), response)
i += 1
with self.assertRaises(StopIteration):
signer.send(None)
def assertEqualEx(self, a, b):
# hack to avoid adding __eq__ to signing.Ui* classes
if ((isinstance(a, signing.UiConfirmOutput) and isinstance(b, signing.UiConfirmOutput)) or
(isinstance(a, signing.UiConfirmTotal) and isinstance(b, signing.UiConfirmTotal))):
return self.assertEqual(a.__dict__, b.__dict__)
else:
return self.assertEqual(a, b)
if __name__ == '__main__':
unittest.main()

View File

@ -45,6 +45,7 @@ class TestSignTx(unittest.TestCase):
# amount=390000,
prev_hash=unhexlify('d5f65ee80147b4bcc70b75e4bbf2d7382021b871bd8867ef8fa525ef50864882'),
prev_index=0,
amount=None,
script_type=None,
sequence=None)
out1 = TxOutputType(address='1MJ2tj2ThBE62zXbBYA5ZaN3fdve5CPAz1',

View File

@ -0,0 +1,133 @@
# Copyright (c) 2017 Pieter Wuille
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""Reference tests for segwit adresses"""
from common import *
from trezor.crypto import bech32
def segwit_scriptpubkey(witver, witprog):
"""Construct a Segwit scriptPubKey for a given witness program."""
return bytes([witver + 0x50 if witver else 0, len(witprog)] + witprog)
VALID_CHECKSUM = [
"A12UEL5L",
"an83characterlonghumanreadablepartthatcontainsthenumber1andtheexcludedcharactersbio1tt5tgs",
"abcdef1qpzry9x8gf2tvdw0s3jn54khce6mua7lmqqqxw",
"11qqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqc8247j",
"split1checkupstagehandshakeupstreamerranterredcaperred2y9e3w",
]
INVALID_CHECKSUM = [
" 1nwldj5",
"\x7F" + "1axkwrx",
"an84characterslonghumanreadablepartthatcontainsthenumber1andtheexcludedcharactersbio1569pvx",
"pzry9x0s0muk",
"1pzry9x0s0muk",
"x1b4n0q5v",
"li1dgmt3",
"de1lg7wt\xff",
]
VALID_ADDRESS = [
["BC1QW508D6QEJXTDG4Y5R3ZARVARY0C5XW7KV8F3T4", "0014751e76e8199196d454941c45d1b3a323f1433bd6"],
["tb1qrp33g0q5c5txsp9arysrx4k6zdkfs4nce4xj0gdcccefvpysxf3q0sl5k7",
"00201863143c14c5166804bd19203356da136c985678cd4d27a1b8c6329604903262"],
["bc1pw508d6qejxtdg4y5r3zarvary0c5xw7kw508d6qejxtdg4y5r3zarvary0c5xw7k7grplx",
"5128751e76e8199196d454941c45d1b3a323f1433bd6751e76e8199196d454941c45d1b3a323f1433bd6"],
["BC1SW50QA3JX3S", "6002751e"],
["bc1zw508d6qejxtdg4y5r3zarvaryvg6kdaj", "5210751e76e8199196d454941c45d1b3a323"],
["tb1qqqqqp399et2xygdj5xreqhjjvcmzhxw4aywxecjdzew6hylgvsesrxh6hy",
"0020000000c4a5cad46221b2a187905e5266362b99d5e91c6ce24d165dab93e86433"],
]
INVALID_ADDRESS = [
"tc1qw508d6qejxtdg4y5r3zarvary0c5xw7kg3g4ty",
"bc1qw508d6qejxtdg4y5r3zarvary0c5xw7kv8f3t5",
"BC13W508D6QEJXTDG4Y5R3ZARVARY0C5XW7KN40WF2",
"bc1rw5uspcuh",
"bc10w508d6qejxtdg4y5r3zarvary0c5xw7kw508d6qejxtdg4y5r3zarvary0c5xw7kw5rljs90",
"BC1QR508D6QEJXTDG4Y5R3ZARVARYV98GJ9P",
"tb1qrp33g0q5c5txsp9arysrx4k6zdkfs4nce4xj0gdcccefvpysxf3q0sL5k7",
"bc1zw508d6qejxtdg4y5r3zarvaryvqyzf3du",
"tb1qrp33g0q5c5txsp9arysrx4k6zdkfs4nce4xj0gdcccefvpysxf3pjxtptv",
"bc1gmk9yu",
]
INVALID_ADDRESS_ENC = [
("BC", 0, 20),
("bc", 0, 21),
("bc", 17, 32),
("bc", 1, 1),
("bc", 16, 41),
]
class TestCryptoBech32(unittest.TestCase):
"""Unit test class for segwit addressess."""
def test_valid_checksum(self):
"""Test checksum creation and validation."""
for test in VALID_CHECKSUM:
hrp, _ = bech32.bech32_decode(test)
self.assertIsNotNone(hrp)
pos = test.rfind('1')
test = test[:pos+1] + chr(ord(test[pos + 1]) ^ 1) + test[pos+2:]
hrp, _ = bech32.bech32_decode(test)
self.assertIsNone(hrp)
def test_invalid_checksum(self):
"""Test validation of invalid checksums."""
for test in INVALID_CHECKSUM:
hrp, _ = bech32.bech32_decode(test)
self.assertIsNone(hrp)
def test_valid_address(self):
"""Test whether valid addresses decode to the correct output."""
for (address, hexscript) in VALID_ADDRESS:
hrp = "bc"
witver, witprog = bech32.decode(hrp, address)
if witver is None:
hrp = "tb"
witver, witprog = bech32.decode(hrp, address)
self.assertIsNotNone(witver)
scriptpubkey = segwit_scriptpubkey(witver, witprog)
self.assertEqual(scriptpubkey, unhexlify(hexscript))
addr = bech32.encode(hrp, witver, witprog)
self.assertEqual(address.lower(), addr)
def test_invalid_address(self):
"""Test whether invalid addresses fail to decode."""
for test in INVALID_ADDRESS:
witver, _ = bech32.decode("bc", test)
self.assertIsNone(witver)
witver, _ = bech32.decode("tb", test)
self.assertIsNone(witver)
def test_invalid_address_enc(self):
"""Test whether address encoding fails on invalid input."""
for hrp, version, length in INVALID_ADDRESS_ENC:
code = bech32.encode(hrp, version, [0] * length)
self.assertIsNone(code)
if __name__ == "__main__":
unittest.main()

View File

@ -4,17 +4,30 @@ import json
fields = [
'coin_name',
'coin_shortcut',
'coin_label',
'address_type',
'address_type_p2sh',
'maxfee_kb',
'minfee_kb',
'signed_message_header',
'xpub_magic',
'hash_genesis_block',
'xprv_magic',
'xpub_magic',
'bech32_prefix',
'bip44',
'segwit',
'forkid',
'default_fee_b',
'dust_limit',
'blocktime_minutes',
'firmware',
'address_prefix',
'min_address_length',
'max_address_length',
'bitcore',
]
coins = json.load(open('../../../trezor-common/coins.json', 'r'))
coins = json.load(open('../../vendor/trezor-common/coins.json', 'r'))
print('COINS = [')
for c in coins:

@ -1 +1 @@
Subproject commit 0001cb18c065d7f3aacd6f8d7c5be42cd7477a94
Subproject commit 9c1e61e4608e1d49043fb668eb9c7621fcc20648