1
0
mirror of https://github.com/trezor/trezor-firmware.git synced 2025-01-11 07:50:57 +00:00

Merge pull request #167 from zulucrypto/stellar

[Stellar] [WIP] Add Stellar support to trezorctl
This commit is contained in:
matejcik 2018-05-07 14:23:56 +02:00 committed by GitHub
commit 5329bc9463
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 517 additions and 0 deletions

View File

@ -34,6 +34,7 @@ from trezorlib import coins
from trezorlib import messages as proto
from trezorlib import protobuf
from trezorlib.ckd_public import PRIME_DERIVATION_FLAG
from trezorlib import stellar
class ChoiceType(click.Choice):
@ -961,6 +962,68 @@ def cosi_sign(connect, address, data, global_commitment, global_pubkey):
return client.cosi_sign(address_n, binascii.unhexlify(data), binascii.unhexlify(global_commitment), binascii.unhexlify(global_pubkey))
#
# Stellar functions
#
@cli.command(help='Get Stellar public address')
@click.option('-n', '--address', required=False, help="BIP32 path. Default primary account is m/44'/148'/0'. Always use hardened paths and the m/44'/148'/ prefix")
@click.pass_obj
def stellar_get_address(connect, address):
client = connect()
address_n = stellar.expand_path_or_default(client, address)
# StellarPublicKey response
response = client.stellar_get_public_key(address_n)
return stellar.address_from_public_key(response.public_key)
@cli.command(help='Sign a string with a Stellar key')
@click.option('-n', '--address', required=False, help="BIP32 path. Default primary account is m/44'/148'/0'. Always use hardened paths and the m/44'/148'/ prefix")
@click.argument('message')
@click.pass_obj
def stellar_sign_message(connect, address, message):
client = connect()
address_n = stellar.expand_path_or_default(client, address)
response = client.stellar_sign_message(address_n, message)
return base64.b64encode(response.signature)
@cli.command(help='Verify that a signature is valid')
@click.option('--message-is-b64/--no-message-is-b64', default=False, required=False, help="If set, the message argument will be interpreted as a base64-encoded string")
@click.argument('address')
@click.argument('signatureb64')
@click.argument('message')
@click.pass_obj
def stellar_verify_message(connect, address, message_is_b64, signatureb64, message):
if message_is_b64:
message = base64.b64decode(message)
else:
message = message.encode('utf-8')
pubkey_bytes = stellar.address_to_public_key(address)
client = connect()
is_verified = client.stellar_verify_message(pubkey_bytes, base64.b64decode(signatureb64), message)
if is_verified:
return "Success: message verified"
else:
print("ERROR: invalid signature, verification failed")
sys.exit(1)
@cli.command(help='Sign a base64-encoded transaction envelope')
@click.option('-n', '--address', required=False, help="BIP32 path. Default primary account is m/44'/148'/0'. Always use hardened paths and the m/44'/148'/ prefix")
@click.option('-n', '--network-passphrase', required=False, help="Network passphrase (blank for public network). Testnet is: 'Test SDF Network ; September 2015'")
@click.argument('b64envelope')
@click.pass_obj
def stellar_sign_transaction(connect, b64envelope, address, network_passphrase):
client = connect()
address_n = stellar.expand_path_or_default(client, address)
# raw signature bytes
resp = client.stellar_sign_transaction(base64.b64decode(b64envelope), address_n, network_passphrase)
return base64.b64encode(resp.signature)
#
# Main
#

View File

