diff --git a/trezorctl b/trezorctl index 115993eea..bb24a7fe2 100755 --- a/trezorctl +++ b/trezorctl @@ -27,6 +27,7 @@ import functools import json import os import sys +import trezorlib.stellar as stellar from trezorlib.client import TrezorClient, TrezorClientVerbose, CallException, format_protobuf from trezorlib.transport import get_transport, enumerate_devices, TransportException @@ -865,6 +866,65 @@ def cosi_sign(connect, address, data, global_commitment, global_pubkey): return client.cosi_sign(address_n, binascii.unhexlify(data), binascii.unhexlify(global_commitment), binascii.unhexlify(global_pubkey)) +# +# Stellar functions +# +@cli.command(help='Get Stellar public address') +@click.option('-n', '--address', required=False, help="BIP32 path. Default primary account is m/44'/148'/0'. Always use hardened paths and the m/44'/148'/ prefix") +@click.pass_obj +def stellar_get_address(connect, address): + client = connect() + address_n = stellar.expand_path_or_default(client, address) + # StellarPublicKey response + response = client.stellar_get_public_key(address_n) + return stellar.address_from_public_key(response.public_key) + +@cli.command(help='Sign a string with a Stellar key') +@click.option('-n', '--address', required=False, help="BIP32 path. Default primary account is m/44'/148'/0'. Always use hardened paths and the m/44'/148'/ prefix") +@click.argument('message') +@click.pass_obj +def stellar_sign_message(connect, address, message): + client = connect() + address_n = stellar.expand_path_or_default(client, address) + response = client.stellar_sign_message(address_n, message) + return base64.b64encode(response.signature) + +@cli.command(help='Verify that a signature is valid') +@click.option('--message-is-b64/--no-message-is-b64', default=False, required=False, help="If set, the message argument will be interpreted as a base64-encoded string") +@click.argument('address') +@click.argument('signatureb64') +@click.argument('message') +@click.pass_obj +def stellar_verify_message(connect, address, message_is_b64, signatureb64, message): + if message_is_b64: + message = base64.b64decode(message) + else: + message = message.encode('utf-8') + + pubkey_bytes = stellar.address_to_public_key(address) + + client = connect() + is_verified = client.stellar_verify_message(pubkey_bytes, base64.b64decode(signatureb64), message) + + if is_verified: + return "Success: message verified" + else: + print("ERROR: invalid signature, verification failed") + sys.exit(1) + +@cli.command(help='Sign a base64-encoded transaction envelope') +@click.option('-n', '--address', required=False, help="BIP32 path. Default primary account is m/44'/148'/0'. Always use hardened paths and the m/44'/148'/ prefix") +@click.option('-n', '--network-passphrase', required=False, help="Network passphrase (blank for public network). Testnet is: 'Test SDF Network ; September 2015'") +@click.argument('b64envelope') +@click.pass_obj +def stellar_sign_transaction(connect, b64envelope, address, network_passphrase): + client = connect() + address_n = stellar.expand_path_or_default(client, address) + # raw signature bytes + resp = client.stellar_sign_transaction(base64.b64decode(b64envelope), address_n, network_passphrase) + + return base64.b64encode(resp.signature) + # # Main # diff --git a/trezorlib/client.py b/trezorlib/client.py index 68e259ac8..66263924e 100644 --- a/trezorlib/client.py +++ b/trezorlib/client.py @@ -37,6 +37,7 @@ from . import nem from .coins import coins_slip44 from .debuglink import DebugLink from .protobuf import MessageType +from . import stellar as stellar if sys.version_info.major < 3: @@ -1100,6 +1101,172 @@ class ProtocolMixin(object): return self.call(proto.SelfTest(payload=b'\x00\xFF\x55\xAA\x66\x99\x33\xCCABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!\x00\xFF\x55\xAA\x66\x99\x33\xCC')) + @expect(proto.StellarPublicKey) + def stellar_get_public_key(self, address_n): + return self.call(proto.StellarGetPublicKey(address_n=address_n)) + + def stellar_sign_transaction(self, txEnvelope, address_n, networkPassphrase=None): + # default networkPassphrase to the public network + if networkPassphrase is None: + networkPassphrase = "Public Global Stellar Network ; September 2015" + + parsed = stellar.parse_transaction_bytes(txEnvelope) + + # Will return a StellarTxOpRequest + resp = self.call(proto.StellarSignTx( + protocol_version=parsed["protocol_version"], + address_n=address_n, + network_passphrase=networkPassphrase, + source_account=parsed["source_account"], + fee=parsed["fee"], + sequence_number=parsed["sequence_number"], + timebounds_start=parsed["timebounds_start"], + timebounds_end=parsed["timebounds_end"], + memo_type=parsed["memo_type"], + memo_text=parsed["memo_text"], + memo_id=parsed["memo_id"], + memo_hash=parsed["memo_hash"], + num_operations=parsed["num_operations"] + )) + if resp.__class__.__name__ != "StellarTxOpRequest": + raise CallException("Unexpected response to transaction") + + for opIdx in range(0, parsed["num_operations"]): + op = parsed["operations"][opIdx] + resp = None + + # Create account + if op["type"] == 0: + resp = self.call(proto.StellarCreateAccountOp( + source_account=op["source_account"], + new_account=op["new_account"], + starting_balance=op["starting_balance"] + )) + # Payment + if op["type"] == 1: + asset = types.StellarAssetType(type=op["asset"]["type"], code=op["asset"]["code"], + issuer=op["asset"]["issuer"]) + resp = self.call(proto.StellarPaymentOp( + source_account=op["source_account"], + destination_account=op["destination_account"], + amount=op["amount"], + asset=asset + )) + # Path Payment + if op["type"] == 2: + destination_asset = types.StellarAssetType(type=op["destination_asset"]["type"], + code=op["destination_asset"]["code"], + issuer=op["destination_asset"]["issuer"]) + resp = self.call(proto.StellarPathPaymentOp( + source_account=op["source_account"], + send_max=op["send_max"], + destination_account=op["destination_account"], + destination_asset=destination_asset, + destination_amount=op["destination_amount"], + paths=op["paths"] + )) + # Manage Offer + if op["type"] == 3: + selling_asset = types.StellarAssetType(type=op["selling_asset"]["type"], + code=op["selling_asset"]["code"], + issuer=op["selling_asset"]["issuer"]) + buying_asset = types.StellarAssetType(type=op["buying_asset"]["type"], code=op["buying_asset"]["code"], + issuer=op["buying_asset"]["issuer"]) + resp = self.call(proto.StellarManageOfferOp( + source_account=op["source_account"], + selling_asset=selling_asset, + buying_asset=buying_asset, + amount=op["amount"], + price_n=op["price_n"], + price_d=op["price_d"], + offer_id=op["offer_id"] + )) + # Passive Offer + if op["type"] == 4: + selling_asset = types.StellarAssetType(type=op["selling_asset"]["type"], + code=op["selling_asset"]["code"], + issuer=op["selling_asset"]["issuer"]) + buying_asset = types.StellarAssetType(type=op["buying_asset"]["type"], code=op["buying_asset"]["code"], + issuer=op["buying_asset"]["issuer"]) + resp = self.call(proto.StellarCreatePassiveOfferOp( + source_account=op["source_account"], + selling_asset=selling_asset, + buying_asset=buying_asset, + amount=op["amount"], + price_n=op["price_n"], + price_d=op["price_d"] + )) + # Set Options + if op["type"] == 5: + resp = self.call(proto.StellarSetOptionsOp( + source_account=op["source_account"], + inflation_destination_account=op["inflation_destination"], + clear_flags=op["clear_flags"], + set_flags=op["set_flags"], + master_weight=op["master_weight"], + low_threshold=op["low_threshold"], + medium_threshold=op["medium_threshold"], + high_threshold=op["high_threshold"], + home_domain=op["home_domain"], + signer_type=op["signer_type"], + signer_key=op["signer_key"], + signer_weight=op["signer_weight"], + )) + # Change Trust + if op["type"] == 6: + asset = types.StellarAssetType(type=op["asset"]["type"], code=op["asset"]["code"], + issuer=op["asset"]["issuer"]) + resp = self.call(proto.StellarChangeTrustOp( + source_account=op["source_account"], + limit=op["limit"], + asset=asset + )) + # Allow Trust + if op["type"] == 7: + resp = self.call(proto.StellarAllowTrustOp( + source_account=op["source_account"], + trusted_account=op["trusted_account"], + asset_type=op["asset_type"], + asset_code=op["asset_code"], + is_authorized=op["is_authorized"] + )) + # Merge Account + if op["type"] == 8: + resp = self.call(proto.StellarAccountMergeOp( + source_account=op["source_account"], + destination_account=op["destination_account"] + )) + # Manage data + if op["type"] == 10: + resp = self.call(proto.StellarManageDataOp( + source_account=op["source_account"], + key=op["key"], + value=op["value"] + )) + # Merge Account + if op["type"] == 11: + resp = self.call(proto.StellarBumpSequenceOp( + source_account=op["source_account"], + bump_to=op["bump_to"] + )) + + # Exit if the response was a StellarSignedTx + if resp.__class__.__name__ == "StellarSignedTx": + return resp + + raise CallException("Reached end of operations without a signature") + + @expect(proto.StellarMessageSignature) + def stellar_sign_message(self, address_n, message): + return self.call(proto.StellarSignMessage(address_n=address_n, message=message)) + + def stellar_verify_message(self, pubkey_bytes, signature, message): + resp = self.call(proto.StellarVerifyMessage(public_key=pubkey_bytes, message=message, signature=signature)) + + if isinstance(resp, proto.Success): + return True + return False + class TrezorClient(ProtocolMixin, TextUIMixin, BaseClient): pass diff --git a/trezorlib/stellar.py b/trezorlib/stellar.py new file mode 100644 index 000000000..b369ff5c4 --- /dev/null +++ b/trezorlib/stellar.py @@ -0,0 +1,290 @@ +import base64 +import struct +import binascii +import xdrlib + +def expand_path_or_default(client, address): + """Uses client to parse address and returns an array of integers + If no address is specified, the default of m/44'/148'/0' is used + """ + if address: + return client.expand_path(address) + else: + return client.expand_path("m/44'/148'/0'") + + +def address_from_public_key(pk_bytes): + """Returns the base32-encoded version of pk_bytes (G...) + """ + final_bytes = bytearray() + + # version + final_bytes.append(6 << 3) + # public key + final_bytes.extend(pk_bytes) + # checksum + final_bytes.extend(struct.pack(" max_timebound or parsed["timebounds_start"] < 0: + raise ValueError("Starting timebound out of range (must be between 0 and " + max_timebound) + if parsed["timebounds_end"] > max_timebound or parsed["timebounds_end"] < 0: + raise ValueError("Ending timebound out of range (must be between 0 and " + max_timebound) + + # memo type determines what optional fields are set + parsed["memo_type"] = unpacker.unpack_uint() + parsed["memo_text"] = None + parsed["memo_id"] = None + parsed["memo_hash"] = None + + # text + if parsed["memo_type"] == 1: + parsed["memo_text"] = unpacker.unpack_string() + # id (64-bit uint) + if parsed["memo_type"] == 2: + parsed["memo_id"] = unpacker.unpack_uhyper() + # hash / return are the same structure (32 bytes representing a hash) + if parsed["memo_type"] == 3 or parsed["memo_type"] == 4: + parsed["memo+hash"] = unpacker.unpack_fopaque(32) + + parsed["num_operations"] = unpacker.unpack_uint() + + for opIdx in range(0, parsed["num_operations"]): + parsed["operations"].append(_parse_operation_bytes(unpacker)) + + return parsed + +def _parse_operation_bytes(unpacker): + """Returns a dictionary describing the next operation as read from + the byte stream in unpacker + """ + op = { + "source_account": None, + "type": None + } + + has_source_account = unpacker.unpack_bool() + if has_source_account: + op["source_account"] = unpacker.unpack_fopaque(32) + + op["type"] = unpacker.unpack_uint() + + # see: https://github.com/stellar/stellar-core/blob/master/src/xdr/Stellar-transaction.x#L16 + if op["type"] == 0: + op["new_account"] = _xdr_read_address(unpacker) + op["starting_balance"] = unpacker.unpack_hyper() + + # see: https://github.com/stellar/stellar-core/blob/master/src/xdr/Stellar-transaction.x#L54 + if op["type"] == 1: + op["destination_account"] = _xdr_read_address(unpacker) + op["asset"] = _xdr_read_asset(unpacker) + op["amount"] = unpacker.unpack_hyper() + + # see: https://github.com/stellar/stellar-core/blob/master/src/xdr/Stellar-transaction.x#L72 + if op["type"] == 2: + op["send_asset"] = _xdr_read_asset(unpacker) + op["send_max"] = unpacker.unpack_hyper() + op["destination_account"] = _xdr_read_address(unpacker) + op["destination_asset"] = _xdr_read_asset(unpacker) + op["destination_amount"] = unpacker.unpack_hyper() + op["paths"] = [] + + num_paths = unpacker.unpack_uint() + for i in range(0, num_paths): + op["paths"].append(_xdr_read_asset(unpacker)) + + # see: https://github.com/stellar/stellar-core/blob/master/src/xdr/Stellar-transaction.x#L93 + if op["type"] == 3: + op["selling_asset"] = _xdr_read_asset(unpacker) + op["buying_asset"] = _xdr_read_asset(unpacker) + op["amount"] = unpacker.unpack_hyper() + op["price_n"] = unpacker.unpack_uint() + op["price_d"] = unpacker.unpack_uint() + op["offer_id"] = unpacker.unpack_uhyper() + + # see: https://github.com/stellar/stellar-core/blob/master/src/xdr/Stellar-transaction.x#L111 + if op["type"] == 4: + op["selling_asset"] = _xdr_read_asset(unpacker) + op["buying_asset"] = _xdr_read_asset(unpacker) + op["amount"] = unpacker.unpack_hyper() + op["price_n"] = unpacker.unpack_uint() + op["price_d"] = unpacker.unpack_uint() + + # see: https://github.com/stellar/stellar-core/blob/master/src/xdr/Stellar-transaction.x#L129 + if op["type"] == 5: + op["inflation_destination"] = None + op["clear_flags"] = None + op["set_flags"] = None + op["master_weight"] = None + op["low_threshold"] = None + op["medium_threshold"] = None + op["high_threshold"] = None + op["home_domain"] = None + op["signer_type"] = None + op["signer_key"] = None + op["signer_weight"] = None + + op["has_inflation_destination"] = unpacker.unpack_bool() + if op["has_inflation_destination"]: + op["inflation_destination"] = _xdr_read_address(unpacker) + + op["has_clear_flags"] = unpacker.unpack_bool() + if op["has_clear_flags"]: + op["clear_flags"] = unpacker.unpack_uint() + + op["has_set_flags"] = unpacker.unpack_bool() + if op["has_set_flags"]: + op["set_flags"] = unpacker.unpack_uint() + + op["has_master_weight"] = unpacker.unpack_bool() + if op["has_master_weight"]: + op["master_weight"] = unpacker.unpack_uint() + + op["has_low_threshold"] = unpacker.unpack_bool() + if op["has_low_threshold"]: + op["low_threshold"] = unpacker.unpack_uint() + + op["has_medium_threshold"] = unpacker.unpack_bool() + if op["has_medium_threshold"]: + op["medium_threshold"] = unpacker.unpack_uint() + + op["has_high_threshold"] = unpacker.unpack_bool() + if op["has_high_threshold"]: + op["high_threshold"] = unpacker.unpack_uint() + + op["has_home_domain"] = unpacker.unpack_bool() + if op["has_home_domain"]: + op["home_domain"] = unpacker.unpack_string() + + op["has_signer"] = unpacker.unpack_bool() + if op["has_signer"]: + op["signer_type"] = unpacker.unpack_uint() + op["signer_key"] = unpacker.unpack_fopaque(32) + op["signer_weight"] = unpacker.unpack_uint() + + # Change Trust + # see: https://github.com/stellar/stellar-core/blob/master/src/xdr/Stellar-transaction.x#L156 + if op["type"] == 6: + op["asset"] = _xdr_read_asset(unpacker) + op["limit"] = unpacker.unpack_uhyper() + + # Allow Trust + # see: https://github.com/stellar/stellar-core/blob/master/src/xdr/Stellar-transaction.x#L173 + if op["type"] == 7: + op["trusted_account"] = _xdr_read_address(unpacker) + op["asset_type"] = unpacker.unpack_uint() + + if op["asset_type"] == 1: + op["asset_code"] = unpacker.unpack_fstring(4) + if op["asset_type"] == 2: + op["asset_code"] = unpacker.unpack_fstring(12) + + op["is_authorized"] = unpacker.unpack_bool() + + # Merge Account + # see: https://github.com/stellar/stellar-core/blob/master/src/xdr/Stellar-transaction.x#L251 + if op["type"] == 8: + op["destination_account"] = _xdr_read_address(unpacker) + + # Inflation is not implemented since any account can send this operation + + # Manage Data + # see: https://github.com/stellar/stellar-core/blob/master/src/xdr/Stellar-transaction.x#L218 + if op["type"] == 10: + op["key"] = unpacker.unpack_string() + + op["value"] = None + op["has_value"] = unpacker.unpack_bool() + if op["has_value"]: + op["value"] = unpacker.unpack_opaque() + + # Bump Sequence + # see: https://github.com/stellar/stellar-core/blob/master/src/xdr/Stellar-transaction.x#L269 + if op["type"] == 11: + op["bump_to"] = unpacker.unpack_uhyper() + + return op + +def _xdr_read_asset(unpacker): + """Reads a stellar Asset from unpacker""" + asset = { + "type": unpacker.unpack_uint(), + "code": None, + "issuer": None + } + + # alphanum 4 + if asset["type"] == 1: + asset["code"] = unpacker.unpack_fstring(4) + asset["issuer"] = _xdr_read_address(unpacker) + + if asset["type"] == 2: + asset["code"] = unpacker.unpack_fstring(12) + asset["issuer"] = _xdr_read_address(unpacker) + + return asset + + +def _xdr_read_address(unpacker): + """Reads a stellar address and returns the 32-byte + data representing the address + """ + # First 4 bytes are the address type + address_type = unpacker.unpack_uint() + if address_type != 0: + raise ValueError("Unsupported address type") + + return unpacker.unpack_fopaque(32) + +def _crc16_checksum(bytes): + """Returns the CRC-16 checksum of bytearray bytes + + Ported from Java implementation at: http://introcs.cs.princeton.edu/java/61data/CRC16CCITT.java.html + + Initial value changed to 0x0000 to match Stellar configuration. + """ + crc = 0x0000 + polynomial = 0x1021 + + for byte in bytes: + for i in range(0, 8): + bit = ((byte >> (7 - i) & 1) == 1) + c15 = ((crc >> 15 & 1) == 1) + crc <<= 1 + if c15 ^ bit: + crc ^= polynomial + + return crc & 0xffff \ No newline at end of file