1
0
mirror of https://github.com/trezor/trezor-firmware.git synced 2025-01-20 12:21:01 +00:00

chore(core): decrease ripple size by 1170 bytes

This commit is contained in:
grdddj 2022-09-16 12:06:56 +02:00 committed by matejcik
parent 9c0c3852f5
commit f8f3f1bc55
6 changed files with 128 additions and 166 deletions

View File

@ -6,18 +6,18 @@ from trezor.crypto import base58
_ripple_alphabet = "rpshnaf39wBUDNEGHJKLM4PQRST7VWXYZ2bcdeCg65jkm8oFqi1tuvAxyz" _ripple_alphabet = "rpshnaf39wBUDNEGHJKLM4PQRST7VWXYZ2bcdeCg65jkm8oFqi1tuvAxyz"
def encode(data: bytes) -> str: def _encode(data: bytes) -> str:
""" """
Convert bytes to base58 encoded string. Convert bytes to base58 encoded string.
""" """
return base58.encode(data, alphabet=_ripple_alphabet) return base58.encode(data, _ripple_alphabet)
def decode(string: str) -> bytes: def _decode(string: str) -> bytes:
""" """
Convert base58 encoded string to bytes. Convert base58 encoded string to bytes.
""" """
return base58.decode(string, alphabet=_ripple_alphabet) return base58.decode(string, _ripple_alphabet)
def encode_check( def encode_check(
@ -26,7 +26,7 @@ def encode_check(
""" """
Convert bytes to base58 encoded string, append checksum. Convert bytes to base58 encoded string, append checksum.
""" """
return encode(data + digestfunc(data)) return _encode(data + digestfunc(data))
def decode_check( def decode_check(
@ -35,5 +35,5 @@ def decode_check(
""" """
Convert base58 encoded string to bytes and verify checksum. Convert base58 encoded string to bytes and verify checksum.
""" """
data = decode(string) data = _decode(string)
return base58.verify_checksum(data, digestfunc) return base58.verify_checksum(data, digestfunc)

View File

@ -1,15 +1,9 @@
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from trezor.messages import RippleAddress
from trezor.ui.layouts import show_address
from apps.common import paths
from apps.common.keychain import auto_keychain from apps.common.keychain import auto_keychain
from .helpers import address_from_public_key
if TYPE_CHECKING: if TYPE_CHECKING:
from trezor.messages import RippleGetAddress from trezor.messages import RippleGetAddress, RippleAddress
from apps.common.keychain import Keychain from apps.common.keychain import Keychain
from trezor.wire import Context from trezor.wire import Context
@ -18,6 +12,12 @@ if TYPE_CHECKING:
async def get_address( async def get_address(
ctx: Context, msg: RippleGetAddress, keychain: Keychain ctx: Context, msg: RippleGetAddress, keychain: Keychain
) -> RippleAddress: ) -> RippleAddress:
# NOTE: local imports here saves 20 bytes
from trezor.messages import RippleAddress
from trezor.ui.layouts import show_address
from apps.common import paths
from .helpers import address_from_public_key
await paths.validate_path(ctx, keychain, msg.address_n) await paths.validate_path(ctx, keychain, msg.address_n)
node = keychain.derive(msg.address_n) node = keychain.derive(msg.address_n)
@ -26,6 +26,6 @@ async def get_address(
if msg.show_display: if msg.show_display:
title = paths.address_n_to_str(msg.address_n) title = paths.address_n_to_str(msg.address_n)
await show_address(ctx, address=address, title=title) await show_address(ctx, address, title=title)
return RippleAddress(address=address) return RippleAddress(address=address)

View File

@ -1,7 +1,5 @@
from micropython import const from micropython import const
from trezor.crypto.hashlib import ripemd160, sha256
from . import base58_ripple from . import base58_ripple
# HASH_TX_ID = const(0x5458_4E00) # 'TXN' # HASH_TX_ID = const(0x5458_4E00) # 'TXN'
@ -36,6 +34,9 @@ def address_from_public_key(pubkey: bytes) -> str:
Returns the Ripple address created using base58 Returns the Ripple address created using base58
""" """
# NOTE: local imports here saves 8 bytes
from trezor.crypto.hashlib import ripemd160, sha256
h = sha256(pubkey).digest() h = sha256(pubkey).digest()
h = ripemd160(h).digest() h = ripemd160(h).digest()

View File

@ -3,9 +3,8 @@ from typing import TYPE_CHECKING
from trezor.enums import ButtonRequestType from trezor.enums import ButtonRequestType
from trezor.strings import format_amount from trezor.strings import format_amount
from trezor.ui.layouts import confirm_metadata from trezor.ui.layouts import confirm_metadata
from trezor.ui.layouts.altcoin import confirm_total_ripple
from . import helpers from .helpers import DECIMALS
if TYPE_CHECKING: if TYPE_CHECKING:
from trezor.wire import Context from trezor.wire import Context
@ -15,11 +14,11 @@ async def require_confirm_fee(ctx: Context, fee: int) -> None:
await confirm_metadata( await confirm_metadata(
ctx, ctx,
"confirm_fee", "confirm_fee",
title="Confirm fee", "Confirm fee",
content="Transaction fee:\n{}", "Transaction fee:\n{}",
param=format_amount(fee, helpers.DECIMALS) + " XRP", format_amount(fee, DECIMALS) + " XRP",
ButtonRequestType.ConfirmOutput,
hide_continue=True, hide_continue=True,
br_code=ButtonRequestType.ConfirmOutput,
) )
@ -27,13 +26,16 @@ async def require_confirm_destination_tag(ctx: Context, tag: int) -> None:
await confirm_metadata( await confirm_metadata(
ctx, ctx,
"confirm_destination_tag", "confirm_destination_tag",
title="Confirm tag", "Confirm tag",
content="Destination tag:\n{}", "Destination tag:\n{}",
param=str(tag), str(tag),
ButtonRequestType.ConfirmOutput,
hide_continue=True, hide_continue=True,
br_code=ButtonRequestType.ConfirmOutput,
) )
async def require_confirm_tx(ctx: Context, to: str, value: int) -> None: async def require_confirm_tx(ctx: Context, to: str, value: int) -> None:
await confirm_total_ripple(ctx, to, format_amount(value, helpers.DECIMALS)) # NOTE: local imports here saves 4 bytes
from trezor.ui.layouts.altcoin import confirm_total_ripple
await confirm_total_ripple(ctx, to, format_amount(value, DECIMALS))

View File

@ -11,41 +11,17 @@
from micropython import const from micropython import const
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from . import helpers
if TYPE_CHECKING: if TYPE_CHECKING:
from trezor.messages import RippleSignTx from trezor.messages import RippleSignTx
from trezor.utils import Writer from trezor.utils import Writer
class RippleField:
def __init__(self, type: int, key: int) -> None:
self.type: int = type
self.key: int = key
_FIELD_TYPE_INT16 = const(1) _FIELD_TYPE_INT16 = const(1)
_FIELD_TYPE_INT32 = const(2) _FIELD_TYPE_INT32 = const(2)
_FIELD_TYPE_AMOUNT = const(6) _FIELD_TYPE_AMOUNT = const(6)
_FIELD_TYPE_VL = const(7) _FIELD_TYPE_VL = const(7)
_FIELD_TYPE_ACCOUNT = const(8) _FIELD_TYPE_ACCOUNT = const(8)
FIELDS_MAP: dict[str, RippleField] = {
"account": RippleField(type=_FIELD_TYPE_ACCOUNT, key=1),
"amount": RippleField(type=_FIELD_TYPE_AMOUNT, key=1),
"destination": RippleField(type=_FIELD_TYPE_ACCOUNT, key=3),
"fee": RippleField(type=_FIELD_TYPE_AMOUNT, key=8),
"sequence": RippleField(type=_FIELD_TYPE_INT32, key=4),
"type": RippleField(type=_FIELD_TYPE_INT16, key=2),
"signingPubKey": RippleField(type=_FIELD_TYPE_VL, key=3),
"flags": RippleField(type=_FIELD_TYPE_INT32, key=2),
"txnSignature": RippleField(type=_FIELD_TYPE_VL, key=4),
"lastLedgerSequence": RippleField(type=_FIELD_TYPE_INT32, key=27),
"destinationTag": RippleField(type=_FIELD_TYPE_INT32, key=14),
}
TRANSACTION_TYPES = {"Payment": 0}
def serialize( def serialize(
msg: RippleSignTx, msg: RippleSignTx,
@ -53,93 +29,98 @@ def serialize(
pubkey: bytes, pubkey: bytes,
signature: bytes | None = None, signature: bytes | None = None,
) -> bytearray: ) -> bytearray:
w = bytearray()
# must be sorted numerically first by type and then by name # must be sorted numerically first by type and then by name
write(w, FIELDS_MAP["type"], TRANSACTION_TYPES["Payment"]) fields_to_write = ( # field_type, field_key, value
write(w, FIELDS_MAP["flags"], msg.flags) (_FIELD_TYPE_INT16, 2, 0), # payment type is 0
write(w, FIELDS_MAP["sequence"], msg.sequence) (_FIELD_TYPE_INT32, 2, msg.flags), # flags
write(w, FIELDS_MAP["destinationTag"], msg.payment.destination_tag) (_FIELD_TYPE_INT32, 4, msg.sequence), # sequence
write(w, FIELDS_MAP["lastLedgerSequence"], msg.last_ledger_sequence) (_FIELD_TYPE_INT32, 14, msg.payment.destination_tag), # destinationTag
write(w, FIELDS_MAP["amount"], msg.payment.amount) (_FIELD_TYPE_INT32, 27, msg.last_ledger_sequence), # lastLedgerSequence
write(w, FIELDS_MAP["fee"], msg.fee) (_FIELD_TYPE_AMOUNT, 1, msg.payment.amount), # amount
write(w, FIELDS_MAP["signingPubKey"], pubkey) (_FIELD_TYPE_AMOUNT, 8, msg.fee), # fee
write(w, FIELDS_MAP["txnSignature"], signature) (_FIELD_TYPE_VL, 3, pubkey), # signingPubKey
write(w, FIELDS_MAP["account"], source_address) (_FIELD_TYPE_VL, 4, signature), # txnSignature
write(w, FIELDS_MAP["destination"], msg.payment.destination) (_FIELD_TYPE_ACCOUNT, 1, source_address), # account
(_FIELD_TYPE_ACCOUNT, 3, msg.payment.destination), # destination
)
w = bytearray()
for field_type, field_key, value in fields_to_write:
_write(w, field_type, field_key, value)
return w return w
def write(w: Writer, field: RippleField, value: int | bytes | str | None) -> None: def _write(
w: Writer, field_type: int, field_key: int, value: int | bytes | str | None
) -> None:
from . import helpers
if value is None: if value is None:
return return
write_type(w, field)
if field.type == _FIELD_TYPE_INT16: # write_type
if field_key <= 0xF:
w.append((field_type << 4) | field_key)
else:
# this concerns two-bytes fields such as lastLedgerSequence
w.append(field_type << 4)
w.append(field_key)
if field_type == _FIELD_TYPE_INT16:
assert isinstance(value, int) assert isinstance(value, int)
w.extend(value.to_bytes(2, "big")) w.extend(value.to_bytes(2, "big"))
elif field.type == _FIELD_TYPE_INT32: elif field_type == _FIELD_TYPE_INT32:
assert isinstance(value, int) assert isinstance(value, int)
w.extend(value.to_bytes(4, "big")) w.extend(value.to_bytes(4, "big"))
elif field.type == _FIELD_TYPE_AMOUNT: elif field_type == _FIELD_TYPE_AMOUNT:
assert isinstance(value, int) assert isinstance(value, int)
w.extend(serialize_amount(value))
elif field.type == _FIELD_TYPE_ACCOUNT: # serialize_amount
if value < 0:
raise ValueError("Only non-negative integers are supported")
if value > helpers.MAX_ALLOWED_AMOUNT:
raise ValueError("Value is too large")
serialized_amount = bytearray(value.to_bytes(8, "big"))
serialized_amount[0] &= 0x7F # clear first bit to indicate XRP
serialized_amount[0] |= 0x40 # set second bit to indicate positive number
w.extend(serialized_amount)
elif field_type == _FIELD_TYPE_ACCOUNT:
assert isinstance(value, str) assert isinstance(value, str)
write_bytes_varint(w, helpers.decode_address(value)) write_bytes_varint(w, helpers.decode_address(value))
elif field.type == _FIELD_TYPE_VL: elif field_type == _FIELD_TYPE_VL:
assert isinstance(value, (bytes, bytearray)) assert isinstance(value, (bytes, bytearray))
write_bytes_varint(w, value) write_bytes_varint(w, value)
else: else:
raise ValueError("Unknown field type") raise ValueError("Unknown field type")
def write_type(w: Writer, field: RippleField) -> None:
if field.key <= 0xF:
w.append((field.type << 4) | field.key)
else:
# this concerns two-bytes fields such as lastLedgerSequence
w.append(field.type << 4)
w.append(field.key)
def serialize_amount(value: int) -> bytearray:
if value < 0:
raise ValueError("Only non-negative integers are supported")
if value > helpers.MAX_ALLOWED_AMOUNT:
raise ValueError("Value is too large")
b = bytearray(value.to_bytes(8, "big"))
b[0] &= 0x7F # clear first bit to indicate XRP
b[0] |= 0x40 # set second bit to indicate positive number
return b
def write_bytes_varint(w: Writer, value: bytes) -> None: def write_bytes_varint(w: Writer, value: bytes) -> None:
"""Serialize a variable length bytes.""" """Serialize a variable length bytes."""
write_varint(w, len(value)) append = w.append # local_cache_attribute
w.extend(value)
# write_varint
def write_varint(w: Writer, val: int) -> None: # Implements variable-length int encoding from Ripple.
""" # See: https://ripple.com/wiki/Binary_Format#Variable_Length_Data_Encoding
Implements variable-length int encoding from Ripple. val = len(value)
See: https://ripple.com/wiki/Binary_Format#Variable_Length_Data_Encoding
"""
if val < 0: if val < 0:
raise ValueError("Only non-negative integers are supported") raise ValueError("Only non-negative integers are supported")
elif val < 192: elif val < 192:
w.append(val) append(val)
elif val <= 12480: elif val <= 12480:
val -= 193 val -= 193
w.append(193 + rshift(val, 8)) append(193 + rshift(val, 8))
w.append(val & 0xFF) append(val & 0xFF)
elif val <= 918744: elif val <= 918744:
val -= 12481 val -= 12481
w.append(241 + rshift(val, 16)) append(241 + rshift(val, 16))
w.append(rshift(val, 8) & 0xFF) append(rshift(val, 8) & 0xFF)
w.append(val & 0xFF) append(val & 0xFF)
else: else:
raise ValueError("Value is too large") raise ValueError("Value is too large")
w.extend(value)
def rshift(val: int, n: int) -> int: def rshift(val: int, n: int) -> int:
""" """

View File

@ -1,81 +1,59 @@
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from apps.common.keychain import auto_keychain
if TYPE_CHECKING:
from trezor.messages import RippleSignTx, RippleSignedTx
from apps.common.keychain import Keychain
from trezor.wire import Context
# NOTE: it is one big function because that way it is the most flash-space-efficient
@auto_keychain(__name__)
async def sign_tx(
ctx: Context, msg: RippleSignTx, keychain: Keychain
) -> RippleSignedTx:
from trezor.crypto import der from trezor.crypto import der
from trezor.crypto.curve import secp256k1 from trezor.crypto.curve import secp256k1
from trezor.crypto.hashlib import sha512 from trezor.crypto.hashlib import sha512
from trezor.messages import RippleSignedTx from trezor.messages import RippleSignedTx
from trezor.wire import ProcessError from trezor.wire import ProcessError
from apps.common import paths from apps.common import paths
from apps.common.keychain import auto_keychain
from . import helpers, layout from . import helpers, layout
from .serialize import serialize from .serialize import serialize
if TYPE_CHECKING: payment = msg.payment # local_cache_attribute
from trezor.messages import RippleSignTx
from apps.common.keychain import Keychain
from trezor.wire import Context
if payment.amount > helpers.MAX_ALLOWED_AMOUNT:
@auto_keychain(__name__) raise ProcessError("Amount exceeds maximum allowed amount.")
async def sign_tx(
ctx: Context, msg: RippleSignTx, keychain: Keychain
) -> RippleSignedTx:
validate(msg)
await paths.validate_path(ctx, keychain, msg.address_n) await paths.validate_path(ctx, keychain, msg.address_n)
node = keychain.derive(msg.address_n) node = keychain.derive(msg.address_n)
source_address = helpers.address_from_public_key(node.public_key()) source_address = helpers.address_from_public_key(node.public_key())
set_canonical_flag(msg) # Setting canonical flag
tx = serialize(msg, source_address, pubkey=node.public_key()) # Our ECDSA implementation already returns fully-canonical signatures,
to_sign = get_network_prefix() + tx # so we're enforcing it in the transaction using the designated flag
# - see https://wiki.ripple.com/Transaction_Malleability#Using_Fully-Canonical_Signatures
check_fee(msg.fee) # - see https://github.com/trezor/trezor-crypto/blob/3e8974ff8871263a70b7fbb9a27a1da5b0d810f7/ecdsa.c#L791
if msg.payment.destination_tag is not None:
await layout.require_confirm_destination_tag(ctx, msg.payment.destination_tag)
await layout.require_confirm_fee(ctx, msg.fee)
await layout.require_confirm_tx(ctx, msg.payment.destination, msg.payment.amount)
signature = ecdsa_sign(node.private_key(), first_half_of_sha512(to_sign))
tx = serialize(msg, source_address, pubkey=node.public_key(), signature=signature)
return RippleSignedTx(signature=signature, serialized_tx=tx)
def check_fee(fee: int) -> None:
if fee < helpers.MIN_FEE or fee > helpers.MAX_FEE:
raise ProcessError("Fee must be in the range of 10 to 10,000 drops")
def get_network_prefix() -> bytes:
"""Network prefix is prepended before the transaction and public key is included"""
return helpers.HASH_TX_SIGN.to_bytes(4, "big")
def first_half_of_sha512(b: bytes) -> bytes:
"""First half of SHA512, which Ripple uses"""
hash = sha512(b)
return hash.digest()[:32]
def ecdsa_sign(private_key: bytes, digest: bytes) -> bytes:
"""Signs and encodes signature into DER format"""
signature = secp256k1.sign(private_key, digest)
sig_der = der.encode_seq((signature[1:33], signature[33:65]))
return sig_der
def set_canonical_flag(msg: RippleSignTx) -> None:
"""
Our ECDSA implementation already returns fully-canonical signatures,
so we're enforcing it in the transaction using the designated flag
- see https://wiki.ripple.com/Transaction_Malleability#Using_Fully-Canonical_Signatures
- see https://github.com/trezor/trezor-crypto/blob/3e8974ff8871263a70b7fbb9a27a1da5b0d810f7/ecdsa.c#L791
"""
msg.flags |= helpers.FLAG_FULLY_CANONICAL msg.flags |= helpers.FLAG_FULLY_CANONICAL
tx = serialize(msg, source_address, node.public_key())
network_prefix = helpers.HASH_TX_SIGN.to_bytes(4, "big")
to_sign = network_prefix + tx
def validate(msg: RippleSignTx) -> None: if msg.fee < helpers.MIN_FEE or msg.fee > helpers.MAX_FEE:
if msg.payment.amount > helpers.MAX_ALLOWED_AMOUNT: raise ProcessError("Fee must be in the range of 10 to 10,000 drops")
raise ProcessError("Amount exceeds maximum allowed amount.")
if payment.destination_tag is not None:
await layout.require_confirm_destination_tag(ctx, payment.destination_tag)
await layout.require_confirm_fee(ctx, msg.fee)
await layout.require_confirm_tx(ctx, payment.destination, payment.amount)
# Signs and encodes signature into DER format
first_half_of_sha512 = sha512(to_sign).digest()[:32]
sig = secp256k1.sign(node.private_key(), first_half_of_sha512)
sig_encoded = der.encode_seq((sig[1:33], sig[33:65]))
tx = serialize(msg, source_address, node.public_key(), sig_encoded)
return RippleSignedTx(signature=sig_encoded, serialized_tx=tx)