mirror of
https://github.com/trezor/trezor-firmware.git
synced 2025-01-10 15:30:55 +00:00
trezorlib: implement auto-creating protobuf messages from dicts
along with a convert_dict function to rename keys from camelCase and optionally apply simple field renames
This commit is contained in:
parent
f05f9a2b37
commit
abf0e82d80
@ -1,7 +1,8 @@
|
||||
import binascii
|
||||
|
||||
from . import messages as proto
|
||||
from .tools import CallException, expect, normalize_nfc
|
||||
from .tools import CallException, expect, normalize_nfc, dict_from_camelcase
|
||||
from .protobuf import dict_to_proto
|
||||
|
||||
|
||||
@expect(proto.LiskAddress, field="address")
|
||||
@ -33,46 +34,11 @@ def verify_message(client, pubkey, signature, message):
|
||||
return isinstance(resp, proto.Success)
|
||||
|
||||
|
||||
def _asset_to_proto(asset):
|
||||
msg = proto.LiskTransactionAsset()
|
||||
|
||||
if "votes" in asset:
|
||||
msg.votes = asset["votes"]
|
||||
if "data" in asset:
|
||||
msg.data = asset["data"]
|
||||
if "signature" in asset:
|
||||
msg.signature = proto.LiskSignatureType()
|
||||
msg.signature.public_key = binascii.unhexlify(asset["signature"]["publicKey"])
|
||||
if "delegate" in asset:
|
||||
msg.delegate = proto.LiskDelegateType()
|
||||
msg.delegate.username = asset["delegate"]["username"]
|
||||
if "multisignature" in asset:
|
||||
msg.multisignature = proto.LiskMultisignatureType()
|
||||
msg.multisignature.min = asset["multisignature"]["min"]
|
||||
msg.multisignature.life_time = asset["multisignature"]["lifetime"]
|
||||
msg.multisignature.keys_group = asset["multisignature"]["keysgroup"]
|
||||
return msg
|
||||
RENAMES = {"lifetime": "life_time", "keysgroup": "keys_group"}
|
||||
|
||||
|
||||
@expect(proto.LiskSignedTx)
|
||||
def sign_tx(client, n, transaction):
|
||||
msg = proto.LiskTransactionCommon()
|
||||
|
||||
msg.type = transaction["type"]
|
||||
msg.fee = int(
|
||||
transaction["fee"]
|
||||
) # Lisk use strings for big numbers (javascript issue)
|
||||
msg.amount = int(transaction["amount"]) # And we convert it back to number
|
||||
msg.timestamp = transaction["timestamp"]
|
||||
|
||||
if "recipientId" in transaction:
|
||||
msg.recipient_id = transaction["recipientId"]
|
||||
if "senderPublicKey" in transaction:
|
||||
msg.sender_public_key = binascii.unhexlify(transaction["senderPublicKey"])
|
||||
if "requesterPublicKey" in transaction:
|
||||
msg.requester_public_key = binascii.unhexlify(transaction["requesterPublicKey"])
|
||||
if "signature" in transaction:
|
||||
msg.signature = binascii.unhexlify(transaction["signature"])
|
||||
|
||||
msg.asset = _asset_to_proto(transaction["asset"])
|
||||
transaction = dict_from_camelcase(transaction, renames=RENAMES)
|
||||
msg = dict_to_proto(proto.LiskTransactionCommon, transaction)
|
||||
return client.call(proto.LiskSignTx(address_n=n, transaction=msg))
|
||||
|
@ -398,3 +398,50 @@ def format_message(
|
||||
size=pb.ByteSize(),
|
||||
content=pformat_value(pb.__dict__, indent),
|
||||
)
|
||||
|
||||
|
||||
def value_to_proto(ftype, value):
|
||||
if issubclass(ftype, MessageType):
|
||||
raise TypeError("value_to_proto only converts simple values")
|
||||
|
||||
if ftype in (UVarintType, SVarintType):
|
||||
return int(value)
|
||||
|
||||
if ftype is BoolType:
|
||||
return bool(value)
|
||||
|
||||
if ftype is UnicodeType:
|
||||
return str(value)
|
||||
|
||||
if ftype is BytesType:
|
||||
if isinstance(value, str):
|
||||
return binascii.unhexlify(value)
|
||||
elif isinstance(value, bytes):
|
||||
return value
|
||||
else:
|
||||
raise TypeError("can't convert {} value to bytes".format(type(value)))
|
||||
|
||||
|
||||
def dict_to_proto(message_type, d):
|
||||
params = {}
|
||||
for fname, ftype, fflags in message_type.FIELDS.values():
|
||||
repeated = fflags & FLAG_REPEATED
|
||||
value = d.get(fname)
|
||||
if value is None:
|
||||
continue
|
||||
|
||||
if not repeated:
|
||||
value = [value]
|
||||
|
||||
if issubclass(ftype, MessageType):
|
||||
function = dict_to_proto
|
||||
else:
|
||||
function = value_to_proto
|
||||
|
||||
newvalue = [function(ftype, v) for v in value]
|
||||
|
||||
if not repeated:
|
||||
newvalue = newvalue[0]
|
||||
|
||||
params[fname] = newvalue
|
||||
return message_type(**params)
|
||||
|
@ -16,7 +16,8 @@
|
||||
|
||||
|
||||
from . import messages
|
||||
from .tools import expect
|
||||
from .tools import expect, dict_from_camelcase
|
||||
from .protobuf import dict_to_proto
|
||||
|
||||
REQUIRED_FIELDS = ("Fee", "Sequence", "TransactionType", "Amount", "Destination")
|
||||
|
||||
@ -40,16 +41,5 @@ def create_sign_tx_msg(transaction) -> messages.RippleSignTx:
|
||||
if transaction["TransactionType"] != "Payment":
|
||||
raise ValueError("Only Payment transaction type is supported")
|
||||
|
||||
return messages.RippleSignTx(
|
||||
fee=transaction.get("Fee"),
|
||||
sequence=transaction.get("Sequence"),
|
||||
flags=transaction.get("Flags"),
|
||||
last_ledger_sequence=transaction.get("LastLedgerSequence"),
|
||||
payment=_create_payment(transaction),
|
||||
)
|
||||
|
||||
|
||||
def _create_payment(transaction) -> messages.RipplePayment:
|
||||
return messages.RipplePayment(
|
||||
amount=transaction.get("Amount"), destination=transaction.get("Destination")
|
||||
)
|
||||
converted = dict_from_camelcase(transaction)
|
||||
return dict_to_proto(messages.RippleSignTx, converted)
|
||||
|
@ -16,6 +16,7 @@
|
||||
|
||||
import functools
|
||||
import hashlib
|
||||
import re
|
||||
import struct
|
||||
import unicodedata
|
||||
from typing import List, NewType
|
||||
@ -215,3 +216,37 @@ def session(f):
|
||||
client.transport.session_end()
|
||||
|
||||
return wrapped_f
|
||||
|
||||
|
||||
# de-camelcasifier
|
||||
# https://stackoverflow.com/a/1176023/222189
|
||||
|
||||
FIRST_CAP_RE = re.compile("(.)([A-Z][a-z]+)")
|
||||
ALL_CAP_RE = re.compile("([a-z0-9])([A-Z])")
|
||||
|
||||
|
||||
def from_camelcase(s):
|
||||
s = FIRST_CAP_RE.sub(r"\1_\2", s)
|
||||
return ALL_CAP_RE.sub(r"\1_\2", s).lower()
|
||||
|
||||
|
||||
def dict_from_camelcase(d, renames=None):
|
||||
if not isinstance(d, dict):
|
||||
return d
|
||||
|
||||
if renames is None:
|
||||
renames = {}
|
||||
|
||||
res = {}
|
||||
for key, value in d.items():
|
||||
newkey = from_camelcase(key)
|
||||
renamed_key = renames.get(newkey) or renames.get(key)
|
||||
if renamed_key:
|
||||
newkey = renamed_key
|
||||
|
||||
if isinstance(value, list):
|
||||
res[newkey] = [dict_from_camelcase(v, renames) for v in value]
|
||||
else:
|
||||
res[newkey] = dict_from_camelcase(value, renames)
|
||||
|
||||
return res
|
||||
|
Loading…
Reference in New Issue
Block a user