@ -35,6 +35,7 @@ from . import tools
from . import mapping
from . import nem
from .coins import slip44
from . import stellar
from .debuglink import DebugLink
from .protobuf import MessageType
@ -1174,6 +1175,54 @@ class ProtocolMixin(object):
return self.call(proto.SelfTest(payload=b'\x00\xFF\x55\xAA\x66\x99\x33\xCCABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!\x00\xFF\x55\xAA\x66\x99\x33\xCC'))
@expect(proto.StellarPublicKey)
def stellar_get_public_key(self, address_n):
return self.call(proto.StellarGetPublicKey(address_n=address_n))
def stellar_sign_transaction(self, tx_envelope, address_n, network_passphrase=None):
# default networkPassphrase to the public network
if network_passphrase is None:
network_passphrase = "Public Global Stellar Network ; September 2015"
tx, operations = stellar.parse_transaction_bytes(tx_envelope)
tx.network_passphrase = network_passphrase
tx.address_n = address_n
# Signing loop works as follows:
#
# 1. Start with tx (header information for the transaction) and operations (an array of operation protobuf messagess)
# 2. Send the tx header to the device
# 3. Receive a StellarTxOpRequest message
# 4. Send operations one by one until all operations have been sent. If there are more operations to sign, the device will send a StellarTxOpRequest message
# 5. The final message received will be StellarSignedTx which is returned from this method
resp = self.call(tx)
try:
while isinstance(resp, proto.StellarTxOpRequest):
resp = self.call(operations.pop(0))
except IndexError:
# pop from empty list
raise CallException("Stellar.UnexpectedEndOfOperations",
"Reached end of operations without a signature.") from None
if not isinstance(resp, proto.StellarSignedTx):
raise CallException(proto.FailureType.UnexpectedMessage, resp)
if operations:
raise CallException("Stellar.UnprocessedOperations",
"Received a signature before processing all operations.")
return resp
@expect(proto.StellarMessageSignature)
def stellar_sign_message(self, address_n, message):
return self.call(proto.StellarSignMessage(address_n=address_n, message=message))
def stellar_verify_message(self, pubkey_bytes, signature, message):
resp = self.call(proto.StellarVerifyMessage(public_key=pubkey_bytes, message=message, signature=signature))
return isinstance(resp, proto.Success)
class TrezorClient(ProtocolMixin, TextUIMixin, BaseClient):
def __init__(self, transport, *args, **kwargs):

326
trezorlib/stellar.py Normal file
View File

