1
0
mirror of https://github.com/trezor/trezor-firmware.git synced 2025-01-13 08:50:56 +00:00

apps: unify common writer logic

This commit is contained in:
Jan Pochyla 2018-08-23 14:39:30 +02:00 committed by Tomas Susanka
parent 2f910839fe
commit f7c1465d57
21 changed files with 504 additions and 391 deletions

View File

@ -0,0 +1,68 @@
def empty_bytearray(preallocate: int) -> bytearray:
"""
Returns bytearray that won't allocate for at least `preallocate` bytes.
Useful in case you want to avoid allocating too often.
"""
b = bytearray(preallocate)
b[:] = bytes()
return b
def write_uint8(w: bytearray, n: int) -> int:
assert 0 <= n <= 0xFF
w.append(n)
return 1
def write_uint32_le(w: bytearray, n: int) -> int:
assert 0 <= n <= 0xFFFFFFFF
w.append(n & 0xFF)
w.append((n >> 8) & 0xFF)
w.append((n >> 16) & 0xFF)
w.append((n >> 24) & 0xFF)
return 4
def write_uint32_be(w: bytearray, n: int) -> int:
assert 0 <= n <= 0xFFFFFFFF
w.append((n >> 24) & 0xFF)
w.append((n >> 16) & 0xFF)
w.append((n >> 8) & 0xFF)
w.append(n & 0xFF)
return 4
def write_uint64_le(w: bytearray, n: int) -> int:
assert 0 <= n <= 0xFFFFFFFFFFFFFFFF
w.append(n & 0xFF)
w.append((n >> 8) & 0xFF)
w.append((n >> 16) & 0xFF)
w.append((n >> 24) & 0xFF)
w.append((n >> 32) & 0xFF)
w.append((n >> 40) & 0xFF)
w.append((n >> 48) & 0xFF)
w.append((n >> 56) & 0xFF)
return 8
def write_uint64_be(w: bytearray, n: int) -> int:
assert 0 <= n <= 0xFFFFFFFFFFFFFFFF
w.append((n >> 56) & 0xFF)
w.append((n >> 48) & 0xFF)
w.append((n >> 40) & 0xFF)
w.append((n >> 32) & 0xFF)
w.append((n >> 24) & 0xFF)
w.append((n >> 16) & 0xFF)
w.append((n >> 8) & 0xFF)
w.append(n & 0xFF)
return 8
def write_bytes(w: bytearray, b: bytes) -> int:
w.extend(b)
return len(b)
def write_bytes_reversed(w: bytearray, b: bytes) -> int:
w.extend(bytes(reversed(b)))
return len(b)

View File

