1
0
mirror of https://github.com/trezor/trezor-firmware.git synced 2024-10-17 13:29:17 +00:00
trezor-firmware/python/src/trezorlib/stellar.py
matejcik 67b879ac07 python: restructure package
This deserves some explanation.

* tests were moved to separate python/tests subdir
* trezorlib was moved to python/src, so that it does not exist on
PYTHONPATH by default
(see https://blog.ionelmc.ro/2014/05/25/python-packaging/ for details)
* everything was updated to understand the new structure
* trezorctl was changed from a top-level executable script to a module
`trezorlib.cli.trezorctl` and is installed via the entry_points
mechanism.
This should make it work normally on Windows!

The package should be installable as normal through pip and pipenv, no
changes are needed on that side.

The source package from pypi will include unit tests. (Device tests were
completely moved out). Wheel will exclude them, because users don't need
them.
That shrinks the .whl from 520 kB to 270 - nice!

python: reorganize remaining unit tests
2019-08-12 12:57:25 +02:00

386 lines
12 KiB
Python

# This file is part of the Trezor project.
#
# Copyright (C) 2012-2019 SatoshiLabs and contributors
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License version 3
# as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the License along with this library.
# If not, see <https://www.gnu.org/licenses/lgpl-3.0.html>.
import base64
import struct
import xdrlib
from . import messages
from .tools import CallException, expect
# Memo types: values of the memo type field read in parse_transaction_bytes
MEMO_TYPE_NONE = 0
MEMO_TYPE_TEXT = 1
MEMO_TYPE_ID = 2
MEMO_TYPE_HASH = 3
MEMO_TYPE_RETURN = 4
# Asset types: values of the asset type field read in _xdr_read_asset
ASSET_TYPE_NATIVE = 0
ASSET_TYPE_ALPHA4 = 1
ASSET_TYPE_ALPHA12 = 2
# Operations: operation type codes dispatched on in _parse_operation_bytes
OP_CREATE_ACCOUNT = 0
OP_PAYMENT = 1
OP_PATH_PAYMENT = 2
OP_MANAGE_OFFER = 3
OP_CREATE_PASSIVE_OFFER = 4
OP_SET_OPTIONS = 5
OP_CHANGE_TRUST = 6
OP_ALLOW_TRUST = 7
OP_ACCOUNT_MERGE = 8
OP_INFLATION = 9 # Included for documentation purposes, not supported by Trezor
OP_MANAGE_DATA = 10
OP_BUMP_SEQUENCE = 11
# Default derivation path. Stellar's BIP32 derivation differs from Bitcoin's;
# see https://github.com/stellar/stellar-protocol/blob/master/ecosystem/sep-0005.md
DEFAULT_BIP32_PATH = "m/44h/148h/0h"
# Network passphrase that sign_tx uses when the caller does not supply one
DEFAULT_NETWORK_PASSPHRASE = "Public Global Stellar Network ; September 2015"
def address_from_public_key(pk_bytes):
    """Encode a raw public key as a base32 address string (G...).

    The encoded payload is a single version byte (6 << 3), then pk_bytes,
    then the CRC-16 checksum of those bytes packed as a little-endian
    unsigned 16-bit integer.
    """
    # version byte followed by the public key
    payload = bytearray([6 << 3])
    payload.extend(pk_bytes)

    # the checksum covers the version byte and the key
    checksum = _crc16_checksum(payload)
    encoded = base64.b32encode(payload + struct.pack("<H", checksum))
    return encoded.decode()
def address_to_public_key(address_str):
    """Extract the raw public key bytes from a base32 address (G...).

    The address is base32-decoded, then the leading version byte and the
    trailing two checksum bytes are dropped. Neither the version nor the
    checksum is verified here.
    """
    raw = base64.b32decode(address_str)
    without_version = raw[1:]
    # what remains after cutting the 2-byte checksum is the key itself
    return without_version[:-2]
def parse_transaction_bytes(tx_bytes):
    """Parse an XDR-encoded transaction into protobuf messages.

    tx_bytes is handed directly to xdrlib.Unpacker, so it must be the raw
    XDR bytes; any base64 decoding has to be done by the caller.

    Returns a tuple (tx, operations):
        tx - a StellarSignTx describing the transaction header
        operations - a list of protobuf message objects, one per operation

    Raises ValueError if a timebound does not fit into an unsigned 32-bit
    integer. The helpers called from here raise ValueError for an
    unsupported address type or an unknown operation type.
    """
    tx = messages.StellarSignTx()
    unpacker = xdrlib.Unpacker(tx_bytes)

    tx.source_account = _xdr_read_address(unpacker)
    tx.fee = unpacker.unpack_uint()
    tx.sequence_number = unpacker.unpack_uhyper()

    # Timebounds is an optional field
    if unpacker.unpack_bool():
        # max unsigned 32-bit int
        # (trezor does not support the full 64-bit time value)
        max_timebound = 2 ** 32 - 1

        tx.timebounds_start = unpacker.unpack_uhyper()
        tx.timebounds_end = unpacker.unpack_uhyper()

        # The limit is formatted into the message; concatenating the int
        # directly would raise TypeError instead of the intended ValueError.
        if not 0 <= tx.timebounds_start <= max_timebound:
            raise ValueError(
                "Starting timebound out of range (must be between 0 and {})".format(
                    max_timebound
                )
            )
        if not 0 <= tx.timebounds_end <= max_timebound:
            raise ValueError(
                "Ending timebound out of range (must be between 0 and {})".format(
                    max_timebound
                )
            )

    # memo type determines what optional fields are set
    tx.memo_type = unpacker.unpack_uint()

    if tx.memo_type == MEMO_TYPE_TEXT:
        # text
        # NOTE(review): xdrlib's unpack_string returns bytes; confirm whether
        # the consumer of memo_text expects a decoded str.
        tx.memo_text = unpacker.unpack_string()
    elif tx.memo_type == MEMO_TYPE_ID:
        # id (64-bit uint)
        tx.memo_id = unpacker.unpack_uhyper()
    elif tx.memo_type in (MEMO_TYPE_HASH, MEMO_TYPE_RETURN):
        # hash / return are the same structure (32 bytes representing a hash)
        tx.memo_hash = unpacker.unpack_fopaque(32)

    tx.num_operations = unpacker.unpack_uint()

    operations = [
        _parse_operation_bytes(unpacker) for _ in range(tx.num_operations)
    ]

    return tx, operations
def _parse_operation_bytes(unpacker):
    """Read the next operation from unpacker and return its protobuf message.

    The stream is consumed in order: an optional source account, the
    operation type (see the OP_ constants), then the type-specific body.
    Keyword arguments of the message constructors are evaluated left to
    right, so their order mirrors the field order on the wire and must not
    be rearranged.

    Raises ValueError for an operation type that has no branch here (this
    includes OP_INFLATION), and propagates ValueError from
    _xdr_read_address for an unsupported address type.
    """
    # Check for and parse the optional source account field. It is read via
    # _xdr_read_address like every other account in this module, so that the
    # 4-byte key-type prefix is consumed (keeping the stream aligned) and
    # the result is an address string rather than raw key bytes.
    source_account = None
    if unpacker.unpack_bool():
        source_account = _xdr_read_address(unpacker)

    # Operation type (See OP_ constants)
    op_type = unpacker.unpack_uint()

    if op_type == OP_CREATE_ACCOUNT:
        return messages.StellarCreateAccountOp(
            source_account=source_account,
            new_account=_xdr_read_address(unpacker),
            starting_balance=unpacker.unpack_hyper(),
        )

    if op_type == OP_PAYMENT:
        return messages.StellarPaymentOp(
            source_account=source_account,
            destination_account=_xdr_read_address(unpacker),
            asset=_xdr_read_asset(unpacker),
            amount=unpacker.unpack_hyper(),
        )

    if op_type == OP_PATH_PAYMENT:
        op = messages.StellarPathPaymentOp(
            source_account=source_account,
            send_asset=_xdr_read_asset(unpacker),
            send_max=unpacker.unpack_hyper(),
            destination_account=_xdr_read_address(unpacker),
            destination_asset=_xdr_read_asset(unpacker),
            destination_amount=unpacker.unpack_hyper(),
            paths=[],
        )

        # The path is a counted list of assets following the fixed fields
        num_paths = unpacker.unpack_uint()
        for _ in range(num_paths):
            op.paths.append(_xdr_read_asset(unpacker))

        return op

    if op_type == OP_MANAGE_OFFER:
        return messages.StellarManageOfferOp(
            source_account=source_account,
            selling_asset=_xdr_read_asset(unpacker),
            buying_asset=_xdr_read_asset(unpacker),
            amount=unpacker.unpack_hyper(),
            price_n=unpacker.unpack_uint(),
            price_d=unpacker.unpack_uint(),
            offer_id=unpacker.unpack_uhyper(),
        )

    if op_type == OP_CREATE_PASSIVE_OFFER:
        return messages.StellarCreatePassiveOfferOp(
            source_account=source_account,
            selling_asset=_xdr_read_asset(unpacker),
            buying_asset=_xdr_read_asset(unpacker),
            amount=unpacker.unpack_hyper(),
            price_n=unpacker.unpack_uint(),
            price_d=unpacker.unpack_uint(),
        )

    if op_type == OP_SET_OPTIONS:
        op = messages.StellarSetOptionsOp(source_account=source_account)

        # Every field below is optional: a boolean presence flag precedes it

        # Inflation destination
        if unpacker.unpack_bool():
            op.inflation_destination_account = _xdr_read_address(unpacker)

        # clear flags
        if unpacker.unpack_bool():
            op.clear_flags = unpacker.unpack_uint()

        # set flags
        if unpacker.unpack_bool():
            op.set_flags = unpacker.unpack_uint()

        # master weight
        if unpacker.unpack_bool():
            op.master_weight = unpacker.unpack_uint()

        # low threshold
        if unpacker.unpack_bool():
            op.low_threshold = unpacker.unpack_uint()

        # medium threshold
        if unpacker.unpack_bool():
            op.medium_threshold = unpacker.unpack_uint()

        # high threshold
        if unpacker.unpack_bool():
            op.high_threshold = unpacker.unpack_uint()

        # home domain
        if unpacker.unpack_bool():
            op.home_domain = unpacker.unpack_string()

        # signer
        if unpacker.unpack_bool():
            op.signer_type = unpacker.unpack_uint()
            op.signer_key = unpacker.unpack_fopaque(32)
            op.signer_weight = unpacker.unpack_uint()

        return op

    if op_type == OP_CHANGE_TRUST:
        return messages.StellarChangeTrustOp(
            source_account=source_account,
            asset=_xdr_read_asset(unpacker),
            limit=unpacker.unpack_uhyper(),
        )

    if op_type == OP_ALLOW_TRUST:
        op = messages.StellarAllowTrustOp(
            source_account=source_account,
            trusted_account=_xdr_read_address(unpacker),
            asset_type=unpacker.unpack_uint(),
        )

        # Only the asset code is present here (no issuer); its fixed length
        # depends on the asset type
        if op.asset_type == ASSET_TYPE_ALPHA4:
            op.asset_code = unpacker.unpack_fstring(4)
        if op.asset_type == ASSET_TYPE_ALPHA12:
            op.asset_code = unpacker.unpack_fstring(12)

        op.is_authorized = unpacker.unpack_bool()

        return op

    if op_type == OP_ACCOUNT_MERGE:
        return messages.StellarAccountMergeOp(
            source_account=source_account,
            destination_account=_xdr_read_address(unpacker),
        )

    # Inflation is not implemented since anyone can submit this operation to the network

    if op_type == OP_MANAGE_DATA:
        op = messages.StellarManageDataOp(
            source_account=source_account, key=unpacker.unpack_string()
        )

        # Only set value if the field is present
        if unpacker.unpack_bool():
            op.value = unpacker.unpack_opaque()

        return op

    # Bump Sequence
    # see: https://github.com/stellar/stellar-core/blob/master/src/xdr/Stellar-transaction.x#L269
    if op_type == OP_BUMP_SEQUENCE:
        return messages.StellarBumpSequenceOp(
            source_account=source_account, bump_to=unpacker.unpack_uhyper()
        )

    raise ValueError("Unknown operation type: " + str(op_type))
def _xdr_read_asset(unpacker):
    """Read one asset from unpacker and return it as a StellarAssetType.

    The asset type is read first. For the ALPHA4 and ALPHA12 types a
    fixed-length code (4 or 12 bytes) and an issuer address follow; for any
    other type nothing more is consumed.
    """
    asset = messages.StellarAssetType(type=unpacker.unpack_uint())

    # length of the fixed-size code field, for the types that carry one
    code_length = {ASSET_TYPE_ALPHA4: 4, ASSET_TYPE_ALPHA12: 12}.get(asset.type)
    if code_length is not None:
        asset.code = unpacker.unpack_fstring(code_length)
        asset.issuer = _xdr_read_address(unpacker)

    return asset
def _xdr_read_address(unpacker):
    """Read an address from unpacker and return it in string form.

    The encoded address is expected to be a public address (starting with
    G): a 4-byte address type that must be 0, followed by a 32-byte public
    key. Raises ValueError for any other address type.
    """
    # The address type occupies the first 4 bytes; only type 0 is handled
    if unpacker.unpack_uint() == 0:
        public_key = unpacker.unpack_fopaque(32)
        return address_from_public_key(public_key)

    raise ValueError("Unsupported address type")
def _crc16_checksum(bytes):
"""Returns the CRC-16 checksum of bytearray bytes
Ported from Java implementation at: http://introcs.cs.princeton.edu/java/61data/CRC16CCITT.java.html
Initial value changed to 0x0000 to match Stellar configuration.
"""
crc = 0x0000
polynomial = 0x1021
for byte in bytes:
for i in range(8):
bit = (byte >> (7 - i) & 1) == 1
c15 = (crc >> 15 & 1) == 1
crc <<= 1
if c15 ^ bit:
crc ^= polynomial
return crc & 0xFFFF
# ====== Client functions ====== #
@expect(messages.StellarAddress, field="address")
def get_address(client, address_n, show_display=False):
    """Send a StellarGetAddress request for address_n through client.

    show_display is forwarded to the request unchanged. The reply of
    client.call is handed to the @expect decorator.
    NOTE(review): presumably the decorator checks for a StellarAddress reply
    and returns its "address" field; confirm against .tools.expect.
    """
    request = messages.StellarGetAddress(
        address_n=address_n, show_display=show_display
    )
    return client.call(request)
def sign_tx(
    client, tx, operations, address_n, network_passphrase=DEFAULT_NETWORK_PASSPHRASE
):
    """Sign a transaction on the device and return the StellarSignedTx reply.

    tx is the transaction header message; its network_passphrase, address_n
    and num_operations fields are overwritten here. operations is the
    sequence of operation messages to send; it is only read, never
    modified, so the caller can reuse it (for example to retry).

    Raises CallException if the device asks for more operations than were
    supplied, answers with something other than StellarSignedTx, or returns
    a signature before all operations were sent.
    """
    tx.network_passphrase = network_passphrase
    tx.address_n = address_n
    tx.num_operations = len(operations)

    # Signing loop works as follows:
    #
    # 1. Start with tx (header information for the transaction) and operations (an array of operation protobuf messages)
    # 2. Send the tx header to the device
    # 3. Receive a StellarTxOpRequest message
    # 4. Send operations one by one until all operations have been sent. If there are more operations to sign, the device will send a StellarTxOpRequest message
    # 5. The final message received will be StellarSignedTx which is returned from this method
    sent = 0
    resp = client.call(tx)
    while isinstance(resp, messages.StellarTxOpRequest):
        # Checked explicitly (rather than by catching IndexError around the
        # call) so that an unrelated IndexError raised inside client.call is
        # not misreported as running out of operations.
        if sent >= len(operations):
            raise CallException(
                "Stellar.UnexpectedEndOfOperations",
                "Reached end of operations without a signature.",
            )
        resp = client.call(operations[sent])
        sent += 1

    if not isinstance(resp, messages.StellarSignedTx):
        raise CallException(messages.FailureType.UnexpectedMessage, resp)

    if sent < len(operations):
        raise CallException(
            "Stellar.UnprocessedOperations",
            "Received a signature before processing all operations.",
        )

    return resp