@ -13,14 +13,45 @@
#
# You should have received a copy of the License along with this library.
# If not, see <https://www.gnu.org/licenses/lgpl-3.0.html>.
import base64
import struct
import xdrlib
from decimal import Decimal
from typing import Union
from . import exceptions , messages
from . tools import expect
try :
from stellar_sdk import (
AccountMerge ,
AllowTrust ,
Asset ,
BumpSequence ,
ChangeTrust ,
CreateAccount ,
CreatePassiveSellOffer ,
HashMemo ,
IdMemo ,
ManageData ,
ManageSellOffer ,
Operation ,
PathPaymentStrictReceive ,
Payment ,
ReturnHashMemo ,
SetOptions ,
TextMemo ,
TransactionEnvelope ,
TrustLineEntryFlag ,
Price ,
Network ,
)
from stellar_sdk . xdr . signer_key_type import SignerKeyType
HAVE_STELLAR_SDK = True
DEFAULT_NETWORK_PASSPHRASE = Network . PUBLIC_NETWORK_PASSPHRASE
except ImportError :
HAVE_STELLAR_SDK = False
DEFAULT_NETWORK_PASSPHRASE = " Public Global Stellar Network ; September 2015 "
# Memo types
MEMO_TYPE_NONE = 0
MEMO_TYPE_TEXT = 1
@ -33,309 +64,191 @@ ASSET_TYPE_NATIVE = 0
ASSET_TYPE_ALPHA4 = 1
ASSET_TYPE_ALPHA12 = 2
# Operations
OP_CREATE_ACCOUNT = 0
OP_PAYMENT = 1
OP_PATH_PAYMENT = 2
OP_MANAGE_OFFER = 3
OP_CREATE_PASSIVE_OFFER = 4
OP_SET_OPTIONS = 5
OP_CHANGE_TRUST = 6
OP_ALLOW_TRUST = 7
OP_ACCOUNT_MERGE = 8
OP_INFLATION = 9 # Included for documentation purposes, not supported by Trezor
OP_MANAGE_DATA = 10
OP_BUMP_SEQUENCE = 11
DEFAULT_BIP32_PATH = " m/44h/148h/0h "
# Stellar's BIP32 differs to Bitcoin's see https://github.com/stellar/stellar-protocol/blob/master/ecosystem/sep-0005.md
DEFAULT_NETWORK_PASSPHRASE = " Public Global Stellar Network ; September 2015 "
def address_from_public_key ( pk_bytes ) :
""" Returns the base32-encoded version of pk_bytes (G...) """
final_bytes = bytearray ( )
# version
final_bytes . append ( 6 << 3 )
# public key
final_bytes . extend ( pk_bytes )
# checksum
final_bytes . extend ( struct . pack ( " <H " , _crc16_checksum ( final_bytes ) ) )
return base64 . b32encode ( final_bytes ) . decode ( )
def address_to_public_key ( address_str ) :
""" Returns the raw 32 bytes representing a public key by extracting
it from the G . . . string
"""
decoded = base64 . b32decode ( address_str )
# skip 0th byte (version) and last two bytes (checksum)
return decoded [ 1 : - 2 ]
def parse_transaction_bytes ( tx_bytes ) :
""" Parses base64data into a map with the following keys:
def from_envelope ( envelope : " TransactionEnvelope " ) :
""" Parses transaction envelope into a map with the following keys:
tx - a StellarSignTx describing the transaction header
operations - an array of protobuf message objects for each operation
"""
if not HAVE_STELLAR_SDK :
raise RuntimeError ( " Stellar SDK not available " )
tx = messages . StellarSignTx ( )
unpacker = xdrlib . Unpacker ( tx_bytes )
tx . source_account = _xdr_read_address ( unpacker )
tx . fee = unpacker . unpack_uint ( )
tx . sequence_number = unpacker . unpack_uhyper ( )
parsed_tx = envelope . transaction
tx . source_account = parsed_tx . source . account_id
tx . fee = parsed_tx . fee
tx . sequence_number = parsed_tx . sequence
# Timebounds is an optional field
if unpacker . unpack_bool ( ) :
max_timebound = 2 * * 32 - 1 # max unsigned 32-bit int
# (trezor does not support the full 64-bit time value)
tx . timebounds_start = unpacker . unpack_uhyper ( )
tx . timebounds_end = unpacker . unpack_uhyper ( )
if tx . timebounds_start > max_timebound or tx . timebounds_start < 0 :
raise ValueError (
" Starting timebound out of range (must be between 0 and "
+ max_timebound
)
if tx . timebounds_end > max_timebound or tx . timebounds_end < 0 :
raise ValueError (
" Ending timebound out of range (must be between 0 and " + max_timebound
)
# memo type determines what optional fields are set
tx . memo_type = unpacker . unpack_uint ( )
# text
if tx . memo_type == MEMO_TYPE_TEXT :
tx . memo_text = unpacker . unpack_string ( ) . decode ( )
# id (64-bit uint)
if tx . memo_type == MEMO_TYPE_ID :
tx . memo_id = unpacker . unpack_uhyper ( )
# hash / return are the same structure (32 bytes representing a hash)
if tx . memo_type == MEMO_TYPE_HASH or tx . memo_type == MEMO_TYPE_RETURN :
tx . memo_hash = unpacker . unpack_fopaque ( 32 )
tx . num_operations = unpacker . unpack_uint ( )
operations = [ ]
for _ in range ( tx . num_operations ) :
operations . append ( _parse_operation_bytes ( unpacker ) )
if parsed_tx . time_bounds :
tx . timebounds_start = parsed_tx . time_bounds . min_time
tx . timebounds_end = parsed_tx . time_bounds . max_time
memo = parsed_tx . memo
if isinstance ( memo , TextMemo ) :
# memo_text is specified as UTF-8 string, but returned as bytes from the XDR parser
tx . memo_type = MEMO_TYPE_TEXT
tx . memo_text = memo . memo_text . decode ( " utf-8 " )
elif isinstance ( memo , IdMemo ) :
tx . memo_type = MEMO_TYPE_ID
tx . memo_id = memo . memo_id
elif isinstance ( memo , HashMemo ) :
tx . memo_type = MEMO_TYPE_HASH
tx . memo_hash = memo . memo_hash
elif isinstance ( memo , ReturnHashMemo ) :
tx . memo_type = MEMO_TYPE_RETURN
tx . memo_hash = memo . memo_return
else :
tx . memo_type = MEMO_TYPE_NONE
tx . num_operations = len ( parsed_tx . operations )
operations = [ _read_operation ( op ) for op in parsed_tx . operations ]
return tx , operations
def _parse_operation_bytes ( unpacker ) :
""" Returns a protobuf message representing the next operation as read from
the byte stream in unpacker
"""
# Check for and parse optional source account field
source_account = None
if unpacker . unpack_bool ( ) :
source_account = unpacker . unpack_fopaque ( 32 )
# Operation type (See OP_ constants)
type = unpacker . unpack_uint ( )
if type == OP_CREATE_ACCOUNT :
def _read_operation ( op : " Operation " ) :
# TODO: Let's add muxed account support later.
if op . source :
source_account = op . source . account_id
else :
source_account = None
if isinstance ( op , CreateAccount ) :
return messages . StellarCreateAccountOp (
source_account = source_account ,
new_account = _xdr_read_address( unpacker ) ,
starting_balance = unpacker. unpack_hyper ( ) ,
new_account = op . destination ,
starting_balance = _read_amount ( op . starting_balance ) ,
)
if type == OP_PAYMENT :
if isinstance ( op , Payment ) :
return messages . StellarPaymentOp (
source_account = source_account ,
destination_account = _xdr_read_address( unpacker ) ,
asset = _ xdr_read_asset( unpacker ) ,
amount = unpacker. unpack_hyper ( ) ,
destination_account = op. destination . account_id ,
asset = _ read_asset( op . asset ) ,
amount = _read_amount( op . amount ) ,
)
if type == OP_PATH_PAYMENT :
op = messages . StellarPathPaymentOp (
if isinstance ( op , PathPaymentStrictReceive ) :
operation = messages . StellarPathPaymentOp (
source_account = source_account ,
send_asset = _ xdr_read_asset( unpacker ) ,
send_max = unpacker. unpack_hyper ( ) ,
destination_account = _xdr_read_address( unpacker ) ,
destination_asset = _ xdr_read_asset( unpacker ) ,
destination_amount = unpacker. unpack_hyper ( ) ,
paths = [ ] ,
send_asset = _ read_asset( op . send_asset ) ,
send_max = _read_amount( op . send_max ) ,
destination_account = op. destination . account_id ,
destination_asset = _ read_asset( op . dest_asset ) ,
destination_amount = _read_amount( op . dest_amount ) ,
paths = [ _read_asset ( asset ) for asset in op . path ] ,
)
num_paths = unpacker . unpack_uint ( )
for _ in range ( num_paths ) :
op . paths . append ( _xdr_read_asset ( unpacker ) )
return op
if type == OP_MANAGE_OFFER :
return operation
if isinstance ( op , ManageSellOffer ) :
price = _read_price ( op . price )
return messages . StellarManageOfferOp (
source_account = source_account ,
selling_asset = _ xdr_read_asset( unpacker ) ,
buying_asset = _ xdr_read_asset( unpacker ) ,
amount = unpacker. unpack_hyper ( ) ,
price_n = unpacker. unpack_uint ( ) ,
price_d = unpacker. unpack_uint ( ) ,
offer_id = unpacker. unpack_uhyper ( ) ,
selling_asset = _read_asset ( op . selling ) ,
buying_asset = _read_asset ( op . buying ) ,
amount = _read_amount ( op . amount ) ,
price_n = price. n ,
price_d = price. d ,
offer_id = op. offer_id ,
)
if type == OP_CREATE_PASSIVE_OFFER :
if isinstance ( op , CreatePassiveSellOffer ) :
price = _read_price ( op . price )
return messages . StellarCreatePassiveOfferOp (
source_account = source_account ,
selling_asset = _ xdr_read_asset( unpacker ) ,
buying_asset = _ xdr_read_asset( unpacker ) ,
amount = unpacker. unpack_hyper ( ) ,
price_n = unpacker. unpack_uint ( ) ,
price_d = unpacker. unpack_uint ( ) ,
selling_asset = _ read_asset( op . selling ) ,
buying_asset = _ read_asset( op . buying ) ,
amount = _read_amount( op . amount ) ,
price_n = price. n ,
price_d = price. d ,
)
if type == OP_SET_OPTIONS :
op = messages . StellarSetOptionsOp ( source_account = source_account )
# Inflation destination
if unpacker . unpack_bool ( ) :
op . inflation_destination_account = _xdr_read_address ( unpacker )
# clear flags
if unpacker . unpack_bool ( ) :
op . clear_flags = unpacker . unpack_uint ( )
# set flags
if unpacker . unpack_bool ( ) :
op . set_flags = unpacker . unpack_uint ( )
# master weight
if unpacker . unpack_bool ( ) :
op . master_weight = unpacker . unpack_uint ( )
# low threshold
if unpacker . unpack_bool ( ) :
op . low_threshold = unpacker . unpack_uint ( )
# medium threshold
if unpacker . unpack_bool ( ) :
op . medium_threshold = unpacker . unpack_uint ( )
# high threshold
if unpacker . unpack_bool ( ) :
op . high_threshold = unpacker . unpack_uint ( )
# home domain
if unpacker . unpack_bool ( ) :
op . home_domain = unpacker . unpack_string ( ) . decode ( )
# signer
if unpacker . unpack_bool ( ) :
op . signer_type = unpacker . unpack_uint ( )
op . signer_key = unpacker . unpack_fopaque ( 32 )
op . signer_weight = unpacker . unpack_uint ( )
return op
if type == OP_CHANGE_TRUST :
if isinstance ( op , SetOptions ) :
operation = messages . StellarSetOptionsOp (
source_account = source_account ,
inflation_destination_account = op . inflation_dest ,
clear_flags = op . clear_flags ,
set_flags = op . set_flags ,
master_weight = op . master_weight ,
low_threshold = op . low_threshold ,
medium_threshold = op . med_threshold ,
high_threshold = op . high_threshold ,
home_domain = op . home_domain ,
)
if op . signer :
signer_type = op . signer . signer_key . signer_key . type
if signer_type == SignerKeyType . SIGNER_KEY_TYPE_ED25519 :
signer_key = op . signer . signer_key . signer_key . ed25519 . uint256
elif signer_type == SignerKeyType . SIGNER_KEY_TYPE_HASH_X :
signer_key = op . signer . signer_key . signer_key . hash_x . uint256
elif signer_type == SignerKeyType . SIGNER_KEY_TYPE_PRE_AUTH_TX :
signer_key = op . signer . signer_key . signer_key . pre_auth_tx . uint256
else :
raise ValueError ( " Unsupported signer key type " )
operation . signer_type = signer_type . value
operation . signer_key = signer_key
operation . signer_weight = op . signer . weight
return operation
if isinstance ( op , ChangeTrust ) :
return messages . StellarChangeTrustOp (
source_account = source_account ,
asset = _ xdr_read_asset( unpacker ) ,
limit = unpacker. unpack_uhyper ( ) ,
asset = _read_asset ( op . asset ) ,
limit = _read_amount ( op . limit ) ,
)
if type == OP_ALLOW_TRUST :
op = messages . StellarAllowTrustOp (
if isinstance ( op , AllowTrust ) :
is_authorized = False
if op . authorize is True or TrustLineEntryFlag . AUTHORIZED_FLAG == op . authorize :
is_authorized = True
asset_type = (
ASSET_TYPE_ALPHA4 if len ( op . asset_code ) < = 4 else ASSET_TYPE_ALPHA12
)
return messages . StellarAllowTrustOp (
source_account = source_account ,
trusted_account = _xdr_read_address ( unpacker ) ,
asset_type = unpacker . unpack_uint ( ) ,
trusted_account = op . trustor ,
asset_type = asset_type ,
asset_code = op . asset_code ,
is_authorized = is_authorized ,
)
if op . asset_type == ASSET_TYPE_ALPHA4 :
op . asset_code = unpacker . unpack_fstring ( 4 ) . decode ( )
if op . asset_type == ASSET_TYPE_ALPHA12 :
op . asset_code = unpacker . unpack_fstring ( 12 ) . decode ( )
op . is_authorized = unpacker . unpack_bool ( )
return op
if type == OP_ACCOUNT_MERGE :
if isinstance ( op , AccountMerge ) :
return messages . StellarAccountMergeOp (
source_account = source_account ,
destination_account = _xdr_read_address( unpacker ) ,
destination_account = op . destination . account_id ,
)
# Inflation is not implemented since anyone can submit this operation to the network
if type == OP_MANAGE_DATA :
op = messages . StellarManageDataOp (
source_account = source_account , key = unpacker . unpack_string ( ) . decode ( )
if isinstance ( op , ManageData ) :
return messages . StellarManageDataOp (
source_account = source_account ,
key = op . data_name ,
value = op . data_value ,
)
# Only set value if the field is present
if unpacker . unpack_bool ( ) :
op . value = unpacker . unpack_opaque ( )
return op
# Bump Sequence
# see: https://github.com/stellar/stellar-core/blob/master/src/xdr/Stellar-transaction.x#L269
if type == OP_BUMP_SEQUENCE :
if isinstance ( op , BumpSequence ) :
return messages . StellarBumpSequenceOp (
source_account = source_account , bump_to = unpacker. unpack_uhyper ( )
source_account = source_account , bump_to = op . bump_to
)
raise ValueError ( f " Unknown operation type: { op . __class__ . __name__ } " )
raise ValueError ( " Unknown operation type: " + str ( type ) )
def _xdr_read_asset ( unpacker ) :
""" Reads a stellar Asset from unpacker """
asset = messages . StellarAssetType ( type = unpacker . unpack_uint ( ) )
if asset . type == ASSET_TYPE_ALPHA4 :
asset . code = unpacker . unpack_fstring ( 4 ) . decode ( )
asset . issuer = _xdr_read_address ( unpacker )
if asset . type == ASSET_TYPE_ALPHA12 :
asset . code = unpacker . unpack_fstring ( 12 ) . decode ( )
asset . issuer = _xdr_read_address ( unpacker )
return asset
def _xdr_read_address ( unpacker ) :
""" Reads a stellar address and returns the string representing the address
This method assumes the encoded address is a public address ( starting with G )
"""
# First 4 bytes are the address type
address_type = unpacker . unpack_uint ( )
if address_type != 0 :
raise ValueError ( " Unsupported address type " )
return address_from_public_key ( unpacker . unpack_fopaque ( 32 ) )
def _read_amount ( amount : str ) - > int :
return Operation . to_xdr_amount ( amount )
def _crc16_checksum ( bytes ) :
""" Returns the CRC-16 checksum of bytearray bytes
def _read_price ( price : Union [ " Price " , str , Decimal ] ) - > " Price " :
# In the coming stellar-sdk 5.x, the type of price must be Price,
# at that time we can remove this function
if isinstance ( price , Price ) :
return price
return Price . from_raw_price ( price )
Ported from Java implementation at : http : / / introcs . cs . princeton . edu / java / 61 data / CRC16CCITT . java . html
Initial value changed to 0x0000 to match Stellar configuration .
"""
crc = 0x0000
polynomial = 0x1021
for byte in bytes :
for i in range ( 8 ) :
bit = ( byte >> ( 7 - i ) & 1 ) == 1
c15 = ( crc >> 15 & 1 ) == 1
crc << = 1
if c15 ^ bit :
crc ^ = polynomial
return crc & 0xFFFF
def _read_asset ( asset : " Asset " ) - > messages . StellarAssetType :
""" Reads a stellar Asset from unpacker """
if asset . is_native ( ) :
return messages . StellarAssetType ( type = ASSET_TYPE_NATIVE )
if asset . guess_asset_type ( ) == " credit_alphanum4 " :
return messages . StellarAssetType (
type = ASSET_TYPE_ALPHA4 , code = asset . code , issuer = asset . issuer
)
if asset . guess_asset_type ( ) == " credit_alphanum12 " :
return messages . StellarAssetType (
type = ASSET_TYPE_ALPHA12 , code = asset . code , issuer = asset . issuer
)
raise ValueError ( " Unsupported asset type " )
# ====== Client functions ====== #