@ -0,0 +1,326 @@
import base64
import struct
import xdrlib
from . import messages as proto
# Memo types
MEMO_TYPE_TEXT = 0
MEMO_TYPE_ID = 1
MEMO_TYPE_HASH = 2
MEMO_TYPE_RETURN = 4
# Asset types
ASSET_TYPE_NATIVE = 0
ASSET_TYPE_ALPHA4 = 1
ASSET_TYPE_ALPHA12 = 2
# Operations
OP_CREATE_ACCOUNT = 0
OP_PAYMENT = 1
OP_PATH_PAYMENT = 2
OP_MANAGE_OFFER = 3
OP_CREATE_PASSIVE_OFFER = 4
OP_SET_OPTIONS = 5
OP_CHANGE_TRUST = 6
OP_ALLOW_TRUST = 7
OP_ACCOUNT_MERGE = 8
OP_INFLATION = 9 # Included for documentation purposes, not supported by Trezor
OP_MANAGE_DATA = 10
OP_BUMP_SEQUENCE = 11
def expand_path_or_default(client, address):
"""Uses client to parse address and returns an array of integers
If no address is specified, the default of m/44'/148'/0' is used
"""
if address:
return client.expand_path(address)
else:
return client.expand_path("m/44'/148'/0'")
def address_from_public_key(pk_bytes):
"""Returns the base32-encoded version of pk_bytes (G...)
"""
final_bytes = bytearray()
# version
final_bytes.append(6 << 3)
# public key
final_bytes.extend(pk_bytes)
# checksum
final_bytes.extend(struct.pack("<H", _crc16_checksum(final_bytes)))
return base64.b32encode(final_bytes)
def address_to_public_key(address_str):
"""Returns the raw 32 bytes representing a public key by extracting
it from the G... string
"""
decoded = base64.b32decode(address_str)
# skip 0th byte (version) and last two bytes (checksum)
return decoded[1:-2]
def parse_transaction_bytes(tx_bytes):
"""Parses base64data into a map with the following keys:
tx - a StellarSignTx describing the transaction header
operations - an array of protobuf message objects for each operation
"""
tx = proto.StellarSignTx(
protocol_version=1
)
unpacker = xdrlib.Unpacker(tx_bytes)
tx.source_account = _xdr_read_address(unpacker)
tx.fee = unpacker.unpack_uint()
tx.sequence_number = unpacker.unpack_uhyper()
# Timebounds is an optional field
if unpacker.unpack_bool():
max_timebound = 2**32 - 1 # max unsigned 32-bit int (trezor does not support the full 64-bit time value)
tx.timebounds_start = unpacker.unpack_uhyper()
tx.timebounds_end = unpacker.unpack_uhyper()
if tx.timebounds_start > max_timebound or tx.timebounds_start < 0:
raise ValueError("Starting timebound out of range (must be between 0 and " + max_timebound)
if tx.timebounds_end > max_timebound or tx.timebounds_end < 0:
raise ValueError("Ending timebound out of range (must be between 0 and " + max_timebound)
# memo type determines what optional fields are set
tx.memo_type = unpacker.unpack_uint()
# text
if tx.memo_type == MEMO_TYPE_HASH:
tx.memo_text = unpacker.unpack_string()
# id (64-bit uint)
if tx.memo_type == MEMO_TYPE_ID:
tx.memo_id = unpacker.unpack_uhyper()
# hash / return are the same structure (32 bytes representing a hash)
if tx.memo_type == MEMO_TYPE_HASH or tx.memo_type == MEMO_TYPE_RETURN:
tx.memo_hash = unpacker.unpack_fopaque(32)
tx.num_operations = unpacker.unpack_uint()
operations = []
for i in range(tx.num_operations):
operations.append(_parse_operation_bytes(unpacker))
return tx, operations
def _parse_operation_bytes(unpacker):
"""Returns a protobuf message representing the next operation as read from
the byte stream in unpacker
"""
# Check for and parse optional source account field
source_account = None
if unpacker.unpack_bool():
source_account = unpacker.unpack_fopaque(32)
# Operation type (See OP_ constants)
type = unpacker.unpack_uint()
if type == OP_CREATE_ACCOUNT:
return proto.StellarCreateAccountOp(
source_account=source_account,
new_account=_xdr_read_address(unpacker),
starting_balance=unpacker.unpack_hyper()
)
if type == OP_PAYMENT:
return proto.StellarPaymentOp(
source_account=source_account,
destination_account=_xdr_read_address(unpacker),
asset=_xdr_read_asset(unpacker),
amount=unpacker.unpack_hyper()
)
if type == OP_PATH_PAYMENT:
op = proto.StellarPathPaymentOp(
source_account=source_account,
send_asset=_xdr_read_asset(unpacker),
send_max=unpacker.unpack_hyper(),
destination_account=_xdr_read_address(unpacker),
destination_asset=_xdr_read_asset(unpacker),
paths=[]
)
num_paths = unpacker.unpack_uint()
for i in range(num_paths):
op.paths.append(_xdr_read_asset(unpacker))
return op
if type == OP_MANAGE_OFFER:
return proto.StellarManageOfferOp(
source_account=source_account,
selling_asset=_xdr_read_asset(unpacker),
buying_asset=_xdr_read_asset(unpacker),
amount=unpacker.unpack_hyper(),
price_n=unpacker.unpack_uint(),
price_d=unpacker.unpack_uint(),
offer_id=unpacker.unpack_uhyper()
)
if type == OP_CREATE_PASSIVE_OFFER:
return proto.StellarCreatePassiveOfferOp(
source_account=source_account,
selling_asset=_xdr_read_asset(unpacker),
buying_asset=_xdr_read_asset(unpacker),
amount=unpacker.unpack_hyper(),
price_n=unpacker.unpack_uint(),
price_d=unpacker.unpack_uint()
)
if type == OP_SET_OPTIONS:
op = proto.StellarSetOptionsOp(
source_account=source_account
)
# Inflation destination
if unpacker.unpack_bool():
op.inflation_destination_account = _xdr_read_address(unpacker)
# clear flags
if unpacker.unpack_bool():
op.clear_flags = unpacker.unpack_uint()
# set flags
if unpacker.unpack_bool():
op.set_flags = unpacker.unpack_uint()
# master weight
if unpacker.unpack_bool():
op.master_weight = unpacker.unpack_uint()
# low threshold
if unpacker.unpack_bool():
op.low_threshold = unpacker.unpack_uint()
# medium threshold
if unpacker.unpack_bool():
op.medium_threshold = unpacker.unpack_uint()
# high threshold
if unpacker.unpack_bool():
op.high_threshold = unpacker.unpack_uint()
# home domain
if unpacker.unpack_bool():
op.home_domain = unpacker.unpack_string()
# signer
if unpacker.unpack_bool():
op.signer_type = unpacker.unpack_uint()
op.signer_key = unpacker.unpack_fopaque(32)
op.signer_weight = unpacker.unpack_uint()
return op
if type == OP_CHANGE_TRUST:
return proto.StellarChangeTrustOp(
source_account=source_account,
asset=_xdr_read_asset(unpacker),
limit=unpacker.unpack_uhyper()
)
if type == OP_ALLOW_TRUST:
op = proto.StellarAllowTrustOp(
source_account=source_account,
trusted_account=_xdr_read_address(unpacker),
asset_type=unpacker.unpack_uint()
)
if op.asset_type == ASSET_TYPE_ALPHA4:
op.asset_code = unpacker.unpack_fstring(4)
if op.asset_type == ASSET_TYPE_ALPHA12:
op.asset_code = unpacker.unpack_fstring(12)
op.is_authorized = unpacker.unpack_bool()
return op
if type == OP_ACCOUNT_MERGE:
return proto.StellarAccountMergeOp(
source_account=source_account,
destination_account=_xdr_read_address(unpacker)
)
# Inflation is not implemented since anyone can submit this operation to the network
if type == OP_MANAGE_DATA:
op = proto.StellarManageDataOp(
source_account=source_account,
key=unpacker.unpack_string(),
)
# Only set value if the field is present
if unpacker.unpack_bool():
op.value = unpacker.unpack_opaque()
return op
# Bump Sequence
# see: https://github.com/stellar/stellar-core/blob/master/src/xdr/Stellar-transaction.x#L269
if type == OP_BUMP_SEQUENCE:
return proto.StellarBumpSequenceOp(
source_account=source_account,
bump_to=unpacker.unpack_uhyper()
)
raise ValueError("Unknown operation type: " + type)
def _xdr_read_asset(unpacker):
"""Reads a stellar Asset from unpacker"""
asset = proto.StellarAssetType(
type=unpacker.unpack_uint()
)
if asset.type == ASSET_TYPE_ALPHA4:
asset.code = unpacker.unpack_fstring(4)
asset.issuer = _xdr_read_address(unpacker)
if asset.type == ASSET_TYPE_ALPHA12:
asset.code = unpacker.unpack_fstring(12)
asset.issuer = _xdr_read_address(unpacker)
return asset
def _xdr_read_address(unpacker):
"""Reads a stellar address and returns the 32-byte
data representing the address
"""
# First 4 bytes are the address type
address_type = unpacker.unpack_uint()
if address_type != 0:
raise ValueError("Unsupported address type")
return unpacker.unpack_fopaque(32)
def _crc16_checksum(bytes):
"""Returns the CRC-16 checksum of bytearray bytes
Ported from Java implementation at: http://introcs.cs.princeton.edu/java/61data/CRC16CCITT.java.html
Initial value changed to 0x0000 to match Stellar configuration.
"""
crc = 0x0000
polynomial = 0x1021
for byte in bytes:
for i in range(8):
bit = ((byte >> (7 - i) & 1) == 1)
c15 = ((crc >> 15 & 1) == 1)
crc <<= 1
if c15 ^ bit:
crc ^= polynomial
return crc & 0xffff