@ -6,26 +6,29 @@ from ..helpers import (
NEM_TRANSACTION_TYPE_MOSAIC_CREATION,
NEM_TRANSACTION_TYPE_MOSAIC_SUPPLY_CHANGE,
)
from ..writers import write_bytes_with_length, write_common, write_uint32, write_uint64
from ..writers import (
serialize_tx_common,
write_bytes_with_len,
write_uint32_le,
write_uint64_le,
)
def serialize_mosaic_creation(
common: NEMTransactionCommon, creation: NEMMosaicCreation, public_key: bytes
):
w = write_common(
common, bytearray(public_key), NEM_TRANSACTION_TYPE_MOSAIC_CREATION
)
w = serialize_tx_common(common, public_key, NEM_TRANSACTION_TYPE_MOSAIC_CREATION)
mosaics_w = bytearray()
write_bytes_with_length(mosaics_w, bytearray(public_key))
identifier_length = (
4 + len(creation.definition.namespace) + 4 + len(creation.definition.mosaic)
)
write_uint32(mosaics_w, identifier_length)
write_bytes_with_length(mosaics_w, bytearray(creation.definition.namespace))
write_bytes_with_length(mosaics_w, bytearray(creation.definition.mosaic))
write_bytes_with_length(mosaics_w, bytearray(creation.definition.description))
write_uint32(mosaics_w, 4) # number of properties
write_bytes_with_len(mosaics_w, public_key)
identifier_w = bytearray()
write_bytes_with_len(identifier_w, creation.definition.namespace.encode())
write_bytes_with_len(identifier_w, creation.definition.mosaic.encode())
write_bytes_with_len(mosaics_w, identifier_w)
write_bytes_with_len(mosaics_w, creation.definition.description.encode())
write_uint32_le(mosaics_w, 4) # number of properties
_write_property(mosaics_w, "divisibility", creation.definition.divisibility)
_write_property(mosaics_w, "initialSupply", creation.definition.supply)
@ -33,37 +36,29 @@ def serialize_mosaic_creation(
_write_property(mosaics_w, "transferable", creation.definition.transferable)
if creation.definition.levy:
levy_identifier_length = (
4
+ len(creation.definition.levy_namespace)
+ 4
+ len(creation.definition.levy_mosaic)
levy_identifier_w = bytearray()
write_bytes_with_len(
levy_identifier_w, creation.definition.levy_namespace.encode()
)
write_uint32(
mosaics_w,
4
+ 4
+ len(creation.definition.levy_address)
+ 4
+ levy_identifier_length
+ 8,
write_bytes_with_len(
levy_identifier_w, creation.definition.levy_mosaic.encode()
)
write_uint32(mosaics_w, creation.definition.levy)
write_bytes_with_length(mosaics_w, bytearray(creation.definition.levy_address))
write_uint32(mosaics_w, levy_identifier_length)
write_bytes_with_length(
mosaics_w, bytearray(creation.definition.levy_namespace)
)
write_bytes_with_length(mosaics_w, bytearray(creation.definition.levy_mosaic))
write_uint64(mosaics_w, creation.definition.fee)
levy_w = bytearray()
write_uint32_le(levy_w, creation.definition.levy)
write_bytes_with_len(levy_w, creation.definition.levy_address.encode())
write_bytes_with_len(levy_w, levy_identifier_w)
write_uint64_le(levy_w, creation.definition.fee)
write_bytes_with_len(mosaics_w, levy_w)
else:
write_uint32(mosaics_w, 0)
write_uint32_le(mosaics_w, 0) # no levy
# write mosaic bytes with length
write_bytes_with_length(w, mosaics_w)
write_bytes_with_len(w, mosaics_w)
write_bytes_with_length(w, bytearray(creation.sink))
write_uint64(w, creation.fee)
write_bytes_with_len(w, creation.sink.encode())
write_uint64_le(w, creation.fee)
return w
@ -71,17 +66,18 @@ def serialize_mosaic_creation(
def serialize_mosaic_supply_change(
common: NEMTransactionCommon, change: NEMMosaicSupplyChange, public_key: bytes
):
w = write_common(
common, bytearray(public_key), NEM_TRANSACTION_TYPE_MOSAIC_SUPPLY_CHANGE
w = serialize_tx_common(
common, public_key, NEM_TRANSACTION_TYPE_MOSAIC_SUPPLY_CHANGE
)
identifier_length = 4 + len(change.namespace) + 4 + len(change.mosaic)
write_uint32(w, identifier_length)
write_bytes_with_length(w, bytearray(change.namespace))
write_bytes_with_length(w, bytearray(change.mosaic))
identifier_w = bytearray()
write_bytes_with_len(identifier_w, change.namespace.encode())
write_bytes_with_len(identifier_w, change.mosaic.encode())
write_uint32(w, change.type)
write_uint64(w, change.delta)
write_bytes_with_len(w, identifier_w)
write_uint32_le(w, change.type)
write_uint64_le(w, change.delta)
return w
@ -98,8 +94,10 @@ def _write_property(w: bytearray, name: str, value):
value = "false"
elif type(value) == int:
value = str(value)
elif type(value) != str:
if type(value) != str:
raise ValueError("Incompatible value type")
write_uint32(w, 4 + len(name) + 4 + len(value))
write_bytes_with_length(w, bytearray(name))
write_bytes_with_length(w, bytearray(value))
name = name.encode()
value = value.encode()
write_uint32_le(w, 4 + len(name) + 4 + len(value))
write_bytes_with_len(w, name)
write_bytes_with_len(w, value)

View File

@ -30,8 +30,8 @@ async def aggregate_modification(
w = serialize.serialize_aggregate_modification(common, aggr, public_key)
for m in aggr.modifications:
serialize.serialize_cosignatory_modification(w, m.type, m.public_key)
serialize.write_cosignatory_modification(w, m.type, m.public_key)
if aggr.relative_change:
serialize.serialize_minimum_cosignatories(w, aggr.relative_change)
serialize.write_minimum_cosignatories(w, aggr.relative_change)
return w

View File

@ -7,12 +7,12 @@ from ..helpers import (
NEM_TRANSACTION_TYPE_MULTISIG,
NEM_TRANSACTION_TYPE_MULTISIG_SIGNATURE,
)
from ..writers import write_bytes_with_length, write_common, write_uint32
from ..writers import serialize_tx_common, write_bytes_with_len, write_uint32_le
def serialize_multisig(common: NEMTransactionCommon, public_key: bytes, inner: bytes):
w = write_common(common, bytearray(public_key), NEM_TRANSACTION_TYPE_MULTISIG)
write_bytes_with_length(w, bytearray(inner))
w = serialize_tx_common(common, public_key, NEM_TRANSACTION_TYPE_MULTISIG)
write_bytes_with_len(w, inner)
return w
@ -22,15 +22,13 @@ def serialize_multisig_signature(
inner: bytes,
address_public_key: bytes,
):
address = nem.compute_address(address_public_key, common.network)
w = write_common(
common, bytearray(public_key), NEM_TRANSACTION_TYPE_MULTISIG_SIGNATURE
)
w = serialize_tx_common(common, public_key, NEM_TRANSACTION_TYPE_MULTISIG_SIGNATURE)
digest = hashlib.sha3_256(inner, keccak=True).digest()
address = nem.compute_address(address_public_key, common.network)
write_uint32(w, 4 + len(digest))
write_bytes_with_length(w, digest)
write_bytes_with_length(w, address)
write_uint32_le(w, 4 + len(digest))
write_bytes_with_len(w, digest)
write_bytes_with_len(w, address)
return w
@ -41,25 +39,22 @@ def serialize_aggregate_modification(
if mod.relative_change:
version = common.network << 24 | 2
w = write_common(
common,
bytearray(public_key),
NEM_TRANSACTION_TYPE_AGGREGATE_MODIFICATION,
version,
w = serialize_tx_common(
common, public_key, NEM_TRANSACTION_TYPE_AGGREGATE_MODIFICATION, version
)
write_uint32(w, len(mod.modifications))
write_uint32_le(w, len(mod.modifications))
return w
def serialize_cosignatory_modification(
w: bytearray, type: int, cosignatory_pubkey: bytes
def write_cosignatory_modification(
w: bytearray, cosignatory_type: int, cosignatory_pubkey: bytes
):
write_uint32(w, 4 + 4 + len(cosignatory_pubkey))
write_uint32(w, type)
write_bytes_with_length(w, bytearray(cosignatory_pubkey))
write_uint32_le(w, 4 + 4 + len(cosignatory_pubkey))
write_uint32_le(w, cosignatory_type)
write_bytes_with_len(w, cosignatory_pubkey)
return w
def serialize_minimum_cosignatories(w: bytearray, relative_change: int):
write_uint32(w, 4)
write_uint32(w, relative_change)
def write_minimum_cosignatories(w: bytearray, relative_change: int):
write_uint32_le(w, 4)
write_uint32_le(w, relative_change)

View File

@ -2,22 +2,27 @@ from trezor.messages.NEMProvisionNamespace import NEMProvisionNamespace
from trezor.messages.NEMTransactionCommon import NEMTransactionCommon
from ..helpers import NEM_TRANSACTION_TYPE_PROVISION_NAMESPACE
from ..writers import write_bytes_with_length, write_common, write_uint32, write_uint64
from ..writers import (
serialize_tx_common,
write_bytes_with_len,
write_uint32_le,
write_uint64_le,
)
def serialize_provision_namespace(
common: NEMTransactionCommon, namespace: NEMProvisionNamespace, public_key: bytes
) -> bytearray:
tx = write_common(
common, bytearray(public_key), NEM_TRANSACTION_TYPE_PROVISION_NAMESPACE
tx = serialize_tx_common(
common, public_key, NEM_TRANSACTION_TYPE_PROVISION_NAMESPACE
)
write_bytes_with_length(tx, bytearray(namespace.sink))
write_uint64(tx, namespace.fee)
write_bytes_with_length(tx, bytearray(namespace.namespace))
write_bytes_with_len(tx, namespace.sink.encode())
write_uint64_le(tx, namespace.fee)
write_bytes_with_len(tx, namespace.namespace.encode())
if namespace.parent:
write_bytes_with_length(tx, bytearray(namespace.parent))
write_bytes_with_len(tx, namespace.parent.encode())
else:
write_uint32(tx, 0xffffffff)
write_uint32_le(tx, 0xffffffff)
return tx

View File

@ -15,8 +15,8 @@ async def sign_tx(ctx, msg: NEMSignTx):
if msg.multisig:
public_key = msg.multisig.signer
await multisig.ask(ctx, msg)
common = msg.multisig
await multisig.ask(ctx, msg)
else:
public_key = seed.remove_ed25519_prefix(node.public_key())
common = msg.transaction

View File

@ -10,7 +10,12 @@ from ..helpers import (
NEM_TRANSACTION_TYPE_IMPORTANCE_TRANSFER,
NEM_TRANSACTION_TYPE_TRANSFER,
)
from ..writers import write_bytes_with_length, write_common, write_uint32, write_uint64
from ..writers import (
serialize_tx_common,
write_bytes_with_len,
write_uint32_le,
write_uint64_le,
)
def serialize_transfer(
@ -20,52 +25,54 @@ def serialize_transfer(
payload: bytes = None,
encrypted: bool = False,
) -> bytearray:
tx = write_common(
tx = serialize_tx_common(
common,
bytearray(public_key),
public_key,
NEM_TRANSACTION_TYPE_TRANSFER,
_get_version(common.network, transfer.mosaics),
)
write_bytes_with_length(tx, bytearray(transfer.recipient))
write_uint64(tx, transfer.amount)
write_bytes_with_len(tx, transfer.recipient.encode())
write_uint64_le(tx, transfer.amount)
if payload:
# payload + payload size (u32) + encryption flag (u32)
write_uint32(tx, len(payload) + 2 * 4)
write_uint32_le(tx, len(payload) + 2 * 4)
if encrypted:
write_uint32(tx, 0x02)
write_uint32_le(tx, 0x02)
else:
write_uint32(tx, 0x01)
write_bytes_with_length(tx, bytearray(payload))
write_uint32_le(tx, 0x01)
write_bytes_with_len(tx, payload)
else:
write_uint32(tx, 0)
write_uint32_le(tx, 0)
if transfer.mosaics:
write_uint32(tx, len(transfer.mosaics))
write_uint32_le(tx, len(transfer.mosaics))
return tx
def serialize_mosaic(w: bytearray, namespace: str, mosaic: str, quantity: int):
identifier_length = 4 + len(namespace) + 4 + len(mosaic)
# indentifier length (u32) + quantity (u64) + identifier size
write_uint32(w, 4 + 8 + identifier_length)
write_uint32(w, identifier_length)
write_bytes_with_length(w, bytearray(namespace))
write_bytes_with_length(w, bytearray(mosaic))
write_uint64(w, quantity)
identifier_w = bytearray()
write_bytes_with_len(identifier_w, namespace.encode())
write_bytes_with_len(identifier_w, mosaic.encode())
mosaic_w = bytearray()
write_bytes_with_len(mosaic_w, identifier_w)
write_uint64_le(mosaic_w, quantity)
write_bytes_with_len(w, mosaic_w)
def serialize_importance_transfer(
common: NEMTransactionCommon, imp: NEMImportanceTransfer, public_key: bytes
) -> bytearray:
w = write_common(
common, bytearray(public_key), NEM_TRANSACTION_TYPE_IMPORTANCE_TRANSFER
w = serialize_tx_common(
common, public_key, NEM_TRANSACTION_TYPE_IMPORTANCE_TRANSFER
)
write_uint32(w, imp.mode)
write_bytes_with_length(w, bytearray(imp.public_key))
write_uint32_le(w, imp.mode)
write_bytes_with_len(w, imp.public_key)
return w
@ -109,8 +116,8 @@ def are_mosaics_equal(a: NEMMosaic, b: NEMMosaic) -> bool:
def merge_mosaics(mosaics: list) -> list:
if not mosaics:
return list()
ret = list()
return []
ret = []
for i in mosaics:
found = False
for k, y in enumerate(ret):

View File

@ -1,49 +1,29 @@
from trezor.messages.NEMTransactionCommon import NEMTransactionCommon
def write_uint32(w, n: int):
w.append(n & 0xFF)
w.append((n >> 8) & 0xFF)
w.append((n >> 16) & 0xFF)
w.append((n >> 24) & 0xFF)
from apps.common.writers import write_bytes, write_uint32_le, write_uint64_le
def write_uint64(w, n: int):
w.append(n & 0xFF)
w.append((n >> 8) & 0xFF)
w.append((n >> 16) & 0xFF)
w.append((n >> 24) & 0xFF)
w.append((n >> 32) & 0xFF)
w.append((n >> 40) & 0xFF)
w.append((n >> 48) & 0xFF)
w.append((n >> 56) & 0xFF)
def write_bytes(w, buf: bytearray):
w.extend(buf)
def write_bytes_with_length(w, buf: bytearray):
write_uint32(w, len(buf))
write_bytes(w, buf)
def write_common(
def serialize_tx_common(
common: NEMTransactionCommon,
public_key: bytearray,
transaction_type: int,
version: int = None,
) -> bytearray:
ret = bytearray()
w = bytearray()
write_uint32(ret, transaction_type)
write_uint32_le(w, transaction_type)
if version is None:
version = common.network << 24 | 1
write_uint32(ret, version)
write_uint32(ret, common.timestamp)
write_uint32_le(w, version)
write_uint32_le(w, common.timestamp)
write_bytes_with_length(ret, public_key)
write_uint64(ret, common.fee)
write_uint32(ret, common.deadline)
write_bytes_with_len(w, public_key)
write_uint64_le(w, common.fee)
write_uint32_le(w, common.deadline)
return ret
return w
def write_bytes_with_len(w, buf: bytes):
write_uint32_le(w, len(buf))
write_bytes(w, buf)

View File

@ -7,7 +7,9 @@
# the other four the record type (amount, fee, destination..) and then
# the actual data follow. This currently only supports the Payment
# transaction type and the fields that are required for it.
#
from micropython import const
from trezor.messages.RippleSignTx import RippleSignTx
from . import helpers
@ -78,44 +80,50 @@ def write_type(w: bytearray, field: dict):
def serialize_amount(value: int) -> bytearray:
if value < 0 or isinstance(value, float):
raise ValueError("Only positive integers are supported")
if value > 100000000000: # max allowed value
raise ValueError("Value is larger than 100000000000")
MAX_ALLOWED_AMOUNT = const(100000000000)
if value < 0:
raise ValueError("Only non-negative integers are supported")
if value > MAX_ALLOWED_AMOUNT:
raise ValueError("Value is too large")
b = bytearray(value.to_bytes(8, "big"))
# Clear first bit to indicate XRP
b[0] &= 0x7f
# Set second bit to indicate positive number
b[0] |= 0x40
b[0] &= 0x7f # clear first bit to indicate XRP
b[0] |= 0x40 # set second bit to indicate positive number
return b
def write_bytes(w: bytearray, value: bytes):
"""Serialize a variable length bytes."""
serialize_varint(w, len(value))
write_varint(w, len(value))
w.extend(value)
def serialize_varint(w, val):
"""https://ripple.com/wiki/Binary_Format#Variable_Length_Data_Encoding"""
def rshift(val, n):
# http://stackoverflow.com/a/5833119/15677
return (val % 0x100000000) >> n
assert val >= 0
b = bytearray()
if val < 192:
b.append(val)
def write_varint(w: bytearray, val: int):
"""
Implements variable-length int encoding from Ripple.
See: https://ripple.com/wiki/Binary_Format#Variable_Length_Data_Encoding
"""
if val < 0:
raise ValueError("Only non-negative integers are supported")
elif val < 192:
w.append(val)
elif val <= 12480:
val -= 193
b.extend([193 + rshift(val, 8), val & 0xff])
w.append(193 + rshift(val, 8))
w.append(val & 0xff)
elif val <= 918744:
val -= 12481
b.extend([241 + rshift(val, 16), rshift(val, 8) & 0xff, val & 0xff])
w.append(241 + rshift(val, 16))
w.append(rshift(val, 8) & 0xff)
w.append(val & 0xff)
else:
raise ValueError("Variable integer overflow.")
raise ValueError("Value is too large")
w.extend(b)
def rshift(val, n):
"""
Implements signed right-shift.
See: http://stackoverflow.com/a/5833119/15677
"""
return (val % 0x100000000) >> n

View File

@ -2,43 +2,43 @@ from apps.stellar import consts, writers
from apps.stellar.operations import layout, serialize
async def operation(ctx, w, op):
async def process_operation(ctx, w, op):
if op.source_account:
await layout.confirm_source_account(ctx, op.source_account)
serialize.serialize_account(w, op.source_account)
serialize.write_account(w, op.source_account)
writers.write_uint32(w, consts.get_op_code(op))
if isinstance(op, serialize.StellarAccountMergeOp):
await layout.confirm_account_merge_op(ctx, op)
serialize.serialize_account_merge_op(w, op)
serialize.write_account_merge_op(w, op)
elif isinstance(op, serialize.StellarAllowTrustOp):
await layout.confirm_allow_trust_op(ctx, op)
serialize.serialize_allow_trust_op(w, op)
serialize.write_allow_trust_op(w, op)
elif isinstance(op, serialize.StellarBumpSequenceOp):
await layout.confirm_bump_sequence_op(ctx, op)
serialize.serialize_bump_sequence_op(w, op)
serialize.write_bump_sequence_op(w, op)
elif isinstance(op, serialize.StellarChangeTrustOp):
await layout.confirm_change_trust_op(ctx, op)
serialize.serialize_change_trust_op(w, op)
serialize.write_change_trust_op(w, op)
elif isinstance(op, serialize.StellarCreateAccountOp):
await layout.confirm_create_account_op(ctx, op)
serialize.serialize_create_account_op(w, op)
serialize.write_create_account_op(w, op)
elif isinstance(op, serialize.StellarCreatePassiveOfferOp):
await layout.confirm_create_passive_offer_op(ctx, op)
serialize.serialize_create_passive_offer_op(w, op)
serialize.write_create_passive_offer_op(w, op)
elif isinstance(op, serialize.StellarManageDataOp):
await layout.confirm_manage_data_op(ctx, op)
serialize.serialize_manage_data_op(w, op)
serialize.write_manage_data_op(w, op)
elif isinstance(op, serialize.StellarManageOfferOp):
await layout.confirm_manage_offer_op(ctx, op)
serialize.serialize_manage_offer_op(w, op)
serialize.write_manage_offer_op(w, op)
elif isinstance(op, serialize.StellarPathPaymentOp):
await layout.confirm_path_payment_op(ctx, op)
serialize.serialize_path_payment_op(w, op)
serialize.write_path_payment_op(w, op)
elif isinstance(op, serialize.StellarPaymentOp):
await layout.confirm_payment_op(ctx, op)
serialize.serialize_payment_op(w, op)
serialize.write_payment_op(w, op)
elif isinstance(op, serialize.StellarSetOptionsOp):
await layout.confirm_set_options_op(ctx, op)
serialize.serialize_set_options_op(w, op)
serialize.write_set_options_op(w, op)
else:
raise ValueError("serialize.Stellar: unknown operation")
raise ValueError("Unknown operation")

View File

@ -15,42 +15,42 @@ from trezor.wire import ProcessError
from apps.stellar import consts, writers
def serialize_account_merge_op(w, msg: StellarAccountMergeOp):
def write_account_merge_op(w, msg: StellarAccountMergeOp):
writers.write_pubkey(w, msg.destination_account)
def serialize_allow_trust_op(w, msg: StellarAllowTrustOp):
def write_allow_trust_op(w, msg: StellarAllowTrustOp):
# trustor account (the account being allowed to access the asset)
writers.write_pubkey(w, msg.trusted_account)
writers.write_uint32(w, msg.asset_type)
_serialize_asset_code(w, msg.asset_type, msg.asset_code)
_write_asset_code(w, msg.asset_type, msg.asset_code)
writers.write_bool(w, msg.is_authorized)
def serialize_bump_sequence_op(w, msg: StellarBumpSequenceOp):
def write_bump_sequence_op(w, msg: StellarBumpSequenceOp):
writers.write_uint64(w, msg.bump_to)
def serialize_change_trust_op(w, msg: StellarChangeTrustOp):
_serialize_asset(w, msg.asset)
def write_change_trust_op(w, msg: StellarChangeTrustOp):
_write_asset(w, msg.asset)
writers.write_uint64(w, msg.limit)
def serialize_create_account_op(w, msg: StellarCreateAccountOp):
def write_create_account_op(w, msg: StellarCreateAccountOp):
writers.write_pubkey(w, msg.new_account)
writers.write_uint64(w, msg.starting_balance)
def serialize_create_passive_offer_op(w, msg: StellarCreatePassiveOfferOp):
_serialize_asset(w, msg.selling_asset)
_serialize_asset(w, msg.buying_asset)
def write_create_passive_offer_op(w, msg: StellarCreatePassiveOfferOp):
_write_asset(w, msg.selling_asset)
_write_asset(w, msg.buying_asset)
writers.write_uint64(w, msg.amount)
writers.write_uint32(w, msg.price_n)
writers.write_uint32(w, msg.price_d)
def serialize_manage_data_op(w, msg: StellarManageDataOp):
def write_manage_data_op(w, msg: StellarManageDataOp):
if len(msg.key) > 64:
raise ProcessError("Stellar: max length of a key is 64 bytes")
writers.write_string(w, msg.key)
@ -60,34 +60,34 @@ def serialize_manage_data_op(w, msg: StellarManageDataOp):
writers.write_bytes(w, msg.value)
def serialize_manage_offer_op(w, msg: StellarManageOfferOp):
_serialize_asset(w, msg.selling_asset)
_serialize_asset(w, msg.buying_asset)
def write_manage_offer_op(w, msg: StellarManageOfferOp):
_write_asset(w, msg.selling_asset)
_write_asset(w, msg.buying_asset)
writers.write_uint64(w, msg.amount) # amount to sell
writers.write_uint32(w, msg.price_n) # numerator
writers.write_uint32(w, msg.price_d) # denominator
writers.write_uint64(w, msg.offer_id)
def serialize_path_payment_op(w, msg: StellarPathPaymentOp):
_serialize_asset(w, msg.send_asset)
def write_path_payment_op(w, msg: StellarPathPaymentOp):
_write_asset(w, msg.send_asset)
writers.write_uint64(w, msg.send_max)
writers.write_pubkey(w, msg.destination_account)
_serialize_asset(w, msg.destination_asset)
_write_asset(w, msg.destination_asset)
writers.write_uint64(w, msg.destination_amount)
writers.write_uint32(w, len(msg.paths))
for p in msg.paths:
_serialize_asset(w, p)
_write_asset(w, p)
def serialize_payment_op(w, msg: StellarPaymentOp):
def write_payment_op(w, msg: StellarPaymentOp):
writers.write_pubkey(w, msg.destination_account)
_serialize_asset(w, msg.asset)
_write_asset(w, msg.asset)
writers.write_uint64(w, msg.amount)
def serialize_set_options_op(w, msg: StellarSetOptionsOp):
def write_set_options_op(w, msg: StellarSetOptionsOp):
# inflation destination
writers.write_bool(w, bool(msg.inflation_destination_account))
if msg.inflation_destination_account:
@ -136,14 +136,14 @@ def serialize_set_options_op(w, msg: StellarSetOptionsOp):
writers.write_uint32(w, msg.signer_weight)
def serialize_account(w, source_account: str):
def write_account(w, source_account: str):
if source_account is None:
writers.write_bool(w, False)
return
writers.write_pubkey(w, source_account)
def _serialize_asset_code(w, asset_type: int, asset_code: str):
def _write_asset_code(w, asset_type: int, asset_code: str):
code = bytearray(asset_code)
if asset_type == consts.ASSET_TYPE_NATIVE:
return # nothing is needed
@ -157,10 +157,10 @@ def _serialize_asset_code(w, asset_type: int, asset_code: str):
raise ProcessError("Stellar: invalid asset type")
def _serialize_asset(w, asset: StellarAssetType):
def _write_asset(w, asset: StellarAssetType):
if asset is None:
writers.write_uint32(w, 0)
return
writers.write_uint32(w, asset.type)
_serialize_asset_code(w, asset.type, asset.code)
_write_asset_code(w, asset.type, asset.code)
writers.write_pubkey(w, asset.issuer)

View File

@ -9,7 +9,7 @@ from trezor.wire import ProcessError
from apps.common import seed
from apps.stellar import consts, helpers, layout, writers
from apps.stellar.operations import operation
from apps.stellar.operations import process_operation
async def sign_tx(ctx, msg: StellarSignTx):
@ -72,7 +72,7 @@ async def _operations(ctx, w: bytearray, num_operations: int):
writers.write_uint32(w, num_operations)
for i in range(num_operations):
op = await ctx.call(StellarTxOpRequest(), *consts.op_wire_types)
await operation(ctx, w, op)
await process_operation(ctx, w, op)
async def _memo(ctx, w: bytearray, msg: StellarSignTx):

View File

@ -1,30 +1,22 @@
import ustruct
from .helpers import public_key_from_address
from apps.common.writers import write_bytes, write_uint32_be, write_uint64_be
def write_uint32(w, n: int):
write_bytes(w, ustruct.pack(">L", n))
def write_uint64(w, n: int):
write_bytes(w, ustruct.pack(">Q", n))
write_uint32 = write_uint32_be
write_uint64 = write_uint64_be
def write_string(w, s: str):
write_uint32(w, len(s))
write_bytes(w, bytearray(s))
buf = s.encode()
write_uint32(w, len(buf))
write_bytes(w, buf)
# if len isn't a multiple of 4, add padding bytes
reminder = len(s) % 4
reminder = len(buf) % 4
if reminder:
write_bytes(w, bytearray([0] * (4 - reminder)))
write_bytes(w, bytes([0] * (4 - reminder)))
def write_bytes(w, buf: bytearray):
w.extend(buf)
def write_bool(w, val: True):
def write_bool(w, val: bool):
if val:
write_uint32(w, 1)
else:
@ -34,5 +26,4 @@ def write_bool(w, val: True):
def write_pubkey(w, address: str):
# first 4 bytes of an address are the type, there's only one type (0)
write_uint32(w, 0)
pubkey = public_key_from_address(address)
write_bytes(w, bytearray(pubkey))
write_bytes(w, public_key_from_address(address))

View File

@ -13,7 +13,7 @@ from apps.wallet.sign_tx.scripts import output_script_multisig, output_script_p2
from apps.wallet.sign_tx.writers import (
get_tx_hash,
write_bytes,
write_bytes_rev,
write_bytes_reversed,
write_tx_output,
write_uint32,
write_uint64,
@ -34,7 +34,7 @@ class Zip143:
self.h_outputs = HashWriter(blake2b, outlen=32, personal=b"ZcashOutputsHash")
def add_prevouts(self, txi: TxInputType):
write_bytes_rev(self.h_prevouts, txi.prev_hash)
write_bytes_reversed(self.h_prevouts, txi.prev_hash)
write_uint32(self.h_prevouts, txi.prev_index)
def add_sequence(self, txi: TxInputType):
@ -78,7 +78,7 @@ class Zip143:
write_uint32(h_preimage, tx.expiry) # 8. expiryHeight
write_uint32(h_preimage, sighash) # 9. nHashType
write_bytes_rev(h_preimage, txi.prev_hash) # 10a. outpoint
write_bytes_reversed(h_preimage, txi.prev_hash) # 10a. outpoint
write_uint32(h_preimage, txi.prev_index)
script_code = self.derive_script_code(txi, pubkeyhash) # 10b. scriptCode

View File

@ -1,9 +1,9 @@
from trezor.crypto.hashlib import ripemd160, sha256
from trezor.messages.MultisigRedeemScriptType import MultisigRedeemScriptType
from apps.common.writers import empty_bytearray
from apps.wallet.sign_tx.multisig import multisig_get_pubkeys
from apps.wallet.sign_tx.writers import (
bytearray_with_cap,
write_bytes,
write_op_push,
write_scriptnum,
@ -23,7 +23,7 @@ class ScriptsError(ValueError):
def input_script_p2pkh_or_p2sh(
pubkey: bytes, signature: bytes, sighash: int
) -> bytearray:
w = bytearray_with_cap(5 + len(signature) + 1 + 5 + len(pubkey))
w = empty_bytearray(5 + len(signature) + 1 + 5 + len(pubkey))
append_signature(w, signature, sighash)
append_pubkey(w, pubkey)
return w
@ -87,7 +87,7 @@ def output_script_native_p2wpkh_or_p2wsh(witprog: bytes) -> bytearray:
# 00 14 <20-byte-key-hash>
# 00 20 <32-byte-script-hash>
w = bytearray_with_cap(3 + len(witprog))
w = empty_bytearray(3 + len(witprog))
w.append(0x00) # witness version byte
w.append(len(witprog)) # pub key hash length is 20 (P2WPKH) or 32 (P2WSH) bytes
write_bytes(w, witprog) # pub key hash
@ -106,7 +106,7 @@ def input_script_p2wpkh_in_p2sh(pubkeyhash: bytes) -> bytearray:
# 16 00 14 <pubkeyhash>
# Signature is moved to the witness.
w = bytearray_with_cap(3 + len(pubkeyhash))
w = empty_bytearray(3 + len(pubkeyhash))
w.append(0x16) # length of the data
w.append(0x00) # witness version byte
w.append(0x14) # P2WPKH witness program (pub key hash length)
@ -129,7 +129,7 @@ def input_script_p2wsh_in_p2sh(script_hash: bytes) -> bytearray:
if len(script_hash) != 32:
raise ScriptsError("Redeem script hash should be 32 bytes long")
w = bytearray_with_cap(3 + len(script_hash))
w = empty_bytearray(3 + len(script_hash))
w.append(0x22) # length of the data
w.append(0x00) # witness version byte
w.append(0x20) # P2WSH witness program (redeem script hash length)
@ -142,7 +142,7 @@ def input_script_p2wsh_in_p2sh(script_hash: bytes) -> bytearray:
def witness_p2wpkh(signature: bytes, pubkey: bytes, sighash: int):
w = bytearray_with_cap(1 + 5 + len(signature) + 1 + 5 + len(pubkey))
w = empty_bytearray(1 + 5 + len(signature) + 1 + 5 + len(pubkey))
write_varint(w, 0x02) # num of segwit items, in P2WPKH it's always 2
append_signature(w, signature, sighash)
append_pubkey(w, pubkey)
@ -239,7 +239,7 @@ def output_script_multisig(pubkeys, m: int) -> bytearray:
def output_script_paytoopreturn(data: bytes) -> bytearray:
w = bytearray_with_cap(1 + 5 + len(data))
w = empty_bytearray(1 + 5 + len(data))
w.append(0x6A) # OP_RETURN
write_op_push(w, len(data))
w.extend(data)

View File

@ -11,7 +11,7 @@ from apps.wallet.sign_tx.scripts import output_script_multisig, output_script_p2
from apps.wallet.sign_tx.writers import (
get_tx_hash,
write_bytes,
write_bytes_rev,
write_bytes_reversed,
write_tx_output,
write_uint32,
write_uint64,
@ -30,7 +30,7 @@ class Bip143:
self.h_outputs = HashWriter(sha256)
def add_prevouts(self, txi: TxInputType):
write_bytes_rev(self.h_prevouts, txi.prev_hash)
write_bytes_reversed(self.h_prevouts, txi.prev_hash)
write_uint32(self.h_prevouts, txi.prev_index)
def add_sequence(self, txi: TxInputType):
@ -64,7 +64,7 @@ class Bip143:
write_bytes(h_preimage, bytearray(self.get_prevouts_hash(coin))) # hashPrevouts
write_bytes(h_preimage, bytearray(self.get_sequence_hash(coin))) # hashSequence
write_bytes_rev(h_preimage, txi.prev_hash) # outpoint
write_bytes_reversed(h_preimage, txi.prev_hash) # outpoint
write_uint32(h_preimage, txi.prev_index) # outpoint
script_code = self.derive_script_code(txi, pubkeyhash) # scriptCode

View File

@ -10,6 +10,7 @@ from trezor.utils import HashWriter
from apps.common import address_type, coins
from apps.common.coininfo import CoinInfo
from apps.common.writers import empty_bytearray
from apps.wallet.sign_tx import progress
from apps.wallet.sign_tx.addresses import *
from apps.wallet.sign_tx.helpers import *
@ -206,7 +207,7 @@ async def sign_tx(tx: SignTx, root: bip32.HDNode):
key_sign_pub = key_sign.public_key()
txi_sign.script_sig = input_derive_script(coin, txi_sign, key_sign_pub)
w_txi = bytearray_with_cap(
w_txi = empty_bytearray(
7 + len(txi_sign.prev_hash) + 4 + len(txi_sign.script_sig) + 4
)
if i_sign == 0: # serializing first input => prepend headers
@ -248,7 +249,7 @@ async def sign_tx(tx: SignTx, root: bip32.HDNode):
txi_sign.script_sig = input_derive_script(
coin, txi_sign, key_sign_pub, signature
)
w_txi_sign = bytearray_with_cap(
w_txi_sign = empty_bytearray(
5 + len(txi_sign.prev_hash) + 4 + len(txi_sign.script_sig) + 4
)
if i_sign == 0: # serializing first input => prepend headers
@ -344,7 +345,7 @@ async def sign_tx(tx: SignTx, root: bip32.HDNode):
txi_sign.script_sig = input_derive_script(
coin, txi_sign, key_sign_pub, signature
)
w_txi_sign = bytearray_with_cap(
w_txi_sign = empty_bytearray(
5 + len(txi_sign.prev_hash) + 4 + len(txi_sign.script_sig) + 4
)
if i_sign == 0: # serializing first input => prepend headers
@ -362,7 +363,7 @@ async def sign_tx(tx: SignTx, root: bip32.HDNode):
txo_bin.script_pubkey = output_derive_script(txo, coin, root)
# serialize output
w_txo_bin = bytearray_with_cap(5 + 8 + 5 + len(txo_bin.script_pubkey) + 4)
w_txo_bin = empty_bytearray(5 + 8 + 5 + len(txo_bin.script_pubkey) + 4)
if o == 0: # serializing first output => prepend outputs count
write_varint(w_txo_bin, tx.outputs_count)
write_tx_output(w_txo_bin, txo_bin)

View File

@ -2,12 +2,19 @@ from trezor.crypto.hashlib import sha256
from trezor.messages.TxInputType import TxInputType
from trezor.messages.TxOutputBinType import TxOutputBinType
# TX Serialization
# ===
from apps.common.writers import (
write_bytes,
write_bytes_reversed,
write_uint32_le,
write_uint64_le,
)
write_uint32 = write_uint32_le
write_uint64 = write_uint64_le
def write_tx_input(w, i: TxInputType):
write_bytes_rev(w, i.prev_hash)
write_bytes_reversed(w, i.prev_hash)
write_uint32(w, i.prev_index)
write_varint(w, len(i.script_sig))
write_bytes(w, i.script_sig)
@ -50,10 +57,6 @@ def write_op_push(w, n: int):
w.append((n >> 24) & 0xFF)
# Buffer IO & Serialization
# ===
def write_varint(w, n: int):
assert n >= 0 and n <= 0xFFFFFFFF
if n < 253:
@ -92,44 +95,6 @@ def write_scriptnum(w, n: int):
w.append((n >> 24) & 0xFF)
def write_uint32(w, n: int):
assert n >= 0 and n <= 0xFFFFFFFF
w.append(n & 0xFF)
w.append((n >> 8) & 0xFF)
w.append((n >> 16) & 0xFF)
w.append((n >> 24) & 0xFF)
def write_uint64(w, n: int):
assert n >= 0 and n <= 0xFFFFFFFFFFFFFFFF
w.append(n & 0xFF)
w.append((n >> 8) & 0xFF)
w.append((n >> 16) & 0xFF)
w.append((n >> 24) & 0xFF)
w.append((n >> 32) & 0xFF)
w.append((n >> 40) & 0xFF)
w.append((n >> 48) & 0xFF)
w.append((n >> 56) & 0xFF)
def write_bytes(w, buf: bytearray):
w.extend(buf)
def write_bytes_rev(w, buf: bytearray):
w.extend(bytearray(reversed(buf)))
def bytearray_with_cap(cap: int) -> bytearray:
b = bytearray(cap)
b[:] = bytes()
return b
# Hashes
# ===
def get_tx_hash(w, double: bool = False, reverse: bool = False) -> bytes:
d = w.get_digest()
if double:

View File

@ -1,85 +1,159 @@
from common import *
from trezor.crypto import hashlib
from trezor.messages.NEMAggregateModification import NEMAggregateModification
from trezor.messages.NEMCosignatoryModification import NEMCosignatoryModification
from trezor.messages.NEMSignTx import NEMSignTx
from trezor.messages.NEMTransactionCommon import NEMTransactionCommon
from apps.nem.helpers import *
from apps.nem.multisig import *
from apps.nem.multisig.serialize import *
from trezor.crypto import hashlib
from trezor.messages.NEMSignTx import NEMSignTx
from trezor.messages.NEMAggregateModification import NEMAggregateModification
from trezor.messages.NEMCosignatoryModification import NEMCosignatoryModification
from trezor.messages.NEMTransactionCommon import NEMTransactionCommon
class TestNemMultisigAggregateModification(unittest.TestCase):
def test_nem_transaction_aggregate_modification(self):
# http://bob.nem.ninja:8765/#/aggregate/6a55471b17159e5b6cd579c421e95a4e39d92e3f78b0a55ee337e785a601d3a2
m = _create_msg(NEM_NETWORK_TESTNET,
0,
22000000,
0,
2,
0)
t = serialize_aggregate_modification(m.transaction, m.aggregate_modification, unhexlify("462ee976890916e54fa825d26bdd0235f5eb5b6a143c199ab0ae5ee9328e08ce"))
m = _create_msg(NEM_NETWORK_TESTNET, 0, 22000000, 0, 2, 0)
t = serialize_aggregate_modification(
m.transaction,
m.aggregate_modification,
unhexlify(
"462ee976890916e54fa825d26bdd0235f5eb5b6a143c199ab0ae5ee9328e08ce"
),
)
serialize_cosignatory_modification(t, 1, unhexlify(
"994793ba1c789fa9bdea918afc9b06e2d0309beb1081ac5b6952991e4defd324"))
serialize_cosignatory_modification(t, 1, unhexlify(
"c54d6e33ed1446eedd7f7a80a588dd01857f723687a09200c1917d5524752f8b"))
write_cosignatory_modification(
t,
1,
unhexlify(
"994793ba1c789fa9bdea918afc9b06e2d0309beb1081ac5b6952991e4defd324"
),
)
write_cosignatory_modification(
t,
1,
unhexlify(
"c54d6e33ed1446eedd7f7a80a588dd01857f723687a09200c1917d5524752f8b"
),
)
self.assertEqual(hashlib.sha3_256(t, keccak=True).digest(),
unhexlify("6a55471b17159e5b6cd579c421e95a4e39d92e3f78b0a55ee337e785a601d3a2"))
self.assertEqual(
hashlib.sha3_256(t, keccak=True).digest(),
unhexlify(
"6a55471b17159e5b6cd579c421e95a4e39d92e3f78b0a55ee337e785a601d3a2"
),
)
# http://chain.nem.ninja/#/aggregate/cc64ca69bfa95db2ff7ac1e21fe6d27ece189c603200ebc9778d8bb80ca25c3c
m = _create_msg(NEM_NETWORK_MAINNET,
0,
40000000,
0,
5,
0)
t = serialize_aggregate_modification(m.transaction, m.aggregate_modification, unhexlify("f41b99320549741c5cce42d9e4bb836d98c50ed5415d0c3c2912d1bb50e6a0e5"))
m = _create_msg(NEM_NETWORK_MAINNET, 0, 40000000, 0, 5, 0)
t = serialize_aggregate_modification(
m.transaction,
m.aggregate_modification,
unhexlify(
"f41b99320549741c5cce42d9e4bb836d98c50ed5415d0c3c2912d1bb50e6a0e5"
),
)
serialize_cosignatory_modification(t, 1, unhexlify(
"1fbdbdde28daf828245e4533765726f0b7790e0b7146e2ce205df3e86366980b"))
serialize_cosignatory_modification(t, 1, unhexlify(
"f94e8702eb1943b23570b1b83be1b81536df35538978820e98bfce8f999e2d37"))
serialize_cosignatory_modification(t, 1, unhexlify(
"826cedee421ff66e708858c17815fcd831a4bb68e3d8956299334e9e24380ba8"))
serialize_cosignatory_modification(t, 1, unhexlify(
"719862cd7d0f4e875a6a0274c9a1738f38f40ad9944179006a54c34724c1274d"))
serialize_cosignatory_modification(t, 1, unhexlify(
"43aa69177018fc3e2bdbeb259c81cddf24be50eef9c5386db51d82386c41475a"))
write_cosignatory_modification(
t,
1,
unhexlify(
"1fbdbdde28daf828245e4533765726f0b7790e0b7146e2ce205df3e86366980b"
),
)
write_cosignatory_modification(
t,
1,
unhexlify(
"f94e8702eb1943b23570b1b83be1b81536df35538978820e98bfce8f999e2d37"
),
)
write_cosignatory_modification(
t,
1,
unhexlify(
"826cedee421ff66e708858c17815fcd831a4bb68e3d8956299334e9e24380ba8"
),
)
write_cosignatory_modification(
t,
1,
unhexlify(
"719862cd7d0f4e875a6a0274c9a1738f38f40ad9944179006a54c34724c1274d"
),
)
write_cosignatory_modification(
t,
1,
unhexlify(
"43aa69177018fc3e2bdbeb259c81cddf24be50eef9c5386db51d82386c41475a"
),
)
self.assertEqual(hashlib.sha3_256(t, keccak=True).digest(),
unhexlify("cc64ca69bfa95db2ff7ac1e21fe6d27ece189c603200ebc9778d8bb80ca25c3c"))
self.assertEqual(
hashlib.sha3_256(t, keccak=True).digest(),
unhexlify(
"cc64ca69bfa95db2ff7ac1e21fe6d27ece189c603200ebc9778d8bb80ca25c3c"
),
)
def test_nem_transaction_aggregate_modification_relative_change(self):
# http://bob.nem.ninja:8765/#/aggregate/1fbdae5ba753e68af270930413ae90f671eb8ab58988116684bac0abd5726584
m = _create_msg(NEM_NETWORK_TESTNET,
6542254,
40000000,
6545854,
4,
2)
t = serialize_aggregate_modification(m.transaction, m.aggregate_modification, unhexlify("6bf7849c1eec6a2002995cc457dc00c4e29bad5c88de63f51e42dfdcd7b2131d"))
m = _create_msg(NEM_NETWORK_TESTNET, 6542254, 40000000, 6545854, 4, 2)
t = serialize_aggregate_modification(
m.transaction,
m.aggregate_modification,
unhexlify(
"6bf7849c1eec6a2002995cc457dc00c4e29bad5c88de63f51e42dfdcd7b2131d"
),
)
serialize_cosignatory_modification(t, 1, unhexlify(
"5f53d076c8c3ec3110b98364bc423092c3ec2be2b1b3c40fd8ab68d54fa39295"))
serialize_cosignatory_modification(t, 1, unhexlify(
"9eb199c2b4d406f64cb7aa5b2b0815264b56ba8fe44d558a6cb423a31a33c4c2"))
serialize_cosignatory_modification(t, 1, unhexlify(
"94b2323dab23a3faba24fa6ddda0ece4fbb06acfedd74e76ad9fae38d006882b"))
serialize_cosignatory_modification(t, 1, unhexlify(
"d88c6ee2a2cd3929d0d76b6b14ecb549d21296ab196a2b3a4cb2536bcce32e87"))
serialize_minimum_cosignatories(t, 2)
write_cosignatory_modification(
t,
1,
unhexlify(
"5f53d076c8c3ec3110b98364bc423092c3ec2be2b1b3c40fd8ab68d54fa39295"
),
)
write_cosignatory_modification(
t,
1,
unhexlify(
"9eb199c2b4d406f64cb7aa5b2b0815264b56ba8fe44d558a6cb423a31a33c4c2"
),
)
write_cosignatory_modification(
t,
1,
unhexlify(
"94b2323dab23a3faba24fa6ddda0ece4fbb06acfedd74e76ad9fae38d006882b"
),
)
write_cosignatory_modification(
t,
1,
unhexlify(
"d88c6ee2a2cd3929d0d76b6b14ecb549d21296ab196a2b3a4cb2536bcce32e87"
),
)
write_minimum_cosignatories(t, 2)
self.assertEqual(hashlib.sha3_256(t, keccak=True).digest(),
unhexlify("1fbdae5ba753e68af270930413ae90f671eb8ab58988116684bac0abd5726584"))
self.assertEqual(
hashlib.sha3_256(t, keccak=True).digest(),
unhexlify(
"1fbdae5ba753e68af270930413ae90f671eb8ab58988116684bac0abd5726584"
),
)
def _create_msg(network: int, timestamp: int, fee: int, deadline: int,
modifications: int, relative_change: int):
def _create_msg(
network: int,
timestamp: int,
fee: int,
deadline: int,
modifications: int,
relative_change: int,
):
m = NEMSignTx()
m.transaction = NEMTransactionCommon()
m.transaction.network = network
@ -94,5 +168,5 @@ def _create_msg(network: int, timestamp: int, fee: int, deadline: int,
return m
if __name__ == '__main__':
if __name__ == "__main__":
unittest.main()

View File

@ -24,7 +24,7 @@ class TestNemMultisig(unittest.TestCase):
0)
base_tx = serialize_aggregate_modification(m.transaction, m.aggregate_modification, unhexlify("abac2ee3d4aaa7a3bfb65261a00cc04c761521527dd3f2cf741e2815cbba83ac"))
base_tx = serialize_cosignatory_modification(base_tx, 2, unhexlify("e6cff9b3725a91f31089c3acca0fac3e341c00b1c8c6e9578f66c4514509c3b3"))
base_tx = write_cosignatory_modification(base_tx, 2, unhexlify("e6cff9b3725a91f31089c3acca0fac3e341c00b1c8c6e9578f66c4514509c3b3"))
m = _create_common_msg(NEM_NETWORK_TESTNET,
3939039,
6000000,

View File

@ -1,77 +1,98 @@
from common import *
from apps.ripple.serialize import serialize
from apps.ripple.serialize import serialize_amount
from apps.ripple.sign_tx import get_network_prefix
from trezor.messages.RippleSignTx import RippleSignTx
from trezor.messages.RipplePayment import RipplePayment
from trezor.messages.RippleSignTx import RippleSignTx
from apps.ripple.serialize import serialize, serialize_amount
from apps.ripple.sign_tx import get_network_prefix
class TestRippleSerializer(unittest.TestCase):
def test_amount(self):
# https://github.com/ripple/ripple-binary-codec/blob/4581f1b41e712f545ba08be15e188a557c731ecf/test/fixtures/data-driven-tests.json#L2494
assert serialize_amount(0) == unhexlify('4000000000000000')
assert serialize_amount(1) == unhexlify('4000000000000001')
assert serialize_amount(93493429243) == unhexlify('40000015c4a483fb')
assert serialize_amount(0) == unhexlify("4000000000000000")
assert serialize_amount(1) == unhexlify("4000000000000001")
assert serialize_amount(93493429243) == unhexlify("40000015c4a483fb")
with self.assertRaises(ValueError):
serialize_amount(1000000000000000000) # too large
with self.assertRaises(ValueError):
serialize_amount(-1) # negative not supported
with self.assertRaises(ValueError):
with self.assertRaises(Exception):
serialize_amount(1.1) # float numbers not supported
def test_transactions(self):
# from https://github.com/miracle2k/ripple-python
source_address = 'r3P9vH81KBayazSTrQj6S25jW6kDb779Gi'
payment = RipplePayment(200000000, 'r3kmLJN5D28dHuH8vZNUZpMC43pEHpaocV')
source_address = "r3P9vH81KBayazSTrQj6S25jW6kDb779Gi"
payment = RipplePayment(200000000, "r3kmLJN5D28dHuH8vZNUZpMC43pEHpaocV")
common = RippleSignTx(None, 10, None, 1, None, payment)
assert serialize(common, source_address) == unhexlify('120000240000000161400000000bebc20068400000000000000a811450f97a072f1c4357f1ad84566a609479d927c9428314550fc62003e785dc231a1058a05e56e3f09cf4e6')
assert serialize(common, source_address) == unhexlify(
"120000240000000161400000000bebc20068400000000000000a811450f97a072f1c4357f1ad84566a609479d927c9428314550fc62003e785dc231a1058a05e56e3f09cf4e6"
)
source_address = 'r3kmLJN5D28dHuH8vZNUZpMC43pEHpaocV'
payment = RipplePayment(1, 'r3P9vH81KBayazSTrQj6S25jW6kDb779Gi')
source_address = "r3kmLJN5D28dHuH8vZNUZpMC43pEHpaocV"
payment = RipplePayment(1, "r3P9vH81KBayazSTrQj6S25jW6kDb779Gi")
common = RippleSignTx(None, 99, None, 99, None, payment)
assert serialize(common, source_address) == unhexlify('12000024000000636140000000000000016840000000000000638114550fc62003e785dc231a1058a05e56e3f09cf4e6831450f97a072f1c4357f1ad84566a609479d927c942')
assert serialize(common, source_address) == unhexlify(
"12000024000000636140000000000000016840000000000000638114550fc62003e785dc231a1058a05e56e3f09cf4e6831450f97a072f1c4357f1ad84566a609479d927c942"
)
# https://github.com/ripple/ripple-binary-codec/blob/4581f1b41e712f545ba08be15e188a557c731ecf/test/fixtures/data-driven-tests.json#L1579
source_address = 'r9TeThyi5xiuUUrFjtPKZiHcDxs7K9H6Rb'
payment = RipplePayment(25000000, 'r4BPgS7DHebQiU31xWELvZawwSG2fSPJ7C')
source_address = "r9TeThyi5xiuUUrFjtPKZiHcDxs7K9H6Rb"
payment = RipplePayment(25000000, "r4BPgS7DHebQiU31xWELvZawwSG2fSPJ7C")
common = RippleSignTx(None, 10, 0, 2, None, payment)
assert serialize(common, source_address) == unhexlify('120000220000000024000000026140000000017d784068400000000000000a81145ccb151f6e9d603f394ae778acf10d3bece874f68314e851bbbe79e328e43d68f43445368133df5fba5a')
assert serialize(common, source_address) == unhexlify(
"120000220000000024000000026140000000017d784068400000000000000a81145ccb151f6e9d603f394ae778acf10d3bece874f68314e851bbbe79e328e43d68f43445368133df5fba5a"
)
# https://github.com/ripple/ripple-binary-codec/blob/4581f1b41e712f545ba08be15e188a557c731ecf/test/fixtures/data-driven-tests.json#L1651
source_address = 'rGWTUVmm1fB5QUjMYn8KfnyrFNgDiD9H9e'
payment = RipplePayment(200000, 'rw71Qs1UYQrSQ9hSgRohqNNQcyjCCfffkQ')
source_address = "rGWTUVmm1fB5QUjMYn8KfnyrFNgDiD9H9e"
payment = RipplePayment(200000, "rw71Qs1UYQrSQ9hSgRohqNNQcyjCCfffkQ")
common = RippleSignTx(None, 15, 0, 144, None, payment)
# 201b005ee9ba removed from the test vector because last ledger sequence is not supported
assert serialize(common, source_address) == unhexlify('12000022000000002400000090614000000000030d4068400000000000000f8114aa1bd19d9e87be8069fdbf6843653c43837c03c6831467fe6ec28e0464dd24fb2d62a492aac697cfad02')
assert serialize(common, source_address) == unhexlify(
"12000022000000002400000090614000000000030d4068400000000000000f8114aa1bd19d9e87be8069fdbf6843653c43837c03c6831467fe6ec28e0464dd24fb2d62a492aac697cfad02"
)
# https://github.com/ripple/ripple-binary-codec/blob/4581f1b41e712f545ba08be15e188a557c731ecf/test/fixtures/data-driven-tests.json#L1732
source_address = 'r4BPgS7DHebQiU31xWELvZawwSG2fSPJ7C'
payment = RipplePayment(25000000, 'rBqSFEFg2B6GBMobtxnU1eLA1zbNC9NDGM')
source_address = "r4BPgS7DHebQiU31xWELvZawwSG2fSPJ7C"
payment = RipplePayment(25000000, "rBqSFEFg2B6GBMobtxnU1eLA1zbNC9NDGM")
common = RippleSignTx(None, 12, 0, 1, None, payment)
# 2ef72d50ca removed from the test vector because destination tag is not supported
assert serialize(common, source_address) == unhexlify('120000220000000024000000016140000000017d784068400000000000000c8114e851bbbe79e328e43d68f43445368133df5fba5a831476dac5e814cd4aa74142c3ab45e69a900e637aa2')
assert serialize(common, source_address) == unhexlify(
"120000220000000024000000016140000000017d784068400000000000000c8114e851bbbe79e328e43d68f43445368133df5fba5a831476dac5e814cd4aa74142c3ab45e69a900e637aa2"
)
def test_transactions_for_signing(self):
# https://github.com/ripple/ripple-binary-codec/blob/4581f1b41e712f545ba08be15e188a557c731ecf/test/signing-data-encoding-test.js
source_address = 'r9LqNeG6qHxjeUocjvVki2XR35weJ9mZgQ'
payment = RipplePayment(1000, 'rHb9CJAWyB4rj91VRWn96DkukG4bwdtyTh')
source_address = "r9LqNeG6qHxjeUocjvVki2XR35weJ9mZgQ"
payment = RipplePayment(1000, "rHb9CJAWyB4rj91VRWn96DkukG4bwdtyTh")
common = RippleSignTx(None, 10, 2147483648, 1, None, payment)
tx = serialize(common, source_address, pubkey=unhexlify('ed5f5ac8b98974a3ca843326d9b88cebd0560177b973ee0b149f782cfaa06dc66a'))
tx = serialize(
common,
source_address,
pubkey=unhexlify(
"ed5f5ac8b98974a3ca843326d9b88cebd0560177b973ee0b149f782cfaa06dc66a"
),
)
tx = get_network_prefix() + tx
assert tx[0:4] == unhexlify('53545800') # signing prefix
assert tx[4:7] == unhexlify('120000') # transaction type
assert tx[7:12] == unhexlify('2280000000') # flags
assert tx[12:17] == unhexlify('2400000001') # sequence
assert tx[17:26] == unhexlify('6140000000000003e8') # amount
assert tx[26:35] == unhexlify('68400000000000000a') # fee
assert tx[35:70] == unhexlify('7321ed5f5ac8b98974a3ca843326d9b88cebd0560177b973ee0b149f782cfaa06dc66a') # singing pub key
assert tx[70:92] == unhexlify('81145b812c9d57731e27a2da8b1830195f88ef32a3b6') # account
assert tx[92:114] == unhexlify('8314b5f762798a53d543a014caf8b297cff8f2f937e8') # destination
assert tx[0:4] == unhexlify("53545800") # signing prefix
assert tx[4:7] == unhexlify("120000") # transaction type
assert tx[7:12] == unhexlify("2280000000") # flags
assert tx[12:17] == unhexlify("2400000001") # sequence
assert tx[17:26] == unhexlify("6140000000000003e8") # amount
assert tx[26:35] == unhexlify("68400000000000000a") # fee
assert tx[35:70] == unhexlify(
"7321ed5f5ac8b98974a3ca843326d9b88cebd0560177b973ee0b149f782cfaa06dc66a"
) # singing pub key
assert tx[70:92] == unhexlify(
"81145b812c9d57731e27a2da8b1830195f88ef32a3b6"
) # account
assert tx[92:114] == unhexlify(
"8314b5f762798a53d543a014caf8b297cff8f2f937e8"
) # destination
assert len(tx[114:]) == 0 # that's it
if __name__ == '__main__':
if __name__ == "__main__":
unittest.main()