@ -4,238 +4,189 @@ from trezor import log, wire
from trezor . crypto import base58 , hashlib
from trezor . crypto . curve import ed25519
from trezor . messages . CardanoSignedTx import CardanoSignedTx
from trezor . messages . CardanoTxAck import CardanoTxAck
from trezor . messages . CardanoTxRequest import CardanoTxRequest
from apps . cardano import CURVE , seed
from apps . cardano . address import (
derive_address_and_node ,
is_safe_output_address ,
validate_full_path ,
)
from apps . cardano . layout import confirm_sending , confirm_transaction , progress
from apps . common import cbor
from apps . common . paths import validate_path
from apps . common . seed import remove_ed25519_prefix
from . import CURVE , seed
from . address import (
derive_address_and_node ,
get_address_attributes ,
validate_full_path ,
validate_output_address ,
)
from . layout import confirm_sending , confirm_transaction
if False :
from typing import Dict , List , Tuple
from trezor . messages . CardanoSignTx import CardanoSignTx
from trezor . messages . CardanoTxInputType import CardanoTxInputType
from trezor . messages . CardanoTxOutputType import CardanoTxOutputType
# the maximum allowed change address. this should be large enough for normal
# use and still allow to quickly brute-force the correct bip32 path
MAX_CHANGE_ADDRESS_INDEX = const ( 1000000 )
ACCOUNT_PREFIX_DEPTH = const ( 2 )
ACCOUNT_PATH_INDEX = const ( 2 )
BIP_PATH_LENGTH = const ( 5 )
KNOWN_PROTOCOL_MAGICS = { 764824073 : " Mainnet " , 1097911063 : " Testnet " }
# we consider addresses from the external chain as possible change addresses as well
def is_change ( output , inputs ) :
for input in inputs :
inp = input . address_n
if (
not output [ : ACCOUNT_PREFIX_DEPTH ] == inp [ : ACCOUNT_PREFIX_DEPTH ]
or not output [ - 2 ] < 2
or not output [ - 1 ] < MAX_CHANGE_ADDRESS_INDEX
) :
return False
return True
async def show_tx (
ctx ,
outputs : list ,
outcoins : list ,
fee : int ,
network_name : str ,
raw_inputs : list ,
raw_outputs : list ,
) - > None :
for index , output in enumerate ( outputs ) :
if is_change ( raw_outputs [ index ] . address_n , raw_inputs ) :
continue
await confirm_sending ( ctx , outcoins [ index ] , output )
total_amount = sum ( outcoins )
await confirm_transaction ( ctx , total_amount , fee , network_name )
async def request_transaction ( ctx , tx_req : CardanoTxRequest , index : int ) :
tx_req . tx_index = index
return await ctx . call ( tx_req , CardanoTxAck )
LOVELACE_MAX_SUPPLY = 45_000_000_000 * 1_000_000
@seed.with_keychain
async def sign_tx ( ctx , msg , keychain : seed . Keychain ) :
progress. init ( msg . transactions_count , " Loading data " )
async def sign_tx (
ctx : wire . Context , msg : CardanoSignTx , keychain : seed . Keychain
) - > CardanoSignedTx :
try :
attested = len ( msg . inputs ) * [ False ]
input_coins_sum = 0
# request transactions
tx_req = CardanoTxRequest ( )
for index in range ( msg . transactions_count ) :
progress . advance ( )
tx_ack = await request_transaction ( ctx , tx_req , index )
tx_hash = hashlib . blake2b (
data = bytes ( tx_ack . transaction ) , outlen = 32
) . digest ( )
tx_decoded = cbor . decode ( tx_ack . transaction )
for i , input in enumerate ( msg . inputs ) :
if not attested [ i ] and input . prev_hash == tx_hash :
attested [ i ] = True
outputs = tx_decoded [ 1 ]
amount = outputs [ input . prev_index ] [ 1 ]
input_coins_sum + = amount
if not all ( attested ) :
raise wire . ProcessError (
" No tx data sent for input " + str ( attested . index ( False ) )
)
transaction = Transaction (
msg . inputs , msg . outputs , keychain , msg . protocol_magic , input_coins_sum
)
if msg . fee > LOVELACE_MAX_SUPPLY :
raise wire . ProcessError ( " Fee is out of range! " )
for i in msg . inputs :
await validate_path ( ctx , validate_full_path , keychain , i . address_n , CURVE )
_validate_outputs ( keychain , msg . outputs , msg . protocol_magic )
# display the transaction in UI
await _show_tx ( ctx , keychain , msg )
# sign the transaction bundle and prepare the result
tx_body , tx_hash = transaction . serialise_tx ( )
tx = CardanoSignedTx ( tx_body = tx_body , tx_hash = tx_hash )
serialized_tx , tx_hash = _serialize_tx ( keychain , msg )
tx = CardanoSignedTx ( serialized_tx = serialized_tx , tx_hash = tx_hash )
except ValueError as e :
if __debug__ :
log . exception ( __name__ , e )
raise wire . ProcessError ( " Signing failed " )
# display the transaction in UI
await show_tx (
ctx ,
transaction . output_addresses ,
transaction . outgoing_coins ,
transaction . fee ,
transaction . network_name ,
transaction . inputs ,
transaction . outputs ,
return tx
def _validate_outputs (
keychain : seed . Keychain , outputs : List [ CardanoTxOutputType ] , protocol_magic : int
) - > None :
total_amount = 0
for output in outputs :
total_amount + = output . amount
if output . address_n :
continue
elif output . address is not None :
validate_output_address ( output . address , protocol_magic )
else :
raise wire . ProcessError ( " Each output must have address or address_n field! " )
if total_amount > LOVELACE_MAX_SUPPLY :
raise wire . ProcessError ( " Total transaction amount is out of range! " )
def _serialize_tx ( keychain : seed . Keychain , msg : CardanoSignTx ) - > Tuple [ bytes , bytes ] :
tx_body = _build_tx_body ( keychain , msg )
tx_hash = _hash_tx_body ( tx_body )
witnesses_for_cbor = _build_witnesses (
keychain , msg . inputs , tx_hash , msg . protocol_magic
)
# byron witnesses have the key 2 in Shelley
witnesses = { 2 : witnesses_for_cbor }
return tx
serialized_tx = cbor . encode ( [ tx_body , witnesses , None ] )
return serialized_tx , tx_hash
class Transaction :
def __init__ (
self ,
inputs : list ,
outputs : list ,
keychain ,
protocol_magic : int ,
input_coins_sum : int ,
) :
self . inputs = inputs
self . outputs = outputs
self . keychain = keychain
# attributes have to be always empty in current Cardano
self . attributes = { }
self . network_name = KNOWN_PROTOCOL_MAGICS . get ( protocol_magic , " Unknown " )
self . protocol_magic = protocol_magic
self . input_coins_sum = input_coins_sum
def _process_outputs ( self ) :
change_addresses = [ ]
change_derivation_paths = [ ]
output_addresses = [ ]
outgoing_coins = [ ]
change_coins = [ ]
for output in self . outputs :
if output . address_n :
address , _ = derive_address_and_node ( self . keychain , output . address_n )
change_addresses . append ( address )
change_derivation_paths . append ( output . address_n )
change_coins . append ( output . amount )
else :
if output . address is None :
raise wire . ProcessError (
" Each output must have address or address_n field! "
)
if not is_safe_output_address ( output . address ) :
raise wire . ProcessError ( " Invalid output address! " )
outgoing_coins . append ( output . amount )
output_addresses . append ( output . address )
self . change_addresses = change_addresses
self . output_addresses = output_addresses
self . outgoing_coins = outgoing_coins
self . change_coins = change_coins
self . change_derivation_paths = change_derivation_paths
def _build_witnesses ( self , tx_aux_hash : str ) :
witnesses = [ ]
for input in self . inputs :
_ , node = derive_address_and_node ( self . keychain , input . address_n )
message = (
b " \x01 " + cbor . encode ( self . protocol_magic ) + b " \x58 \x20 " + tx_aux_hash
)
signature = ed25519 . sign_ext (
node . private_key ( ) , node . private_key_ext ( ) , message
)
extended_public_key = (
remove_ed25519_prefix ( node . public_key ( ) ) + node . chain_code ( )
)
witnesses . append (
[
( input . type or 0 ) ,
cbor . Tagged ( 24 , cbor . encode ( [ extended_public_key , signature ] ) ) ,
]
)
return witnesses
def _build_tx_body ( keychain : seed . Keychain , msg : CardanoSignTx ) - > Dict :
inputs_for_cbor = _build_inputs ( msg . inputs )
outputs_for_cbor = _build_outputs ( keychain , msg . outputs , msg . protocol_magic )
@staticmethod
def compute_fee ( input_coins_sum : int , outgoing_coins : list , change_coins : list ) :
outgoing_coins_sum = sum ( outgoing_coins )
change_coins_sum = sum ( change_coins )
tx_body = {
0 : inputs_for_cbor ,
1 : outputs_for_cbor ,
2 : msg . fee ,
3 : msg . ttl ,
}
return input_coins_sum - outgoing_coins_sum - change_coins_sum
return tx_body
def serialise_tx ( self ) :
self . _process_outputs ( )
def _build_inputs ( inputs : List [ CardanoTxInputType ] ) - > List [ Tuple [ bytes , int ] ] :
return [ ( input . prev_hash , input . prev_index ) for input in inputs ]
inputs_cbor = [ ]
for input in self . inputs :
inputs_cbor . append (
[
( input . type or 0 ) ,
cbor . Tagged ( 24 , cbor . encode ( [ input . prev_hash , input . prev_index ] ) ) ,
]
def _build_outputs (
keychain : seed . Keychain , outputs : List [ CardanoTxOutputType ] , protocol_magic : int
) - > List [ Tuple [ bytes , int ] ] :
result = [ ]
for output in outputs :
amount = output . amount
if output . address_n :
address , _ = derive_address_and_node (
keychain , output . address_n , protocol_magic
)
else :
address = output . address
inputs_cbor = cbor . IndefiniteLengthArray ( inputs_cbor )
result . append ( ( base58 . decode ( address ) , amount ) )
outputs_cbor = [ ]
for index , address in enumerate ( self . output_addresses ) :
outputs_cbor . append (
[ cbor . Raw ( base58 . decode ( address ) ) , self . outgoing_coins [ index ] ]
)
return result
for index , address in enumerate ( self . change_addresses ) :
outputs_cbor . append (
[ cbor . Raw ( base58 . decode ( address ) ) , self . change_coins [ index ] ]
)
outputs_cbor = cbor . IndefiniteLengthArray ( outputs_cbor )
def _hash_tx_body ( tx_body : Dict ) - > bytes :
tx_body_cbor = cbor . encode ( tx_body )
return hashlib . blake2b ( data = tx_body_cbor , outlen = 32 ) . digest ( )
tx_aux_cbor = [ inputs_cbor , outputs_cbor , self . attributes ]
tx_hash = hashlib . blake2b ( data = cbor . encode ( tx_aux_cbor ) , outlen = 32 ) . digest ( )
witnesses = self . _build_witnesses ( tx_hash )
tx_body = cbor . encode ( [ tx_aux_cbor , witnesses ] )
def _build_witnesses (
keychain : seed . Keychain ,
inputs : List [ CardanoTxInputType ] ,
tx_body_hash : bytes ,
protocol_magic : int ,
) - > List [ Tuple [ bytes , bytes , bytes , bytes ] ] :
result = [ ]
for input in inputs :
node = keychain . derive ( input . address_n )
self . fee = self . compute_fee (
self . input_coins_sum , self . outgoing_coins , self . change_coins
public_key = remove_ed25519_prefix ( node . public_key ( ) )
signature = ed25519 . sign_ext (
node . private_key ( ) , node . private_key_ext ( ) , tx_body_hash
)
chain_code = node . chain_code ( )
address_attributes = cbor . encode ( get_address_attributes ( protocol_magic ) )
result . append ( ( public_key , signature , chain_code , address_attributes ) )
return result
return tx_body , tx_hash
async def _show_tx (
ctx : wire . Context , keychain : seed . Keychain , msg : CardanoSignTx
) - > None :
total_amount = 0
for output in msg . outputs :
if _should_hide_output ( output . address_n , msg . inputs ) :
continue
total_amount + = output . amount
if not output . address :
address , _ = derive_address_and_node (
keychain , output . address_n , msg . protocol_magic
)
else :
address = output . address
await confirm_sending ( ctx , output . amount , address )
await confirm_transaction ( ctx , total_amount , msg . fee , msg . protocol_magic )
# addresses from the same account as inputs should be hidden
def _should_hide_output ( output : List [ int ] , inputs : List [ CardanoTxInputType ] ) - > bool :
for input in inputs :
inp = input . address_n
if (
len ( output ) != BIP_PATH_LENGTH
or output [ : ( ACCOUNT_PATH_INDEX + 1 ) ] != inp [ : ( ACCOUNT_PATH_INDEX + 1 ) ]
or output [ - 2 ] > = 2
or output [ - 1 ] > = MAX_CHANGE_ADDRESS_INDEX
) :
return False
return True