mirror of
https://github.com/trezor/trezor-firmware.git
synced 2024-12-23 06:48:16 +00:00
add Stellar support
This commit is contained in:
parent
0902aefd62
commit
1d722f838a
60
trezorctl
60
trezorctl
@ -27,6 +27,7 @@ import functools
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import trezorlib.stellar as stellar
|
||||
|
||||
from trezorlib.client import TrezorClient, TrezorClientVerbose, CallException, format_protobuf
|
||||
from trezorlib.transport import get_transport, enumerate_devices, TransportException
|
||||
@ -865,6 +866,65 @@ def cosi_sign(connect, address, data, global_commitment, global_pubkey):
|
||||
return client.cosi_sign(address_n, binascii.unhexlify(data), binascii.unhexlify(global_commitment), binascii.unhexlify(global_pubkey))
|
||||
|
||||
|
||||
#
|
||||
# Stellar functions
|
||||
#
|
||||
@cli.command(help='Get Stellar public address')
|
||||
@click.option('-n', '--address', required=False, help="BIP32 path. Default primary account is m/44'/148'/0'. Always use hardened paths and the m/44'/148'/ prefix")
|
||||
@click.pass_obj
|
||||
def stellar_get_address(connect, address):
|
||||
client = connect()
|
||||
address_n = stellar.expand_path_or_default(client, address)
|
||||
# StellarPublicKey response
|
||||
response = client.stellar_get_public_key(address_n)
|
||||
return stellar.address_from_public_key(response.public_key)
|
||||
|
||||
@cli.command(help='Sign a string with a Stellar key')
|
||||
@click.option('-n', '--address', required=False, help="BIP32 path. Default primary account is m/44'/148'/0'. Always use hardened paths and the m/44'/148'/ prefix")
|
||||
@click.argument('message')
|
||||
@click.pass_obj
|
||||
def stellar_sign_message(connect, address, message):
|
||||
client = connect()
|
||||
address_n = stellar.expand_path_or_default(client, address)
|
||||
response = client.stellar_sign_message(address_n, message)
|
||||
return base64.b64encode(response.signature)
|
||||
|
||||
@cli.command(help='Verify that a signature is valid')
|
||||
@click.option('--message-is-b64/--no-message-is-b64', default=False, required=False, help="If set, the message argument will be interpreted as a base64-encoded string")
|
||||
@click.argument('address')
|
||||
@click.argument('signatureb64')
|
||||
@click.argument('message')
|
||||
@click.pass_obj
|
||||
def stellar_verify_message(connect, address, message_is_b64, signatureb64, message):
|
||||
if message_is_b64:
|
||||
message = base64.b64decode(message)
|
||||
else:
|
||||
message = message.encode('utf-8')
|
||||
|
||||
pubkey_bytes = stellar.address_to_public_key(address)
|
||||
|
||||
client = connect()
|
||||
is_verified = client.stellar_verify_message(pubkey_bytes, base64.b64decode(signatureb64), message)
|
||||
|
||||
if is_verified:
|
||||
return "Success: message verified"
|
||||
else:
|
||||
print("ERROR: invalid signature, verification failed")
|
||||
sys.exit(1)
|
||||
|
||||
@cli.command(help='Sign a base64-encoded transaction envelope')
|
||||
@click.option('-n', '--address', required=False, help="BIP32 path. Default primary account is m/44'/148'/0'. Always use hardened paths and the m/44'/148'/ prefix")
|
||||
@click.option('-n', '--network-passphrase', required=False, help="Network passphrase (blank for public network). Testnet is: 'Test SDF Network ; September 2015'")
|
||||
@click.argument('b64envelope')
|
||||
@click.pass_obj
|
||||
def stellar_sign_transaction(connect, b64envelope, address, network_passphrase):
|
||||
client = connect()
|
||||
address_n = stellar.expand_path_or_default(client, address)
|
||||
# raw signature bytes
|
||||
resp = client.stellar_sign_transaction(base64.b64decode(b64envelope), address_n, network_passphrase)
|
||||
|
||||
return base64.b64encode(resp.signature)
|
||||
|
||||
#
|
||||
# Main
|
||||
#
|
||||
|
@ -37,6 +37,7 @@ from . import nem
|
||||
from .coins import coins_slip44
|
||||
from .debuglink import DebugLink
|
||||
from .protobuf import MessageType
|
||||
from . import stellar as stellar
|
||||
|
||||
|
||||
if sys.version_info.major < 3:
|
||||
@ -1100,6 +1101,172 @@ class ProtocolMixin(object):
|
||||
|
||||
return self.call(proto.SelfTest(payload=b'\x00\xFF\x55\xAA\x66\x99\x33\xCCABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!\x00\xFF\x55\xAA\x66\x99\x33\xCC'))
|
||||
|
||||
@expect(proto.StellarPublicKey)
|
||||
def stellar_get_public_key(self, address_n):
|
||||
return self.call(proto.StellarGetPublicKey(address_n=address_n))
|
||||
|
||||
def stellar_sign_transaction(self, txEnvelope, address_n, networkPassphrase=None):
|
||||
# default networkPassphrase to the public network
|
||||
if networkPassphrase is None:
|
||||
networkPassphrase = "Public Global Stellar Network ; September 2015"
|
||||
|
||||
parsed = stellar.parse_transaction_bytes(txEnvelope)
|
||||
|
||||
# Will return a StellarTxOpRequest
|
||||
resp = self.call(proto.StellarSignTx(
|
||||
protocol_version=parsed["protocol_version"],
|
||||
address_n=address_n,
|
||||
network_passphrase=networkPassphrase,
|
||||
source_account=parsed["source_account"],
|
||||
fee=parsed["fee"],
|
||||
sequence_number=parsed["sequence_number"],
|
||||
timebounds_start=parsed["timebounds_start"],
|
||||
timebounds_end=parsed["timebounds_end"],
|
||||
memo_type=parsed["memo_type"],
|
||||
memo_text=parsed["memo_text"],
|
||||
memo_id=parsed["memo_id"],
|
||||
memo_hash=parsed["memo_hash"],
|
||||
num_operations=parsed["num_operations"]
|
||||
))
|
||||
if resp.__class__.__name__ != "StellarTxOpRequest":
|
||||
raise CallException("Unexpected response to transaction")
|
||||
|
||||
for opIdx in range(0, parsed["num_operations"]):
|
||||
op = parsed["operations"][opIdx]
|
||||
resp = None
|
||||
|
||||
# Create account
|
||||
if op["type"] == 0:
|
||||
resp = self.call(proto.StellarCreateAccountOp(
|
||||
source_account=op["source_account"],
|
||||
new_account=op["new_account"],
|
||||
starting_balance=op["starting_balance"]
|
||||
))
|
||||
# Payment
|
||||
if op["type"] == 1:
|
||||
asset = types.StellarAssetType(type=op["asset"]["type"], code=op["asset"]["code"],
|
||||
issuer=op["asset"]["issuer"])
|
||||
resp = self.call(proto.StellarPaymentOp(
|
||||
source_account=op["source_account"],
|
||||
destination_account=op["destination_account"],
|
||||
amount=op["amount"],
|
||||
asset=asset
|
||||
))
|
||||
# Path Payment
|
||||
if op["type"] == 2:
|
||||
destination_asset = types.StellarAssetType(type=op["destination_asset"]["type"],
|
||||
code=op["destination_asset"]["code"],
|
||||
issuer=op["destination_asset"]["issuer"])
|
||||
resp = self.call(proto.StellarPathPaymentOp(
|
||||
source_account=op["source_account"],
|
||||
send_max=op["send_max"],
|
||||
destination_account=op["destination_account"],
|
||||
destination_asset=destination_asset,
|
||||
destination_amount=op["destination_amount"],
|
||||
paths=op["paths"]
|
||||
))
|
||||
# Manage Offer
|
||||
if op["type"] == 3:
|
||||
selling_asset = types.StellarAssetType(type=op["selling_asset"]["type"],
|
||||
code=op["selling_asset"]["code"],
|
||||
issuer=op["selling_asset"]["issuer"])
|
||||
buying_asset = types.StellarAssetType(type=op["buying_asset"]["type"], code=op["buying_asset"]["code"],
|
||||
issuer=op["buying_asset"]["issuer"])
|
||||
resp = self.call(proto.StellarManageOfferOp(
|
||||
source_account=op["source_account"],
|
||||
selling_asset=selling_asset,
|
||||
buying_asset=buying_asset,
|
||||
amount=op["amount"],
|
||||
price_n=op["price_n"],
|
||||
price_d=op["price_d"],
|
||||
offer_id=op["offer_id"]
|
||||
))
|
||||
# Passive Offer
|
||||
if op["type"] == 4:
|
||||
selling_asset = types.StellarAssetType(type=op["selling_asset"]["type"],
|
||||
code=op["selling_asset"]["code"],
|
||||
issuer=op["selling_asset"]["issuer"])
|
||||
buying_asset = types.StellarAssetType(type=op["buying_asset"]["type"], code=op["buying_asset"]["code"],
|
||||
issuer=op["buying_asset"]["issuer"])
|
||||
resp = self.call(proto.StellarCreatePassiveOfferOp(
|
||||
source_account=op["source_account"],
|
||||
selling_asset=selling_asset,
|
||||
buying_asset=buying_asset,
|
||||
amount=op["amount"],
|
||||
price_n=op["price_n"],
|
||||
price_d=op["price_d"]
|
||||
))
|
||||
# Set Options
|
||||
if op["type"] == 5:
|
||||
resp = self.call(proto.StellarSetOptionsOp(
|
||||
source_account=op["source_account"],
|
||||
inflation_destination_account=op["inflation_destination"],
|
||||
clear_flags=op["clear_flags"],
|
||||
set_flags=op["set_flags"],
|
||||
master_weight=op["master_weight"],
|
||||
low_threshold=op["low_threshold"],
|
||||
medium_threshold=op["medium_threshold"],
|
||||
high_threshold=op["high_threshold"],
|
||||
home_domain=op["home_domain"],
|
||||
signer_type=op["signer_type"],
|
||||
signer_key=op["signer_key"],
|
||||
signer_weight=op["signer_weight"],
|
||||
))
|
||||
# Change Trust
|
||||
if op["type"] == 6:
|
||||
asset = types.StellarAssetType(type=op["asset"]["type"], code=op["asset"]["code"],
|
||||
issuer=op["asset"]["issuer"])
|
||||
resp = self.call(proto.StellarChangeTrustOp(
|
||||
source_account=op["source_account"],
|
||||
limit=op["limit"],
|
||||
asset=asset
|
||||
))
|
||||
# Allow Trust
|
||||
if op["type"] == 7:
|
||||
resp = self.call(proto.StellarAllowTrustOp(
|
||||
source_account=op["source_account"],
|
||||
trusted_account=op["trusted_account"],
|
||||
asset_type=op["asset_type"],
|
||||
asset_code=op["asset_code"],
|
||||
is_authorized=op["is_authorized"]
|
||||
))
|
||||
# Merge Account
|
||||
if op["type"] == 8:
|
||||
resp = self.call(proto.StellarAccountMergeOp(
|
||||
source_account=op["source_account"],
|
||||
destination_account=op["destination_account"]
|
||||
))
|
||||
# Manage data
|
||||
if op["type"] == 10:
|
||||
resp = self.call(proto.StellarManageDataOp(
|
||||
source_account=op["source_account"],
|
||||
key=op["key"],
|
||||
value=op["value"]
|
||||
))
|
||||
# Merge Account
|
||||
if op["type"] == 11:
|
||||
resp = self.call(proto.StellarBumpSequenceOp(
|
||||
source_account=op["source_account"],
|
||||
bump_to=op["bump_to"]
|
||||
))
|
||||
|
||||
# Exit if the response was a StellarSignedTx
|
||||
if resp.__class__.__name__ == "StellarSignedTx":
|
||||
return resp
|
||||
|
||||
raise CallException("Reached end of operations without a signature")
|
||||
|
||||
@expect(proto.StellarMessageSignature)
|
||||
def stellar_sign_message(self, address_n, message):
|
||||
return self.call(proto.StellarSignMessage(address_n=address_n, message=message))
|
||||
|
||||
def stellar_verify_message(self, pubkey_bytes, signature, message):
|
||||
resp = self.call(proto.StellarVerifyMessage(public_key=pubkey_bytes, message=message, signature=signature))
|
||||
|
||||
if isinstance(resp, proto.Success):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
class TrezorClient(ProtocolMixin, TextUIMixin, BaseClient):
|
||||
pass
|
||||
|
290
trezorlib/stellar.py
Normal file
290
trezorlib/stellar.py
Normal file
@ -0,0 +1,290 @@
|
||||
import base64
|
||||
import struct
|
||||
import binascii
|
||||
import xdrlib
|
||||
|
||||
def expand_path_or_default(client, address):
|
||||
"""Uses client to parse address and returns an array of integers
|
||||
If no address is specified, the default of m/44'/148'/0' is used
|
||||
"""
|
||||
if address:
|
||||
return client.expand_path(address)
|
||||
else:
|
||||
return client.expand_path("m/44'/148'/0'")
|
||||
|
||||
|
||||
def address_from_public_key(pk_bytes):
|
||||
"""Returns the base32-encoded version of pk_bytes (G...)
|
||||
"""
|
||||
final_bytes = bytearray()
|
||||
|
||||
# version
|
||||
final_bytes.append(6 << 3)
|
||||
# public key
|
||||
final_bytes.extend(pk_bytes)
|
||||
# checksum
|
||||
final_bytes.extend(struct.pack("<H", _crc16_checksum(final_bytes)))
|
||||
|
||||
return base64.b32encode(final_bytes)
|
||||
|
||||
def address_to_public_key(address_str):
|
||||
"""Returns the raw 32 bytes representing a public key by extracting
|
||||
it from the G... string
|
||||
"""
|
||||
final_bytes = bytearray()
|
||||
decoded = base64.b32decode(address_str)
|
||||
|
||||
# skip 0th byte (version) and last two bytes (checksum)
|
||||
return decoded[1:-2]
|
||||
|
||||
|
||||
def parse_transaction_bytes(bytes):
|
||||
"""Parses base64data into a StellarSignTx message
|
||||
"""
|
||||
parsed = {}
|
||||
parsed["protocol_version"] = 1
|
||||
parsed["operations"] = []
|
||||
unpacker = xdrlib.Unpacker(bytes)
|
||||
|
||||
parsed["source_account"] = _xdr_read_address(unpacker)
|
||||
parsed["fee"] = unpacker.unpack_uint()
|
||||
parsed["sequence_number"] = unpacker.unpack_uhyper()
|
||||
|
||||
# Timebounds is an optional field
|
||||
parsed["timebounds_start"] = 0
|
||||
parsed["timebounds_end"] = 0
|
||||
has_timebounds = unpacker.unpack_bool()
|
||||
if has_timebounds:
|
||||
max_timebound = 2**32-1 # max unsigned 32-bit int (trezor does not support the full 64-bit time value)
|
||||
parsed["timebounds_start"] = unpacker.unpack_uhyper()
|
||||
parsed["timebounds_end"] = unpacker.unpack_uhyper()
|
||||
|
||||
if parsed["timebounds_start"] > max_timebound or parsed["timebounds_start"] < 0:
|
||||
raise ValueError("Starting timebound out of range (must be between 0 and " + max_timebound)
|
||||
if parsed["timebounds_end"] > max_timebound or parsed["timebounds_end"] < 0:
|
||||
raise ValueError("Ending timebound out of range (must be between 0 and " + max_timebound)
|
||||
|
||||
# memo type determines what optional fields are set
|
||||
parsed["memo_type"] = unpacker.unpack_uint()
|
||||
parsed["memo_text"] = None
|
||||
parsed["memo_id"] = None
|
||||
parsed["memo_hash"] = None
|
||||
|
||||
# text
|
||||
if parsed["memo_type"] == 1:
|
||||
parsed["memo_text"] = unpacker.unpack_string()
|
||||
# id (64-bit uint)
|
||||
if parsed["memo_type"] == 2:
|
||||
parsed["memo_id"] = unpacker.unpack_uhyper()
|
||||
# hash / return are the same structure (32 bytes representing a hash)
|
||||
if parsed["memo_type"] == 3 or parsed["memo_type"] == 4:
|
||||
parsed["memo+hash"] = unpacker.unpack_fopaque(32)
|
||||
|
||||
parsed["num_operations"] = unpacker.unpack_uint()
|
||||
|
||||
for opIdx in range(0, parsed["num_operations"]):
|
||||
parsed["operations"].append(_parse_operation_bytes(unpacker))
|
||||
|
||||
return parsed
|
||||
|
||||
def _parse_operation_bytes(unpacker):
|
||||
"""Returns a dictionary describing the next operation as read from
|
||||
the byte stream in unpacker
|
||||
"""
|
||||
op = {
|
||||
"source_account": None,
|
||||
"type": None
|
||||
}
|
||||
|
||||
has_source_account = unpacker.unpack_bool()
|
||||
if has_source_account:
|
||||
op["source_account"] = unpacker.unpack_fopaque(32)
|
||||
|
||||
op["type"] = unpacker.unpack_uint()
|
||||
|
||||
# see: https://github.com/stellar/stellar-core/blob/master/src/xdr/Stellar-transaction.x#L16
|
||||
if op["type"] == 0:
|
||||
op["new_account"] = _xdr_read_address(unpacker)
|
||||
op["starting_balance"] = unpacker.unpack_hyper()
|
||||
|
||||
# see: https://github.com/stellar/stellar-core/blob/master/src/xdr/Stellar-transaction.x#L54
|
||||
if op["type"] == 1:
|
||||
op["destination_account"] = _xdr_read_address(unpacker)
|
||||
op["asset"] = _xdr_read_asset(unpacker)
|
||||
op["amount"] = unpacker.unpack_hyper()
|
||||
|
||||
# see: https://github.com/stellar/stellar-core/blob/master/src/xdr/Stellar-transaction.x#L72
|
||||
if op["type"] == 2:
|
||||
op["send_asset"] = _xdr_read_asset(unpacker)
|
||||
op["send_max"] = unpacker.unpack_hyper()
|
||||
op["destination_account"] = _xdr_read_address(unpacker)
|
||||
op["destination_asset"] = _xdr_read_asset(unpacker)
|
||||
op["destination_amount"] = unpacker.unpack_hyper()
|
||||
op["paths"] = []
|
||||
|
||||
num_paths = unpacker.unpack_uint()
|
||||
for i in range(0, num_paths):
|
||||
op["paths"].append(_xdr_read_asset(unpacker))
|
||||
|
||||
# see: https://github.com/stellar/stellar-core/blob/master/src/xdr/Stellar-transaction.x#L93
|
||||
if op["type"] == 3:
|
||||
op["selling_asset"] = _xdr_read_asset(unpacker)
|
||||
op["buying_asset"] = _xdr_read_asset(unpacker)
|
||||
op["amount"] = unpacker.unpack_hyper()
|
||||
op["price_n"] = unpacker.unpack_uint()
|
||||
op["price_d"] = unpacker.unpack_uint()
|
||||
op["offer_id"] = unpacker.unpack_uhyper()
|
||||
|
||||
# see: https://github.com/stellar/stellar-core/blob/master/src/xdr/Stellar-transaction.x#L111
|
||||
if op["type"] == 4:
|
||||
op["selling_asset"] = _xdr_read_asset(unpacker)
|
||||
op["buying_asset"] = _xdr_read_asset(unpacker)
|
||||
op["amount"] = unpacker.unpack_hyper()
|
||||
op["price_n"] = unpacker.unpack_uint()
|
||||
op["price_d"] = unpacker.unpack_uint()
|
||||
|
||||
# see: https://github.com/stellar/stellar-core/blob/master/src/xdr/Stellar-transaction.x#L129
|
||||
if op["type"] == 5:
|
||||
op["inflation_destination"] = None
|
||||
op["clear_flags"] = None
|
||||
op["set_flags"] = None
|
||||
op["master_weight"] = None
|
||||
op["low_threshold"] = None
|
||||
op["medium_threshold"] = None
|
||||
op["high_threshold"] = None
|
||||
op["home_domain"] = None
|
||||
op["signer_type"] = None
|
||||
op["signer_key"] = None
|
||||
op["signer_weight"] = None
|
||||
|
||||
op["has_inflation_destination"] = unpacker.unpack_bool()
|
||||
if op["has_inflation_destination"]:
|
||||
op["inflation_destination"] = _xdr_read_address(unpacker)
|
||||
|
||||
op["has_clear_flags"] = unpacker.unpack_bool()
|
||||
if op["has_clear_flags"]:
|
||||
op["clear_flags"] = unpacker.unpack_uint()
|
||||
|
||||
op["has_set_flags"] = unpacker.unpack_bool()
|
||||
if op["has_set_flags"]:
|
||||
op["set_flags"] = unpacker.unpack_uint()
|
||||
|
||||
op["has_master_weight"] = unpacker.unpack_bool()
|
||||
if op["has_master_weight"]:
|
||||
op["master_weight"] = unpacker.unpack_uint()
|
||||
|
||||
op["has_low_threshold"] = unpacker.unpack_bool()
|
||||
if op["has_low_threshold"]:
|
||||
op["low_threshold"] = unpacker.unpack_uint()
|
||||
|
||||
op["has_medium_threshold"] = unpacker.unpack_bool()
|
||||
if op["has_medium_threshold"]:
|
||||
op["medium_threshold"] = unpacker.unpack_uint()
|
||||
|
||||
op["has_high_threshold"] = unpacker.unpack_bool()
|
||||
if op["has_high_threshold"]:
|
||||
op["high_threshold"] = unpacker.unpack_uint()
|
||||
|
||||
op["has_home_domain"] = unpacker.unpack_bool()
|
||||
if op["has_home_domain"]:
|
||||
op["home_domain"] = unpacker.unpack_string()
|
||||
|
||||
op["has_signer"] = unpacker.unpack_bool()
|
||||
if op["has_signer"]:
|
||||
op["signer_type"] = unpacker.unpack_uint()
|
||||
op["signer_key"] = unpacker.unpack_fopaque(32)
|
||||
op["signer_weight"] = unpacker.unpack_uint()
|
||||
|
||||
# Change Trust
|
||||
# see: https://github.com/stellar/stellar-core/blob/master/src/xdr/Stellar-transaction.x#L156
|
||||
if op["type"] == 6:
|
||||
op["asset"] = _xdr_read_asset(unpacker)
|
||||
op["limit"] = unpacker.unpack_uhyper()
|
||||
|
||||
# Allow Trust
|
||||
# see: https://github.com/stellar/stellar-core/blob/master/src/xdr/Stellar-transaction.x#L173
|
||||
if op["type"] == 7:
|
||||
op["trusted_account"] = _xdr_read_address(unpacker)
|
||||
op["asset_type"] = unpacker.unpack_uint()
|
||||
|
||||
if op["asset_type"] == 1:
|
||||
op["asset_code"] = unpacker.unpack_fstring(4)
|
||||
if op["asset_type"] == 2:
|
||||
op["asset_code"] = unpacker.unpack_fstring(12)
|
||||
|
||||
op["is_authorized"] = unpacker.unpack_bool()
|
||||
|
||||
# Merge Account
|
||||
# see: https://github.com/stellar/stellar-core/blob/master/src/xdr/Stellar-transaction.x#L251
|
||||
if op["type"] == 8:
|
||||
op["destination_account"] = _xdr_read_address(unpacker)
|
||||
|
||||
# Inflation is not implemented since any account can send this operation
|
||||
|
||||
# Manage Data
|
||||
# see: https://github.com/stellar/stellar-core/blob/master/src/xdr/Stellar-transaction.x#L218
|
||||
if op["type"] == 10:
|
||||
op["key"] = unpacker.unpack_string()
|
||||
|
||||
op["value"] = None
|
||||
op["has_value"] = unpacker.unpack_bool()
|
||||
if op["has_value"]:
|
||||
op["value"] = unpacker.unpack_opaque()
|
||||
|
||||
# Bump Sequence
|
||||
# see: https://github.com/stellar/stellar-core/blob/master/src/xdr/Stellar-transaction.x#L269
|
||||
if op["type"] == 11:
|
||||
op["bump_to"] = unpacker.unpack_uhyper()
|
||||
|
||||
return op
|
||||
|
||||
def _xdr_read_asset(unpacker):
|
||||
"""Reads a stellar Asset from unpacker"""
|
||||
asset = {
|
||||
"type": unpacker.unpack_uint(),
|
||||
"code": None,
|
||||
"issuer": None
|
||||
}
|
||||
|
||||
# alphanum 4
|
||||
if asset["type"] == 1:
|
||||
asset["code"] = unpacker.unpack_fstring(4)
|
||||
asset["issuer"] = _xdr_read_address(unpacker)
|
||||
|
||||
if asset["type"] == 2:
|
||||
asset["code"] = unpacker.unpack_fstring(12)
|
||||
asset["issuer"] = _xdr_read_address(unpacker)
|
||||
|
||||
return asset
|
||||
|
||||
|
||||
def _xdr_read_address(unpacker):
|
||||
"""Reads a stellar address and returns the 32-byte
|
||||
data representing the address
|
||||
"""
|
||||
# First 4 bytes are the address type
|
||||
address_type = unpacker.unpack_uint()
|
||||
if address_type != 0:
|
||||
raise ValueError("Unsupported address type")
|
||||
|
||||
return unpacker.unpack_fopaque(32)
|
||||
|
||||
def _crc16_checksum(bytes):
|
||||
"""Returns the CRC-16 checksum of bytearray bytes
|
||||
|
||||
Ported from Java implementation at: http://introcs.cs.princeton.edu/java/61data/CRC16CCITT.java.html
|
||||
|
||||
Initial value changed to 0x0000 to match Stellar configuration.
|
||||
"""
|
||||
crc = 0x0000
|
||||
polynomial = 0x1021
|
||||
|
||||
for byte in bytes:
|
||||
for i in range(0, 8):
|
||||
bit = ((byte >> (7 - i) & 1) == 1)
|
||||
c15 = ((crc >> 15 & 1) == 1)
|
||||
crc <<= 1
|
||||
if c15 ^ bit:
|
||||
crc ^= polynomial
|
||||
|
||||
return crc & 0xffff
|
Loading…
Reference in New Issue
Block a user