1
0
mirror of https://github.com/trezor/trezor-firmware.git synced 2025-01-10 23:40:58 +00:00

stellar - refactor parser to return protobuf messages instead of map

This commit is contained in:
ZuluCrypto 2018-04-19 15:10:27 -06:00
parent 38647de7f9
commit 73b07a489b
No known key found for this signature in database
GPG Key ID: 0D1266F87C28A2E1

View File

@ -1,8 +1,34 @@
import base64
import struct
import binascii
import xdrlib
from . import messages as proto
# Memo types
MEMO_TYPE_TEXT = 0
MEMO_TYPE_ID = 1
MEMO_TYPE_HASH = 2
MEMO_TYPE_RETURN = 4
# Asset types
ASSET_TYPE_NATIVE = 0
ASSET_TYPE_ALPHA4 = 1
ASSET_TYPE_ALPHA12 = 2
# Operations
OP_CREATE_ACCOUNT = 0
OP_PAYMENT = 1
OP_PATH_PAYMENT = 2
OP_MANAGE_OFFER = 3
OP_CREATE_PASSIVE_OFFER = 4
OP_SET_OPTIONS = 5
OP_CHANGE_TRUST = 6
OP_ALLOW_TRUST = 7
OP_ACCOUNT_MERGE = 8
OP_INFLATION = 9 # Included for documentation purposes, not supported by Trezor
OP_MANAGE_DATA = 10
OP_BUMP_SEQUENCE = 11
def expand_path_or_default(client, address):
"""Uses client to parse address and returns an array of integers
If no address is specified, the default of m/44'/148'/0' is used
@ -31,7 +57,6 @@ def address_to_public_key(address_str):
"""Returns the raw 32 bytes representing a public key by extracting
it from the G... string
"""
final_bytes = bytearray()
decoded = base64.b32decode(address_str)
# skip 0th byte (version) and last two bytes (checksum)
@ -39,221 +64,232 @@ def address_to_public_key(address_str):
def parse_transaction_bytes(bytes):
"""Parses base64data into a StellarSignTx message
"""Parses base64data into a map with the following keys:
tx - a StellarSignTx describing the transaction header
operations - an array of protobuf message objects for each operation
"""
parsed = {}
parsed["protocol_version"] = 1
parsed["operations"] = []
tx = proto.StellarSignTx(
protocol_version=1
)
operations = []
unpacker = xdrlib.Unpacker(bytes)
parsed["source_account"] = _xdr_read_address(unpacker)
parsed["fee"] = unpacker.unpack_uint()
parsed["sequence_number"] = unpacker.unpack_uhyper()
tx.source_account = _xdr_read_address(unpacker)
tx.fee = unpacker.unpack_uint()
tx.sequence_number = unpacker.unpack_uhyper()
# Timebounds is an optional field
parsed["timebounds_start"] = 0
parsed["timebounds_end"] = 0
has_timebounds = unpacker.unpack_bool()
if has_timebounds:
max_timebound = 2**32-1 # max unsigned 32-bit int (trezor does not support the full 64-bit time value)
parsed["timebounds_start"] = unpacker.unpack_uhyper()
parsed["timebounds_end"] = unpacker.unpack_uhyper()
tx.timebounds_start = unpacker.unpack_uhyper()
tx.timebounds_end = unpacker.unpack_uhyper()
if parsed["timebounds_start"] > max_timebound or parsed["timebounds_start"] < 0:
if tx.timebounds_start > max_timebound or tx.timebounds_start < 0:
raise ValueError("Starting timebound out of range (must be between 0 and " + max_timebound)
if parsed["timebounds_end"] > max_timebound or parsed["timebounds_end"] < 0:
if tx.timebounds_end > max_timebound or tx.timebounds_end < 0:
raise ValueError("Ending timebound out of range (must be between 0 and " + max_timebound)
# memo type determines what optional fields are set
parsed["memo_type"] = unpacker.unpack_uint()
parsed["memo_text"] = None
parsed["memo_id"] = None
parsed["memo_hash"] = None
tx.memo_type = unpacker.unpack_uint()
# text
if parsed["memo_type"] == 1:
parsed["memo_text"] = unpacker.unpack_string()
if tx.memo_type == MEMO_TYPE_HASH:
tx.memo_text = unpacker.unpack_string()
# id (64-bit uint)
if parsed["memo_type"] == 2:
parsed["memo_id"] = unpacker.unpack_uhyper()
if tx.memo_type == MEMO_TYPE_ID:
tx.memo_id = unpacker.unpack_uhyper()
# hash / return are the same structure (32 bytes representing a hash)
if parsed["memo_type"] == 3 or parsed["memo_type"] == 4:
parsed["memo+hash"] = unpacker.unpack_fopaque(32)
if tx.memo_type == MEMO_TYPE_HASH or tx.memo_type == MEMO_TYPE_RETURN:
tx.memo_hash = unpacker.unpack_fopaque(32)
parsed["num_operations"] = unpacker.unpack_uint()
tx.num_operations = unpacker.unpack_uint()
for opIdx in range(0, parsed["num_operations"]):
parsed["operations"].append(_parse_operation_bytes(unpacker))
for i in range(tx.num_operations):
operations.append(_parse_operation_bytes(unpacker))
return parsed
def _parse_operation_bytes(unpacker):
"""Returns a dictionary describing the next operation as read from
the byte stream in unpacker
"""
op = {
"source_account": None,
"type": None
return {
"tx": tx,
"operations": operations
}
def _parse_operation_bytes(unpacker):
"""Returns a protobuf message representing the next operation as read from
the byte stream in unpacker
"""
# Check for and parse optional source account field
source_account = None
has_source_account = unpacker.unpack_bool()
if has_source_account:
op["source_account"] = unpacker.unpack_fopaque(32)
source_account = unpacker.unpack_fopaque(32)
op["type"] = unpacker.unpack_uint()
# Operation type (See OP_ constants)
type = unpacker.unpack_uint()
# see: https://github.com/stellar/stellar-core/blob/master/src/xdr/Stellar-transaction.x#L16
if op["type"] == 0:
op["new_account"] = _xdr_read_address(unpacker)
op["starting_balance"] = unpacker.unpack_hyper()
if type == OP_CREATE_ACCOUNT:
return proto.StellarCreateAccountOp(
source_account=source_account,
new_account=_xdr_read_address(unpacker),
starting_balance=unpacker.unpack_hyper()
)
# see: https://github.com/stellar/stellar-core/blob/master/src/xdr/Stellar-transaction.x#L54
if op["type"] == 1:
op["destination_account"] = _xdr_read_address(unpacker)
op["asset"] = _xdr_read_asset(unpacker)
op["amount"] = unpacker.unpack_hyper()
if type == OP_PAYMENT:
return proto.StellarPaymentOp(
source_account=source_account,
destination_account=_xdr_read_address(unpacker),
asset=_xdr_read_asset(unpacker),
amount=unpacker.unpack_hyper()
)
# see: https://github.com/stellar/stellar-core/blob/master/src/xdr/Stellar-transaction.x#L72
if op["type"] == 2:
op["send_asset"] = _xdr_read_asset(unpacker)
op["send_max"] = unpacker.unpack_hyper()
op["destination_account"] = _xdr_read_address(unpacker)
op["destination_asset"] = _xdr_read_asset(unpacker)
op["destination_amount"] = unpacker.unpack_hyper()
op["paths"] = []
if type == OP_PATH_PAYMENT:
op = proto.StellarPathPaymentOp(
source_account=source_account,
send_asset=_xdr_read_asset(unpacker),
send_max=unpacker.unpack_hyper(),
destination_account=_xdr_read_address(unpacker),
destination_asset=_xdr_read_asset(unpacker),
paths=[]
)
num_paths = unpacker.unpack_uint()
for i in range(0, num_paths):
op["paths"].append(_xdr_read_asset(unpacker))
for i in range(num_paths):
op.paths.append(_xdr_read_asset(unpacker))
# see: https://github.com/stellar/stellar-core/blob/master/src/xdr/Stellar-transaction.x#L93
if op["type"] == 3:
op["selling_asset"] = _xdr_read_asset(unpacker)
op["buying_asset"] = _xdr_read_asset(unpacker)
op["amount"] = unpacker.unpack_hyper()
op["price_n"] = unpacker.unpack_uint()
op["price_d"] = unpacker.unpack_uint()
op["offer_id"] = unpacker.unpack_uhyper()
return op
# see: https://github.com/stellar/stellar-core/blob/master/src/xdr/Stellar-transaction.x#L111
if op["type"] == 4:
op["selling_asset"] = _xdr_read_asset(unpacker)
op["buying_asset"] = _xdr_read_asset(unpacker)
op["amount"] = unpacker.unpack_hyper()
op["price_n"] = unpacker.unpack_uint()
op["price_d"] = unpacker.unpack_uint()
if type == OP_MANAGE_OFFER:
return proto.StellarManageOfferOp(
source_account=source_account,
selling_asset=_xdr_read_asset(unpacker),
buying_asset=_xdr_read_asset(unpacker),
amount=unpacker.unpack_hyper(),
price_n=unpacker.unpack_uint(),
price_d=unpacker.unpack_uint(),
offer_id=unpacker.unpack_uhyper()
)
# see: https://github.com/stellar/stellar-core/blob/master/src/xdr/Stellar-transaction.x#L129
if op["type"] == 5:
op["inflation_destination"] = None
op["clear_flags"] = None
op["set_flags"] = None
op["master_weight"] = None
op["low_threshold"] = None
op["medium_threshold"] = None
op["high_threshold"] = None
op["home_domain"] = None
op["signer_type"] = None
op["signer_key"] = None
op["signer_weight"] = None
if type == OP_CREATE_PASSIVE_OFFER:
return proto.StellarCreatePassiveOfferOp(
source_account=source_account,
selling_asset=_xdr_read_asset(unpacker),
buying_asset=_xdr_read_asset(unpacker),
amount=unpacker.unpack_hyper(),
price_n=unpacker.unpack_uint(),
price_d=unpacker.unpack_uint()
)
op["has_inflation_destination"] = unpacker.unpack_bool()
if op["has_inflation_destination"]:
op["inflation_destination"] = _xdr_read_address(unpacker)
if type == OP_SET_OPTIONS:
op = proto.StellarSetOptionsOp(
source_account=source_account
)
op["has_clear_flags"] = unpacker.unpack_bool()
if op["has_clear_flags"]:
op["clear_flags"] = unpacker.unpack_uint()
# Inflation destination
if unpacker.unpack_bool():
op.inflation_destination_account = _xdr_read_address(unpacker)
op["has_set_flags"] = unpacker.unpack_bool()
if op["has_set_flags"]:
op["set_flags"] = unpacker.unpack_uint()
# clear flags
if unpacker.unpack_bool():
op.clear_flags = unpacker.unpack_uint()
op["has_master_weight"] = unpacker.unpack_bool()
if op["has_master_weight"]:
op["master_weight"] = unpacker.unpack_uint()
# set flags
if unpacker.unpack_bool():
op.set_flags = unpacker.unpack_uint()
op["has_low_threshold"] = unpacker.unpack_bool()
if op["has_low_threshold"]:
op["low_threshold"] = unpacker.unpack_uint()
# master weight
if unpacker.unpack_bool():
op.master_weight = unpacker.unpack_uint()
op["has_medium_threshold"] = unpacker.unpack_bool()
if op["has_medium_threshold"]:
op["medium_threshold"] = unpacker.unpack_uint()
# low threshold
if unpacker.unpack_bool():
op.low_threshold = unpacker.unpack_uint()
op["has_high_threshold"] = unpacker.unpack_bool()
if op["has_high_threshold"]:
op["high_threshold"] = unpacker.unpack_uint()
# medium threshold
if unpacker.unpack_bool():
op.medium_threshold = unpacker.unpack_uint()
op["has_home_domain"] = unpacker.unpack_bool()
if op["has_home_domain"]:
op["home_domain"] = unpacker.unpack_string()
# high threshold
if unpacker.unpack_bool():
op.high_threshold = unpacker.unpack_uint()
op["has_signer"] = unpacker.unpack_bool()
if op["has_signer"]:
op["signer_type"] = unpacker.unpack_uint()
op["signer_key"] = unpacker.unpack_fopaque(32)
op["signer_weight"] = unpacker.unpack_uint()
# home domain
if unpacker.unpack_bool():
op.home_domain = unpacker.unpack_string()
# Change Trust
# see: https://github.com/stellar/stellar-core/blob/master/src/xdr/Stellar-transaction.x#L156
if op["type"] == 6:
op["asset"] = _xdr_read_asset(unpacker)
op["limit"] = unpacker.unpack_uhyper()
# signer
if unpacker.unpack_bool():
op.signer_type = unpacker.unpack_uint()
op.signer_key = unpacker.unpack_fopaque(32)
op.signer_weight = unpacker.unpack_uint()
# Allow Trust
# see: https://github.com/stellar/stellar-core/blob/master/src/xdr/Stellar-transaction.x#L173
if op["type"] == 7:
op["trusted_account"] = _xdr_read_address(unpacker)
op["asset_type"] = unpacker.unpack_uint()
return op
if op["asset_type"] == 1:
op["asset_code"] = unpacker.unpack_fstring(4)
if op["asset_type"] == 2:
op["asset_code"] = unpacker.unpack_fstring(12)
if type == OP_CHANGE_TRUST:
return proto.StellarChangeTrustOp(
source_account=source_account,
asset=_xdr_read_asset(unpacker),
limit=unpacker.unpack_uhyper()
)
op["is_authorized"] = unpacker.unpack_bool()
if type == OP_ALLOW_TRUST:
op = proto.StellarAllowTrustOp(
source_account=source_account,
trusted_account=_xdr_read_address(unpacker),
asset_type=unpacker.unpack_uint()
)
# Merge Account
# see: https://github.com/stellar/stellar-core/blob/master/src/xdr/Stellar-transaction.x#L251
if op["type"] == 8:
op["destination_account"] = _xdr_read_address(unpacker)
if op.asset_type == ASSET_TYPE_ALPHA4:
op.asset_code = unpacker.unpack_fstring(4)
if op.asset_type == ASSET_TYPE_ALPHA12:
op.asset_code = unpacker.unpack_fstring(12)
# Inflation is not implemented since any account can send this operation
op.is_authorized = unpacker.unpack_bool()
# Manage Data
# see: https://github.com/stellar/stellar-core/blob/master/src/xdr/Stellar-transaction.x#L218
if op["type"] == 10:
op["key"] = unpacker.unpack_string()
return op
op["value"] = None
op["has_value"] = unpacker.unpack_bool()
if op["has_value"]:
op["value"] = unpacker.unpack_opaque()
if type == OP_ACCOUNT_MERGE:
return proto.StellarAccountMergeOp(
source_account=source_account,
destination_account=_xdr_read_address(unpacker)
)
# Inflation is not implemented since anyone can submit this operation to the network
if type == OP_MANAGE_DATA:
op = proto.StellarManageDataOp(
source_account=source_account,
key=unpacker.unpack_string(),
)
# Only set value if the field is present
if unpacker.unpack_bool():
op.value = unpacker.unpack_opaque()
return op
# Bump Sequence
# see: https://github.com/stellar/stellar-core/blob/master/src/xdr/Stellar-transaction.x#L269
if op["type"] == 11:
op["bump_to"] = unpacker.unpack_uhyper()
if type == OP_BUMP_SEQUENCE:
return proto.StellarBumpSequenceOp(
source_account=source_account,
bump_to=unpacker.unpack_uhyper()
)
return op
raise ValueError("Unknown operation type: " + type)
def _xdr_read_asset(unpacker):
"""Reads a stellar Asset from unpacker"""
asset = {
"type": unpacker.unpack_uint(),
"code": None,
"issuer": None
}
asset = proto.StellarAssetType(
type=unpacker.unpack_uint()
)
# alphanum 4
if asset["type"] == 1:
asset["code"] = unpacker.unpack_fstring(4)
asset["issuer"] = _xdr_read_address(unpacker)
if asset.type == ASSET_TYPE_ALPHA4:
asset.code = unpacker.unpack_fstring(4)
asset.issuer = _xdr_read_address(unpacker)
if asset["type"] == 2:
asset["code"] = unpacker.unpack_fstring(12)
asset["issuer"] = _xdr_read_address(unpacker)
if asset.type == ASSET_TYPE_ALPHA12:
asset.code = unpacker.unpack_fstring(12)
asset.issuer = _xdr_read_address(unpacker)
return asset