View File

@ -0,0 +1,29 @@
# This file is part of the TREZOR project.
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this library. If not, see <http://www.gnu.org/licenses/>.
from .common import TrezorTest
from trezorlib import stellar
import pytest
@pytest.mark.xfail # requires trezor-mcu PR #259
class TestMsgStellarGetPublicKey(TrezorTest):
def test_stellar_get_address(self):
self.setup_mnemonic_nopin_nopassphrase()
# GAK5MSF74TJW6GLM7NLTL76YZJKM2S4CGP3UH4REJHPHZ4YBZW2GSBPW
response = self.client.stellar_get_public_key(self.client.expand_path("m/44'/148'/0'"))
assert stellar.address_from_public_key(response.public_key) == b'GAK5MSF74TJW6GLM7NLTL76YZJKM2S4CGP3UH4REJHPHZ4YBZW2GSBPW'

View File

@ -0,0 +1,50 @@
# This file is part of the TREZOR project.
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# XDR decoding tool available at:
# https://www.stellar.org/laboratory/#xdr-viewer
#
from base64 import b64decode, b64encode
from .common import TrezorTest
import pytest
@pytest.mark.xfail # requires trezor-mcu PR #259
class TestMsgStellarSignTransaction(TrezorTest):
def get_network_passphrase(self):
"""Use the same passphrase as the network that generated the test XDR/signatures"""
return "Integration Test Network ; zulucrypto"
def get_address_n(self):
"""BIP32 path of the default account"""
return self.client.expand_path("m/44'/148'/0'")
def test_sign_tx_bump_sequence_op(self):
self.setup_mnemonic_nopin_nopassphrase()
xdr = b64decode("AAAAABXWSL/k028ZbPtXNf/YylTNS4Iz90PyJEnefPMBzbRpAAAAZAAAAAEAAAAAAAAAAAAAAAAAAAABAAAAAAAAAAt//////////wAAAAAAAAAA")
response = self.client.stellar_sign_transaction(xdr, self.get_address_n(), self.get_network_passphrase())
assert b64encode(response.signature) == b'UAOL4ZPYIOzEgM66kBrhyNjLR66dNXtuNrmvd3m0/pc8qCSoLmYY4TybS0lHiMtb+LFZESTaxrpErMHz1sZ6DQ=='
def test_sign_tx_account_merge_op(self):
self.setup_mnemonic_nopin_nopassphrase()
xdr = b64decode("AAAAABXWSL/k028ZbPtXNf/YylTNS4Iz90PyJEnefPMBzbRpAAAAZAAAAAEAAAAAAAAAAAAAAAAAAAABAAAAAAAAAAgAAAAAXVVkJGaxhbhDFS6eIZFR28WJICfsQBAaUXvtXKAwwuAAAAAAAAAAAQHNtGkAAABAgjoPRj4sW5o7NAXzYOqPK0uxfPbeKb4Qw48LJiCH/XUZ6YVCiZogePC0Z5ISUlozMh6YO6HoYtuLPbm7jq+eCA==")
response = self.client.stellar_sign_transaction(xdr, self.get_address_n(), self.get_network_passphrase())
assert b64encode(response.signature) == b'gjoPRj4sW5o7NAXzYOqPK0uxfPbeKb4Qw48LJiCH/XUZ6YVCiZogePC0Z5ISUlozMh6YO6HoYtuLPbm7jq+eCA=='