1
0
mirror of https://github.com/trezor/trezor-firmware.git synced 2024-11-26 09:28:13 +00:00

xmr: major protocol upgrade, CLSAG support added

- CLSAG signature scheme added
  - type hints added

xmr: optimize protocol, send only required data
  - real_out_additional_tx_keys contains only one element as nothing more is needed during signature
  - only src_entr.outputs[index] is HMACed and always present. Other outputs are present only if needed which reduces comm and CPU overhead.
  - getting rid of subaddresses dictionary (memory requirements), now subaddr indices are present per source entry so keys are computed when needed

xmr: prepare for permutation sending removal, specify index
  - specify source entry ordering index prior sorting by key images as original HMAC keys are generated based on these.
  - permutation checked just by valid HMACs, size of the set, key image sort order
  - sending permutation is now deprecated, will be removed in the following protocol versions
  - more strict state transition checks, guard strict check with respect to steps ordering
This commit is contained in:
Dusan Klinec 2020-04-09 17:16:48 +02:00 committed by Pavol Rusnak
parent 2ad4554d69
commit 6b8fc9c894
No known key found for this signature in database
GPG Key ID: 91F3B339B9A02A3D
32 changed files with 1388 additions and 736 deletions

View File

@ -77,14 +77,14 @@ using the data. Cold wallet creates `signed_txset`
### Cold wallet protocols
As cold wallet support is already present in Monero codebase, the protocols were well designed and analyzed.
We decided to reuse the cold wallet approach when signing the transaction as the trezor pretty much behaves as the cold wallet,
i.e., does not have access to the blockchain or full Monero node. The whole transaction is built in the trezor thus
We decided to reuse the cold wallet approach when signing the transaction as the Trezor pretty much behaves as the cold wallet,
i.e., does not have access to the blockchain or full Monero node. The whole transaction is built in the Trezor thus
the integration has security properties of the cold wallet (which is belevied to be secure). This integration approach
makes security analysis easier and enables to use existing codebase and protocols. This makes merging trezor support to
makes security analysis easier and enables to use existing codebase and protocols. This makes merging Trezor support to
the Monero codebase easier.
We believe that by choosing a bit more high-level approach in the protocol design we could easily add more advanced features,
trezor implements cold wallet protocols in this integration scheme.
Trezor implements cold wallet protocols in this integration scheme.
## Description
@ -113,12 +113,12 @@ Serialization is synchronous.
Transaction signing and Key Image (KI) sync are multi-step stateful protocols.
The protocol have several roundtrips.
In the signing protocol the connected host mainly serves as a dumb storage providing values to the trezor when needed,
mainly due to memory constrains on trezor. The offloaded data can be in plaintext. In this case data is HMACed with unique HMAC
In the signing protocol the connected host mainly serves as a dumb storage providing values to the Trezor when needed,
mainly due to memory constrains on Trezor. The offloaded data can be in plaintext. In this case data is HMACed with unique HMAC
key to avoid data tampering, reordering, replay, reuse, etc... Some data are offloaded as protected, encrypted and authenticated
with Chacha20Poly1305 with unique key (derived from the protocol step, message, purpose, counter, master secret).
trezor builds the signed Monero transaction incrementally, i.e., one UTXO per round trip, one transaction output per roundtrip.
Trezor builds the signed Monero transaction incrementally, i.e., one UTXO per round trip, one transaction output per roundtrip.
### Protocol workflow
@ -144,7 +144,7 @@ In the KI sync cold wallet protocol KIs are generated by the cold wallet. For ea
generated by the cold wallet (KI proof).
KI sync is mainly needed to recover from some problem or when using a new hot-wallet (corruption of a wallet file or
using trezor on a different host).
using Trezor on a different host).
The KI protocol has 3 steps.
@ -180,69 +180,74 @@ For detailed description and rationale please refer to the [monero-doc].
range proof details (type of the range proof, batching scheme).
After receiving this message:
- The trezor prompts user for verification of the destination addresses and amounts.
- Trezor prompts user for verification of the destination addresses and amounts.
- Commitments are computed thus later potential deviations from transaction destinations are detected and signing aborts.
- Secrets for HMACs / encryption are computed, TX key is computed.
- Precomputes required sub-addresses (init message indicates which sub-addresses are needed).
- Deprecated: Precomputes required sub-addresses (init message indicates which sub-addresses are needed).
### `MoneroTransactionSetInputRequest`
- Sends one UTXO to the trezor for processing, encoded as `MoneroTransactionSourceEntry`.
- Sends one UTXO to the Trezor for processing, encoded as `MoneroTransactionSourceEntry`.
- Contains construction data needed for signing the transaction, computing spending key for UTXO.
trezor computes spending keys, `TxinToKey`, `pseudo_out`, HMACs for offloaded data
Trezor computes spending keys, `TxinToKey`, `pseudo_out`, HMACs for offloaded data
### `MoneroTransactionInputsPermutationRequest`
### `MoneroTransactionInputsPermutationRequest` (Deprecated)
UTXOs have to be sorted by the key image in the valid blockchain transaction.
This message caries permutation on the key images so they are sorted in the desired way.
In Client version 3+ sending the permutation is deprecated. Original sort index is sent from the host
when needed (to verify HMACs built on the original ordering). Moreover, permutation correctness is checked by
the set size, HMAC validity and strict ordering on the key images.
### `MoneroTransactionInputViniRequest`
- Step needed to correctly hash all transaction inputs, in the right order (permutation computed in the previous step).
- Contains `MoneroTransactionSourceEntry` and `TxinToKey` computed in the previous step.
- trezor Computes `tx_prefix_hash` is part of the signed data.
- Trezor Computes `tx_prefix_hash` is part of the signed data.
### `MoneroTransactionAllInputsSetRequest`
- Sent after all inputs have been processed.
- Used in the range proof offloading to the host. E.g., in case of batched Bulletproofs with more than 2 transaction outputs.
The message response contains trezor-generated commitment masks so host can compute range proof correctly.
### `MoneroTransactionSetOutputRequest`
- Sends transaction output, `MoneroTransactionDestinationEntry`, one per message.
- HMAC prevents tampering with previously accepted data (in the init step).
- trezor computes data related to transaction output, e.g., range proofs, ECDH info for the receiver, output public key.
- Trezor computes data related to transaction output, e.g., range proofs, ECDH info for the receiver, output public key.
- In case offloaded range proof is used the request can carry computed range proof.
### `MoneroTransactionAllOutSetRequest`
Sent after all transaction outputs have been sent to the trezor for processing.
Sent after all transaction outputs have been sent to the Trezor for processing.
Request is empty, the response contains computed `extra` field (may contain additional public keys if sub-addresses are used),
computed `tx_prefix_hash` and basis for the final transaction signature `MoneroRingCtSig` (fee, transaction type).
### `MoneroTransactionMlsagDoneRequest`
Message sent to ask trezor to compute pre-MLSAG hash required for the signature.
Hash is computed incrementally by trezor since the init message and can be finalized in this step.
Message sent to ask Trezor to compute pre-MLSAG hash required for the signature.
Hash is computed incrementally by Trezor since the init message and can be finalized in this step.
Request is empty, response contains message hash, required for the signature.
### `MoneroTransactionSignInputRequest`
- Caries `MoneroTransactionSourceEntry`, similarly as previous messages `MoneroTransactionSetInputRequest`, `MoneroTransactionInputViniRequest`.
- Caries computed transaction inputs, pseudo outputs, HMACs, encrypted spending keys and alpha masks
- trezor generates MLSAG for this UTXO, returns the signature.
- Code returns also `cout` value if the multisig mode is active - not fully implemented, will be needed later when implementing multisigs.
- Trezor generates MLSAG for this UTXO, returns the signature.
- As output masks are deterministic, the pseudo output balancing is performed in this step (sum of input masks equal to the sum of output masks).
- Multisig is not supported.
### `MoneroTransactionFinalRequest`
- Sent when all UTXOs have been signed properly
- Finalizes transaction signature
- Returns encrypted transaction private keys which are needed later, e.g. for TX proof. As trezor cannot store aux data
for all signed transactions its offloaded encrypted to the wallet. Later when TX proof is implemented in the trezor it
- Returns encrypted transaction private keys which are needed later, e.g. for TX proof. As Trezor cannot store aux data
for all signed transactions its offloaded encrypted to the wallet. Later when TX proof is implemented in the Trezor it
will load encrypted TX keys, decrypt it and generate the proof.
- Since Client v3+ the final response contains opening encryption key to decrypt signatures generated in the previous step.
## Implementation notes
@ -280,29 +285,6 @@ normed to avoid complications when chaining operations such as `scalarmult`s.
### Range signatures
Borromean range signatures were optimized and ported to [trezor-crypto].
Range signatures xmr_gen_range_sig are CPU intensive and memory intensive operations which were originally implemented
in python (trezor-core) but it was not feasible to run on the Trezor device due to a small amount of RAM and long
computation times. It was needed to optimize the algorithm and port it to C so it is feasible to run it on the real hardware and run it fast.
Range signature is a well-contained problem with no allocations needed, simple API.
For memory and timing reasons its implemented directly in trezor-crypto (as it brings real benefit to the user).
On the other hand, MLASG and other ring signatures are built from building blocks in python for easier development,
code readability, maintenance and debugging. Porting to C is not that straightforward and I don't see any benefit here.
The memory and CPU is not the problem as in the case of range signatures so I think it is fine to have it in Python.
Porting to C would also increase complexity of trezor-crypto and could lead to bugs.
Using small and easily auditable & testable building blocks, such as ge25519_add (fast, in C) to build more complex
schemes in high level language is, in my opinion, a scalable and secure way to build the system.
Porting all Monero crypto schemes to C would be very time consuming and prone to errors.
Having access to low-level features also speeds up development of new features, such as multisigs.
MLSAG may need to be slightly changed when implementing multisigs
(some preparations have been made already but we will see after this phase starts).
Bulletproof generation and verification is implemented, however the device can handle maximum 2 batched outputs
in the bulletproof due to high memory requirements (more on that in [monero-doc]). If number of outputs is larger
than 2 the offloading to host is required. In such case, the bulletproofs are first computed at the host and sent to

View File

@ -9,7 +9,16 @@ from trezor.utils import chunks
from apps.common.confirm import require_confirm, require_hold_to_confirm
from apps.monero.layout import common
DUMMY_PAYMENT_ID = b"\x00" * 8
DUMMY_PAYMENT_ID = b"\x00\x00\x00\x00\x00\x00\x00\x00"
if False:
from typing import Optional
from apps.monero.signing.state import State
from trezor.messages.MoneroTransactionData import MoneroTransactionData
from trezor.messages.MoneroTransactionDestinationEntry import (
MoneroTransactionDestinationEntry,
)
async def require_confirm_watchkey(ctx):
@ -43,7 +52,9 @@ async def require_confirm_tx_key(ctx, export_key=False):
await require_confirm(ctx, content, ButtonRequestType.SignTx)
async def require_confirm_transaction(ctx, state, tsx_data, network_type):
async def require_confirm_transaction(
ctx, state: State, tsx_data: MoneroTransactionData, network_type: int
):
"""
Ask for confirmation from user.
"""
@ -77,7 +88,9 @@ async def require_confirm_transaction(ctx, state, tsx_data, network_type):
await transaction_step(state, 0)
async def _require_confirm_output(ctx, dst, network_type, payment_id):
async def _require_confirm_output(
ctx, dst: MoneroTransactionDestinationEntry, network_type: int, payment_id: bytes
):
"""
Single transaction destination confirmation
"""
@ -103,10 +116,10 @@ async def _require_confirm_output(ctx, dst, network_type, payment_id):
raise wire.ActionCancelled("Cancelled")
async def _require_confirm_payment_id(ctx, payment_id):
async def _require_confirm_payment_id(ctx, payment_id: bytes):
if not await common.naive_pagination(
ctx,
[ui.MONO] + list(chunks(hexlify((payment_id)), 16)),
[ui.MONO] + list(chunks(hexlify(payment_id), 16)),
"Payment ID",
ui.ICON_SEND,
ui.GREEN,
@ -171,22 +184,22 @@ class LiveRefreshStep(ui.Component):
)
async def transaction_step(state, step, sub_step=None):
async def transaction_step(state: State, step: int, sub_step: Optional[int] = None):
if step == 0:
info = ["Signing..."]
elif step == 100:
elif step == state.STEP_INP:
info = ["Processing inputs", "%d/%d" % (sub_step + 1, state.input_count)]
elif step == 200:
elif step == state.STEP_PERM:
info = ["Sorting..."]
elif step == 300:
elif step == state.STEP_VINI:
info = ["Hashing inputs", "%d/%d" % (sub_step + 1, state.input_count)]
elif step == 350:
elif step == state.STEP_ALL_IN:
info = ["Processing..."]
elif step == 400:
elif step == state.STEP_OUT:
info = ["Processing outputs", "%d/%d" % (sub_step + 1, state.output_count)]
elif step == 500:
elif step == state.STEP_ALL_OUT:
info = ["Postprocessing..."]
elif step == 600:
elif step == state.STEP_SIGN:
info = ["Signing inputs", "%d/%d" % (sub_step + 1, state.input_count)]
else:
info = ["Processing..."]

View File

@ -51,6 +51,7 @@ async def sign_tx_dispatch(state, msg, keychain):
(
MessageType.MoneroTransactionSetInputRequest,
MessageType.MoneroTransactionInputsPermutationRequest,
MessageType.MoneroTransactionInputViniRequest,
),
)
@ -67,7 +68,7 @@ async def sign_tx_dispatch(state, msg, keychain):
return (
await step_04_input_vini.input_vini(
state, msg.src_entr, msg.vini, msg.vini_hmac
state, msg.src_entr, msg.vini, msg.vini_hmac, msg.orig_idx
),
(
MessageType.MoneroTransactionInputViniRequest,
@ -121,6 +122,7 @@ async def sign_tx_dispatch(state, msg, keychain):
msg.pseudo_out_hmac,
msg.pseudo_out_alpha,
msg.spend_key,
msg.orig_idx,
),
(
MessageType.MoneroTransactionSignInputRequest,

View File

@ -15,42 +15,8 @@ class NotEnoughOutputsError(wire.DataError):
class RctType:
"""
There are two types of monero Ring Confidential Transactions:
1. RCTTypeFull = 1 (used if num_inputs == 1)
2. RCTTypeSimple = 2 (for num_inputs > 1)
There is actually also RCTTypeNull but we ignore that one.
There are several types of monero Ring Confidential Transactions
like RCTTypeFull and RCTTypeSimple but currently we use only Bulletproof2
"""
Full = 1
Simple = 2
class RsigType:
"""
Range signature types
There are four types of range proofs/signatures in official Monero:
1. RangeProofBorromean = 0
2. RangeProofBulletproof = 1
3. RangeProofMultiOutputBulletproof = 2
4. RangeProofPaddedBulletproof = 3
We simplify all the bulletproofs into one.
"""
Borromean = 0
Bulletproof = 1
def get_monero_rct_type(bp_version=1):
"""
Returns transaction RctType according to the BP version.
Only HP9+ is supported, thus only Simple variant is concerned.
"""
if bp_version == 1:
return 3 # TxRctType.Bulletproof
elif bp_version == 2:
return 4 # TxRctType.Bulletproof2
else:
raise ValueError("Unsupported BP version")
Bulletproof2 = 4

View File

@ -1,12 +1,23 @@
from micropython import const
from trezor import utils
from apps.monero.xmr import crypto
if False:
from apps.monero.xmr.types import Sc25519
from trezor.messages.MoneroTransactionSourceEntry import (
MoneroTransactionSourceEntry,
)
from trezor.messages.MoneroTransactionDestinationEntry import (
MoneroTransactionDestinationEntry,
)
BUILD_KEY_BUFFER = bytearray(32 + 12 + 4) # key + disc + index
_SECRET_LENGTH = const(32)
_DISCRIMINATOR_LENGTH = const(12)
_INDEX_LENGTH = const(4)
_BUILD_KEY_BUFFER = bytearray(_SECRET_LENGTH + _DISCRIMINATOR_LENGTH + _INDEX_LENGTH)
def _build_key(
@ -15,19 +26,19 @@ def _build_key(
"""
Creates an unique-purpose key
"""
key_buff = BUILD_KEY_BUFFER # bytearray(32 + 12 + 4) # key + disc + index
utils.ensure(len(secret) == 32, "Invalid key length")
utils.ensure(len(discriminator) <= 12, "Disc too long")
offset = 32
utils.memcpy(key_buff, 0, secret, 0, 32)
key_buff = _BUILD_KEY_BUFFER
utils.ensure(len(secret) == _SECRET_LENGTH, "Invalid key length")
utils.ensure(len(discriminator) <= _DISCRIMINATOR_LENGTH, "Disc too long")
for i in range(32, len(key_buff)):
offset = _SECRET_LENGTH
utils.memcpy(key_buff, 0, secret, 0, _SECRET_LENGTH)
for i in range(_SECRET_LENGTH, len(key_buff)):
key_buff[i] = 0
if discriminator is not None:
utils.memcpy(key_buff, offset, discriminator, 0, len(discriminator))
offset += len(discriminator)
utils.memcpy(key_buff, offset, discriminator, 0, len(discriminator))
offset += _DISCRIMINATOR_LENGTH # fixed domain separator size
if index is not None:
# dump_uvarint_b_into, saving import
@ -97,6 +108,13 @@ def enc_key_cout(key_enc, idx: int = None) -> bytes:
return _build_key(key_enc, b"cout", idx)
def key_signature(master, idx: int, is_iv=False) -> bytes:
"""
Generates signature offloading related offloading keys
"""
return _build_key(master, b"sig-iv" if is_iv else b"sig-key", idx)
def det_comm_masks(key_enc, idx: int) -> Sc25519:
"""
Deterministic output commitment masks
@ -104,15 +122,33 @@ def det_comm_masks(key_enc, idx: int) -> Sc25519:
return crypto.decodeint(_build_key(key_enc, b"out-mask", idx))
async def gen_hmac_vini(key, src_entr, vini_bin, idx: int) -> bytes:
async def gen_hmac_vini(
key, src_entr: MoneroTransactionSourceEntry, vini_bin: bytes, idx: int
) -> bytes:
"""
Computes hmac (TxSourceEntry[i] || tx.vin[i])
In src_entr.outputs only src_entr.outputs[src_entr.real_output]
is HMACed as it is used across the protocol. Consistency of
other values across the protocol is not required as they are
used only once and hard to check. I.e., indices in step 2
are uncheckable, decoy keys in step 9 are just random keys.
"""
import protobuf
from apps.monero.xmr.keccak_hasher import get_keccak_writer
kwriter = get_keccak_writer()
real_outputs = src_entr.outputs
real_additional = src_entr.real_out_additional_tx_keys
src_entr.outputs = [src_entr.outputs[src_entr.real_output]]
if real_additional and len(real_additional) > 1:
src_entr.real_out_additional_tx_keys = [
src_entr.real_out_additional_tx_keys[src_entr.real_output_in_tx_index]
]
await protobuf.dump_message(kwriter, src_entr)
src_entr.outputs = real_outputs
src_entr.real_out_additional_tx_keys = real_additional
kwriter.write(vini_bin)
hmac_key_vini = hmac_key_txin(key, idx)
@ -120,7 +156,9 @@ async def gen_hmac_vini(key, src_entr, vini_bin, idx: int) -> bytes:
return hmac_vini
async def gen_hmac_vouti(key, dst_entr, tx_out_bin, idx: int) -> bytes:
async def gen_hmac_vouti(
key, dst_entr: MoneroTransactionDestinationEntry, tx_out_bin: bytes, idx: int
) -> bytes:
"""
Generates HMAC for (TxDestinationEntry[i] || tx.vout[i])
"""
@ -136,7 +174,9 @@ async def gen_hmac_vouti(key, dst_entr, tx_out_bin, idx: int) -> bytes:
return hmac_vouti
async def gen_hmac_tsxdest(key, dst_entr, idx: int) -> bytes:
async def gen_hmac_tsxdest(
key, dst_entr: MoneroTransactionDestinationEntry, idx: int
) -> bytes:
"""
Generates HMAC for TxDestinationEntry[i]
"""
@ -149,3 +189,11 @@ async def gen_hmac_tsxdest(key, dst_entr, idx: int) -> bytes:
hmac_key = hmac_key_txdst(key, idx)
hmac_tsxdest = crypto.compute_hmac(hmac_key, kwriter.get_digest())
return hmac_tsxdest
def get_ki_from_vini(vini_bin: bytes) -> bytes:
"""
Returns key image from the TxinToKey, which is currently
serialized as the last 32 bytes.
"""
return bytes(vini_bin[-32:])

View File

@ -5,17 +5,17 @@ from trezor import log
from apps.monero.xmr import crypto
if False:
from typing import Dict, List, Optional, Tuple
from apps.monero.xmr.types import Ge25519, Sc25519
from apps.monero.xmr.credentials import AccountCreds
class TprefixStub:
__slots__ = ("version", "unlock_time", "vin", "vout", "extra")
def __init__(self, **kwargs):
for kw in kwargs:
setattr(self, kw, kwargs[kw])
Subaddresses = Dict[bytes, Tuple[int, int]]
class State:
STEP_INIT = const(0)
STEP_INP = const(100)
STEP_PERM = const(200)
STEP_VINI = const(300)
@ -37,11 +37,11 @@ class State:
- spend private/public key
- and its corresponding address
"""
self.creds = None
self.creds = None # type: Optional[AccountCreds]
# HMAC/encryption keys used to protect offloaded data
self.key_hmac = None
self.key_enc = None
self.key_hmac = None # type: Optional[bytes]
self.key_enc = None # type: Optional[bytes]
"""
Transaction keys
@ -51,8 +51,8 @@ class State:
- for subaddresses the `r` is commonly denoted as `s`, however it is still just a random number
- the keys are used to derive the one time address and its keys (P = H(A*r)*G + B)
"""
self.tx_priv = None
self.tx_pub = None
self.tx_priv = None # type: Sc25519
self.tx_pub = None # type: Ge25519
"""
In some cases when subaddresses are used we need more tx_keys
@ -62,9 +62,7 @@ class State:
# Connected client version
self.client_version = 0
# Bulletproof version. Pre for <=HF9 is 1, for >HP10 is 2
self.bp_version = 1
self.hard_fork = 12
self.input_count = 0
self.output_count = 0
@ -78,8 +76,8 @@ class State:
self.account_idx = 0
# contains additional tx keys if need_additional_tx_keys is True
self.additional_tx_private_keys = []
self.additional_tx_public_keys = []
self.additional_tx_private_keys = [] # type: List[Sc25519]
self.additional_tx_public_keys = [] # type: List[bytes]
# currently processed input/output index
self.current_input_index = -1
@ -93,34 +91,39 @@ class State:
self.summary_outs_money = 0
# output commitments
self.output_pk_commitments = []
self.output_pk_commitments = [] # type: List[bytes]
self.output_amounts = []
self.output_amounts = [] # type: List[int]
# output *range proof* masks. HP10+ makes them deterministic.
self.output_masks = []
# last output mask for client_version=0
self.output_last_mask = None
self.output_masks = [] # type: List[Sc25519]
# the range proofs are calculated in batches, this denotes the grouping
self.rsig_grouping = []
self.rsig_grouping = [] # type: List[int]
# is range proof computing offloaded or not
self.rsig_offload = False
# sum of all inputs' pseudo out masks
self.sumpouts_alphas = crypto.sc_0()
self.sumpouts_alphas = crypto.sc_0() # type: Sc25519
# sum of all output' pseudo out masks
self.sumout = crypto.sc_0()
self.sumout = crypto.sc_0() # type: Sc25519
self.subaddresses = {}
self.subaddresses = {} # type: Subaddresses
# simple stub containing items hashed into tx prefix
self.tx = TprefixStub(vin=[], vout=[], extra=b"")
# TX_EXTRA_NONCE extra field for tx.extra, due to sort_tx_extra()
self.extra_nonce = None
# contains an array where each item denotes the input's position
# (inputs are sorted by key images)
self.source_permutation = []
self.source_permutation = [] # type: List[int]
# Last key image seen. Used for input permutation correctness check
self.last_ki = None # type: Optional[bytes]
# Encryption key to release to host after protocol ends without error
self.opening_key = None # type: Optional[bytes]
# Step transition automaton
self.last_step = self.STEP_INIT
"""
Tx prefix hasher/hash. We use the hasher to incrementally hash and then
@ -128,7 +131,7 @@ class State:
See Monero-Trezor documentation section 3.3 for more details.
"""
self.tx_prefix_hasher = KeccakXmrArchive()
self.tx_prefix_hash = None
self.tx_prefix_hash = None # type: Optional[bytes]
"""
Full message hasher/hash that is to be signed using MLSAG.
@ -136,7 +139,7 @@ class State:
See Monero-Trezor documentation section 3.3 for more details.
"""
self.full_message_hasher = PreMlsagHasher()
self.full_message = None
self.full_message = None # type: Optional[bytes]
def mem_trace(self, x=None, collect=False):
if __debug__:
@ -152,9 +155,3 @@ class State:
def change_address(self):
return self.output_change.addr if self.output_change else None
def is_bulletproof_v2(self):
return self.bp_version >= 2
def is_det_mask(self):
return self.bp_version >= 2 or self.client_version > 0

View File

@ -10,8 +10,15 @@ from apps.monero.signing.state import State
from apps.monero.xmr import crypto, monero
if False:
from typing import List
from apps.monero.xmr.types import Sc25519, Ge25519
from trezor.messages.MoneroTransactionData import MoneroTransactionData
from trezor.messages.MoneroTransactionRsigData import MoneroTransactionRsigData
from trezor.messages.MoneroAccountPublicAddress import MoneroAccountPublicAddress
from trezor.messages.MoneroTransactionDestinationEntry import (
MoneroTransactionDestinationEntry,
)
from trezor.messages.MoneroTransactionInitAck import MoneroTransactionInitAck
async def init_transaction(
@ -20,7 +27,7 @@ async def init_transaction(
network_type: int,
tsx_data: MoneroTransactionData,
keychain,
):
) -> MoneroTransactionInitAck:
from apps.monero.signing import offloading_keys
from apps.common import paths
@ -30,10 +37,12 @@ async def init_transaction(
state.creds = misc.get_creds(keychain, address_n, network_type)
state.client_version = tsx_data.client_version or 0
if state.client_version == 0:
raise ValueError("Client version not supported")
state.fee = state.fee if state.fee > 0 else 0
state.tx_priv = crypto.random_scalar()
state.tx_pub = crypto.scalarmult_base(state.tx_priv)
state.mem_trace(1)
state.input_count = tsx_data.num_inputs
@ -45,6 +54,8 @@ async def init_transaction(
await confirms.require_confirm_transaction(
state.ctx, state, tsx_data, state.creds.network_type
)
state.creds.address = None
state.creds.network_type = None
gc.collect()
state.mem_trace(3)
@ -53,6 +64,9 @@ async def init_transaction(
state.mixin = tsx_data.mixin
state.fee = tsx_data.fee
state.account_idx = tsx_data.account
state.last_step = state.STEP_INIT
if tsx_data.hard_fork:
state.hard_fork = tsx_data.hard_fork
# Ensure change is correct
_check_change(state, tsx_data.outputs)
@ -66,23 +80,19 @@ async def init_transaction(
_check_subaddresses(state, tsx_data.outputs)
# Extra processing, payment id
state.tx.version = 2 # current Monero transaction format (RingCT = 2)
state.tx.unlock_time = tsx_data.unlock_time
_process_payment_id(state, tsx_data)
await _compute_sec_keys(state, tsx_data)
gc.collect()
# Iterative tx_prefix_hash hash computation
state.tx_prefix_hasher.uvarint(state.tx.version)
state.tx_prefix_hasher.uvarint(state.tx.unlock_time)
state.tx_prefix_hasher.uvarint(2) # current Monero transaction format (RingCT = 2)
state.tx_prefix_hasher.uvarint(tsx_data.unlock_time)
state.tx_prefix_hasher.uvarint(state.input_count) # ContainerType, size
state.mem_trace(10, True)
# Final message hasher
state.full_message_hasher.init()
state.full_message_hasher.set_type_fee(
signing.get_monero_rct_type(state.bp_version), state.fee
)
state.full_message_hasher.set_type_fee(signing.RctType.Bulletproof2, state.fee)
# Sub address precomputation
if tsx_data.account is not None and tsx_data.minor_indices:
@ -110,7 +120,7 @@ async def init_transaction(
return MoneroTransactionInitAck(hmacs=hmacs, rsig_data=rsig_data)
def _check_subaddresses(state: State, outputs: list):
def _check_subaddresses(state: State, outputs: List[MoneroTransactionDestinationEntry]):
"""
Using subaddresses leads to a few poorly documented exceptions.
@ -155,7 +165,7 @@ def _check_subaddresses(state: State, outputs: list):
state.mem_trace(4, True)
def _get_primary_change_address(state: State):
def _get_primary_change_address(state: State) -> MoneroAccountPublicAddress:
"""
Computes primary change address for the current account index
"""
@ -189,12 +199,7 @@ def _check_rsig_data(state: State, rsig_data: MoneroTransactionRsigData):
if rsig_data.rsig_type == 0:
raise ValueError("Borromean range sig not supported")
elif rsig_data.rsig_type in (1, 2, 3):
state.bp_version = rsig_data.bp_version or 1
if state.bp_version not in (1, 2):
raise ValueError("Unknown BP version")
else:
elif rsig_data.rsig_type not in (1, 2, 3):
raise ValueError("Unknown rsig type")
if state.output_count > 2:
@ -214,7 +219,7 @@ def _check_grouping(state: State):
raise ValueError("Invalid grouping")
def _check_change(state: State, outputs: list):
def _check_change(state: State, outputs: List[MoneroTransactionDestinationEntry]):
"""
Check if the change address in state.output_change (from `tsx_data.outputs`) is
a) among tx outputs
@ -281,7 +286,7 @@ async def _compute_sec_keys(state: State, tsx_data: MoneroTransactionData):
state.key_enc = crypto.keccak_2hash(b"enc" + master_key)
def _precompute_subaddr(state: State, account: int, indices):
def _precompute_subaddr(state: State, account: int, indices: List[int]):
"""
Precomputes subaddresses for account (major) and list of indices (minors)
Subaddresses have to be stored in encoded form - unique representation.
@ -350,7 +355,7 @@ def _get_key_for_payment_id_encryption(
tsx_data: MoneroTransactionData,
change_addr=None,
add_dummy_payment_id: bool = False,
):
) -> bytes:
"""
Returns destination address public view key to be used for
payment id encryption. If no encrypted payment ID is chosen,
@ -390,7 +395,9 @@ def _get_key_for_payment_id_encryption(
return addr.view_public_key
def _encrypt_payment_id(payment_id, public_key, secret_key):
def _encrypt_payment_id(
payment_id: bytes, public_key: Ge25519, secret_key: Sc25519
) -> bytes:
"""
Encrypts payment_id hex.
Used in the transaction extra. Only recipient is able to decrypt.

View File

@ -17,12 +17,19 @@ from apps.monero.layout import confirms
from apps.monero.xmr import crypto, monero, serialize
if False:
from typing import List, Tuple, Optional
from apps.monero.xmr.types import Sc25519, Ge25519
from trezor.messages.MoneroTransactionSourceEntry import (
MoneroTransactionSourceEntry,
)
from trezor.messages.MoneroTransactionSetInputAck import (
MoneroTransactionSetInputAck,
)
async def set_input(state: State, src_entr: MoneroTransactionSourceEntry):
async def set_input(
state: State, src_entr: MoneroTransactionSourceEntry
) -> MoneroTransactionSetInputAck:
from trezor.messages.MoneroTransactionSetInputAck import (
MoneroTransactionSetInputAck,
)
@ -34,6 +41,8 @@ async def set_input(state: State, src_entr: MoneroTransactionSourceEntry):
await confirms.transaction_step(state, state.STEP_INP, state.current_input_index)
if state.last_step > state.STEP_INP:
raise ValueError("Invalid state transition")
if state.current_input_index >= state.input_count:
raise ValueError("Too many inputs")
# real_output denotes which output in outputs is the real one (ours)
@ -49,9 +58,7 @@ async def set_input(state: State, src_entr: MoneroTransactionSourceEntry):
out_key = crypto.decodepoint(src_entr.outputs[src_entr.real_output].key.dest)
# the tx_pub of our UTXO stored inside its transaction
tx_key = crypto.decodepoint(src_entr.real_out_tx_key)
additional_keys = [
crypto.decodepoint(x) for x in src_entr.real_out_additional_tx_keys
]
additional_tx_pub_key = _get_additional_public_key(src_entr)
"""
Calculates `derivation = Ra`, private spend key `x = H(Ra||i) + b` to be able
@ -62,8 +69,10 @@ async def set_input(state: State, src_entr: MoneroTransactionSourceEntry):
state.subaddresses,
out_key,
tx_key,
additional_keys,
additional_tx_pub_key,
src_entr.real_output_in_tx_index,
state.account_idx,
src_entr.subaddr_minor,
)
state.mem_trace(1, True)
@ -111,6 +120,7 @@ async def set_input(state: State, src_entr: MoneroTransactionSourceEntry):
crypto.encodeint(xi),
)
state.last_step = state.STEP_INP
if state.current_input_index + 1 == state.input_count:
"""
When we finish the inputs processing, we no longer need
@ -129,7 +139,7 @@ async def set_input(state: State, src_entr: MoneroTransactionSourceEntry):
)
def _gen_commitment(state: State, in_amount):
def _gen_commitment(state: State, in_amount: int) -> Tuple[Sc25519, Ge25519]:
"""
Computes Pedersen commitment - pseudo outs
Here is slight deviation from the original protocol.
@ -145,7 +155,7 @@ def _gen_commitment(state: State, in_amount):
return alpha, crypto.gen_commitment(alpha, in_amount)
def _absolute_output_offsets_to_relative(off):
def _absolute_output_offsets_to_relative(off: List[int]) -> List[int]:
"""
Mixin outputs are specified in relative numbers. First index is absolute
and the rest is an offset of a previous one.
@ -159,3 +169,22 @@ def _absolute_output_offsets_to_relative(off):
for i in range(len(off) - 1, 0, -1):
off[i] -= off[i - 1]
return off
def _get_additional_public_key(
src_entr: MoneroTransactionSourceEntry,
) -> Optional[Ge25519]:
additional_tx_pub_key = None
if len(src_entr.real_out_additional_tx_keys) == 1: # compression
additional_tx_pub_key = crypto.decodepoint(
src_entr.real_out_additional_tx_keys[0]
)
elif src_entr.real_out_additional_tx_keys:
if src_entr.real_output_in_tx_index >= len(
src_entr.real_out_additional_tx_keys
):
raise ValueError("Wrong number of additional derivations")
additional_tx_pub_key = crypto.decodepoint(
src_entr.real_out_additional_tx_keys[src_entr.real_output_in_tx_index]
)
return additional_tx_pub_key

View File

@ -9,14 +9,27 @@ input's position in the transaction.
We do not do the actual sorting here (we do not store the complete input
data anyway, so we can't) we just save the array to the state and use
it later when needed.
New protocol version (CL3) does not store the permutation. The permutation
correctness is checked by checking the number of elements,
HMAC correctness (host sends original sort idx) and ordering check
on the key images. This step is skipped.
"""
from .state import State
from apps.monero.layout.confirms import transaction_step
if False:
from typing import List
from trezor.messages.MoneroTransactionInputsPermutationAck import (
MoneroTransactionInputsPermutationAck,
)
async def tsx_inputs_permutation(state: State, permutation: list):
async def tsx_inputs_permutation(
state: State, permutation: List[int]
) -> MoneroTransactionInputsPermutationAck:
from trezor.messages.MoneroTransactionInputsPermutationAck import (
MoneroTransactionInputsPermutationAck,
)
@ -26,17 +39,22 @@ async def tsx_inputs_permutation(state: State, permutation: list):
"""
Set permutation on the inputs - sorted by key image on host.
"""
if state.last_step != state.STEP_INP:
raise ValueError("Invalid state transition")
if len(permutation) != state.input_count:
raise ValueError("Invalid permutation size")
if state.current_input_index != state.input_count - 1:
raise ValueError("Invalid input count")
_check_permutation(permutation)
state.source_permutation = permutation
state.current_input_index = -1
state.last_step = state.STEP_PERM
return MoneroTransactionInputsPermutationAck()
def _check_permutation(permutation):
def _check_permutation(permutation: List[int]):
for n in range(len(permutation)):
if n not in permutation:
raise ValueError("Invalid permutation")

View File

@ -1,13 +1,6 @@
"""
This step serves for an incremental hashing of tx.vin[i] to the tx_prefix_hasher
after the sorting on tx.vin[i].ki. The sorting order was received in the previous step.
Originally, this step also incrementaly hashed pseudo_output[i] to the full_message_hasher for
RctSimple transactions with Borromean proofs (HF8).
In later hard-forks, the pseudo_outputs were moved to the rctsig.prunable
which is not hashed to the final signature, thus pseudo_output hashing has been removed
(as we support only HF9 and HF10 now).
"""
from .state import State
@ -20,6 +13,9 @@ if False:
from trezor.messages.MoneroTransactionSourceEntry import (
MoneroTransactionSourceEntry,
)
from trezor.messages.MoneroTransactionInputViniAck import (
MoneroTransactionInputViniAck,
)
async def input_vini(
@ -27,7 +23,8 @@ async def input_vini(
src_entr: MoneroTransactionSourceEntry,
vini_bin: bytes,
vini_hmac: bytes,
):
orig_idx: int,
) -> MoneroTransactionInputViniAck:
from trezor.messages.MoneroTransactionInputViniAck import (
MoneroTransactionInputViniAck,
)
@ -35,9 +32,15 @@ async def input_vini(
await confirms.transaction_step(
state, state.STEP_VINI, state.current_input_index + 1
)
if state.last_step not in (state.STEP_INP, state.STEP_PERM, state.STEP_VINI):
raise ValueError("Invalid state transition")
if state.current_input_index >= state.input_count:
raise ValueError("Too many inputs")
if state.client_version >= 2 and state.last_step < state.STEP_VINI:
state.current_input_index = -1
state.last_ki = None
state.current_input_index += 1
# HMAC(T_in,i || vin_i)
@ -45,13 +48,22 @@ async def input_vini(
state.key_hmac,
src_entr,
vini_bin,
state.source_permutation[state.current_input_index],
state.source_permutation[state.current_input_index]
if state.client_version <= 1
else orig_idx,
)
if not crypto.ct_equals(hmac_vini_comp, vini_hmac):
raise ValueError("HMAC is not correct")
# Key image sorting check - permutation correctness
cur_ki = offloading_keys.get_ki_from_vini(vini_bin)
if state.current_input_index > 0 and state.last_ki <= cur_ki:
raise ValueError("Key image order invalid")
"""
Incremental hasing of tx.vin[i]
"""
state.tx_prefix_hasher.buffer(vini_bin)
state.last_step = state.STEP_VINI
state.last_ki = cur_ki if state.current_input_index < state.input_count else None
return MoneroTransactionInputViniAck()

View File

@ -8,8 +8,13 @@ from .state import State
from apps.monero.layout import confirms
from apps.monero.xmr import crypto
if False:
from trezor.messages.MoneroTransactionAllInputsSetAck import (
MoneroTransactionAllInputsSetAck,
)
async def all_inputs_set(state: State):
async def all_inputs_set(state: State) -> MoneroTransactionAllInputsSetAck:
state.mem_trace(0)
await confirms.transaction_step(state, state.STEP_ALL_IN)
@ -18,52 +23,13 @@ async def all_inputs_set(state: State):
MoneroTransactionAllInputsSetAck,
)
# Generate random commitment masks to be used in range proofs.
# If SimpleRCT is used the sum of the masks must match the input masks sum.
if state.last_step != state.STEP_VINI:
raise ValueError("Invalid state transition")
if state.current_input_index != state.input_count - 1:
raise ValueError("Invalid input count")
# The sum of the masks must match the input masks sum.
state.sumout = crypto.sc_init(0)
rsig_data = None
# Client 0, HF9. Non-deterministic masks
if not state.is_det_mask():
rsig_data = await _compute_masks(state)
resp = MoneroTransactionAllInputsSetAck(rsig_data=rsig_data)
state.last_step = state.STEP_ALL_IN
resp = MoneroTransactionAllInputsSetAck()
return resp
async def _compute_masks(state: State):
"""
Output masks computed in advance. Used with client_version=0 && HF9.
After HF10 (included) masks are deterministic, computed from the amount_key.
After all client update to v1 this code will be removed.
In order to preserve client_version=0 compatibility the masks have to be adjusted.
"""
from trezor.messages.MoneroTransactionRsigData import MoneroTransactionRsigData
from apps.monero.signing import offloading_keys
rsig_data = MoneroTransactionRsigData()
# If range proofs are being offloaded, we send the masks to the host, which uses them
# to create the range proof. If not, we do not send any and we use them in the following step.
if state.rsig_offload:
rsig_data.mask = []
# Deterministic masks, the last one is computed to balance the sums
for i in range(state.output_count):
if i + 1 == state.output_count:
cur_mask = crypto.sc_sub(state.sumpouts_alphas, state.sumout)
state.output_last_mask = cur_mask
else:
cur_mask = offloading_keys.det_comm_masks(state.key_enc, i)
crypto.sc_add_into(state.sumout, state.sumout, cur_mask)
if state.rsig_offload:
rsig_data.mask.append(crypto.encodeint(cur_mask))
if not crypto.sc_eq(state.sumpouts_alphas, state.sumout):
raise ValueError("Sum eq error")
state.sumout = crypto.sc_init(0)
return rsig_data

View File

@ -13,10 +13,27 @@ from apps.monero.layout import confirms
from apps.monero.signing import offloading_keys
from apps.monero.xmr import crypto, serialize
if False:
from typing import Tuple
from apps.monero.xmr.types import Sc25519, Ge25519
from apps.monero.xmr.serialize_messages.tx_ecdh import EcdhTuple
from apps.monero.xmr.serialize_messages.tx_rsig_bulletproof import Bulletproof
from trezor.messages.MoneroTransactionDestinationEntry import (
MoneroTransactionDestinationEntry,
)
from trezor.messages.MoneroTransactionSetOutputAck import (
MoneroTransactionSetOutputAck,
)
from trezor.messages.MoneroTransactionRsigData import MoneroTransactionRsigData
async def set_output(
state: State, dst_entr, dst_entr_hmac, rsig_data, is_offloaded_bp=False
):
state: State,
dst_entr: MoneroTransactionDestinationEntry,
dst_entr_hmac: bytes,
rsig_data: MoneroTransactionRsigData,
is_offloaded_bp=False,
) -> MoneroTransactionSetOutputAck:
state.mem_trace(0, True)
mods = utils.unimport_begin()
@ -84,6 +101,7 @@ async def set_output(
# output_pk_commitment is stored to the state as it is used during the signature and hashed to the
# RctSigBase later. No need to store amount, it was already stored.
state.output_pk_commitments.append(out_pk_commitment)
state.last_step = state.STEP_OUT
state.mem_trace(14, True)
from trezor.messages.MoneroTransactionSetOutputAck import (
@ -103,14 +121,18 @@ async def set_output(
)
async def _validate(state: State, dst_entr, dst_entr_hmac, is_offloaded_bp):
# If offloading flag then it has to be det_masks and offloading enabled.
# Using IF as it is easier to read.
if is_offloaded_bp and (not state.rsig_offload or not state.is_det_mask()):
async def _validate(
state: State,
dst_entr: MoneroTransactionDestinationEntry,
dst_entr_hmac: bytes,
is_offloaded_bp: bool,
) -> MoneroTransactionDestinationEntry:
if state.last_step not in (state.STEP_ALL_IN, state.STEP_OUT):
raise ValueError("Invalid state transition")
if is_offloaded_bp and (not state.rsig_offload):
raise ValueError("Extraneous offloaded msg")
# State change according to the det-mask BP offloading.
if state.is_det_mask() and state.rsig_offload:
if state.rsig_offload:
bidx = _get_rsig_batch(state, state.current_output_index)
last_in_batch = _is_last_in_batch(state, state.current_output_index, bidx)
@ -132,10 +154,6 @@ async def _validate(state: State, dst_entr, dst_entr_hmac, is_offloaded_bp):
utils.ensure(
state.current_output_index < state.output_count, "Invalid output index"
)
utils.ensure(
state.is_det_mask() or not state.is_processing_offloaded,
"Offloaded extra msg while not using det masks",
)
if not state.is_processing_offloaded:
# HMAC check of the destination
@ -157,7 +175,9 @@ async def _validate(state: State, dst_entr, dst_entr_hmac, is_offloaded_bp):
return dst_entr
def _compute_tx_keys(state: State, dst_entr):
def _compute_tx_keys(
state: State, dst_entr: MoneroTransactionDestinationEntry
) -> Tuple[Ge25519, Sc25519]:
"""Computes tx_out_key, amount_key"""
if state.is_processing_offloaded:
@ -177,24 +197,16 @@ def _compute_tx_keys(state: State, dst_entr):
)
del (derivation, additional_txkey_priv)
# Computes the newest mask if applicable
if state.is_det_mask():
from apps.monero.xmr import monero
mask = monero.commitment_mask(crypto.encodeint(amount_key))
elif state.current_output_index + 1 < state.output_count:
mask = offloading_keys.det_comm_masks(state.key_enc, state.current_output_index)
else:
mask = state.output_last_mask
state.output_last_mask = None
from apps.monero.xmr import monero
mask = monero.commitment_mask(crypto.encodeint(amount_key))
state.output_masks.append(mask)
return tx_out_key, amount_key
async def _set_out_tx_out(state: State, dst_entr, tx_out_key):
async def _set_out_tx_out(
state: State, dst_entr: MoneroTransactionDestinationEntry, tx_out_key: Ge25519
) -> Tuple[bytes, bytes]:
"""
Manually serializes TxOut(0, TxoutToKey(key)) and calculates hmac.
"""
@ -216,7 +228,9 @@ async def _set_out_tx_out(state: State, dst_entr, tx_out_key):
return tx_out_bin, hmac_vouti
def _range_proof(state, rsig_data):
def _range_proof(
state: State, rsig_data: MoneroTransactionRsigData
) -> Tuple[MoneroTransactionRsigData, Sc25519]:
"""
Computes rangeproof and handles range proof offloading logic.
@ -239,13 +253,13 @@ def _range_proof(state, rsig_data):
state.rsig_offload
and last_in_batch
and not provided_rsig
and (not state.is_det_mask() or state.is_processing_offloaded)
and state.is_processing_offloaded
):
raise signing.Error("Rsig expected, not provided")
# Batch not finished, skip range sig generation now
mask = state.output_masks[-1] if not state.is_processing_offloaded else None
offload_mask = mask and state.is_det_mask() and state.rsig_offload
offload_mask = mask and state.rsig_offload
# If not last, do not proceed to the BP processing.
if not last_in_batch:
@ -263,16 +277,12 @@ def _range_proof(state, rsig_data):
"""Bulletproof calculation in Trezor"""
rsig = _rsig_bp(state)
elif state.is_det_mask() and not state.is_processing_offloaded:
elif not state.is_processing_offloaded:
"""Bulletproof offloaded to the host, deterministic masks. Nothing here, waiting for offloaded BP."""
pass
elif state.is_det_mask() and state.is_processing_offloaded:
"""Bulletproof offloaded to the host, check BP, hash it."""
_rsig_process_bp(state, rsig_data)
else:
"""Bulletproof calculated on host, verify in Trezor"""
"""Bulletproof offloaded to the host, check BP, hash it."""
_rsig_process_bp(state, rsig_data)
state.mem_trace("rproof" if __debug__ else None, collect=True)
@ -292,7 +302,7 @@ def _range_proof(state, rsig_data):
return rsig_data_new, mask
def _rsig_bp(state: State):
def _rsig_bp(state: State) -> bytes:
"""Bulletproof calculation in trezor"""
from apps.monero.xmr import range_signatures
@ -305,7 +315,7 @@ def _rsig_bp(state: State):
# BP is hashed with raw=False as hash does not contain L, R
# array sizes compared to the serialized bulletproof format
# thus direct serialization cannot be used.
state.full_message_hasher.rsig_val(rsig, True, raw=False)
state.full_message_hasher.rsig_val(rsig, raw=False)
state.mem_trace("post-bp-hash" if __debug__ else None, collect=True)
rsig = _dump_rsig_bp(rsig)
@ -319,7 +329,7 @@ def _rsig_bp(state: State):
return rsig
def _rsig_process_bp(state: State, rsig_data):
def _rsig_process_bp(state: State, rsig_data: MoneroTransactionRsigData):
from apps.monero.xmr import range_signatures
from apps.monero.xmr.serialize_messages.tx_rsig_bulletproof import Bulletproof
@ -329,7 +339,7 @@ def _rsig_process_bp(state: State, rsig_data):
# BP is hashed with raw=False as hash does not contain L, R
# array sizes compared to the serialized bulletproof format
# thus direct serialization cannot be used.
state.full_message_hasher.rsig_val(bp_obj, True, raw=False)
state.full_message_hasher.rsig_val(bp_obj, raw=False)
res = range_signatures.verify_bp(bp_obj, state.output_amounts, state.output_masks)
utils.ensure(res, "BP verification fail")
state.mem_trace("BP verified" if __debug__ else None, collect=True)
@ -340,7 +350,7 @@ def _rsig_process_bp(state: State, rsig_data):
state.output_masks = []
def _dump_rsig_bp(rsig):
def _dump_rsig_bp(rsig: Bulletproof) -> bytes:
if len(rsig.L) > 127:
raise ValueError("Too large")
@ -382,7 +392,9 @@ def _dump_rsig_bp(rsig):
return buff
def _return_rsig_data(rsig=None, mask=None):
def _return_rsig_data(
rsig: bytes = None, mask: bytes = None
) -> MoneroTransactionRsigData:
if rsig is None and mask is None:
return None
@ -399,7 +411,9 @@ def _return_rsig_data(rsig=None, mask=None):
return rsig_data
def _get_ecdh_info_and_out_pk(state: State, tx_out_key, amount, mask, amount_key):
def _get_ecdh_info_and_out_pk(
state: State, tx_out_key: Ge25519, amount: int, mask: Sc25519, amount_key: Sc25519
) -> Tuple[bytes, bytes, bytes]:
"""
Calculates the Pedersen commitment C = aG + bH and returns it as CtKey.
Also encodes the two items - `mask` and `amount` - into ecdh info,
@ -408,38 +422,27 @@ def _get_ecdh_info_and_out_pk(state: State, tx_out_key, amount, mask, amount_key
out_pk_dest = crypto.encodepoint(tx_out_key)
out_pk_commitment = crypto.encodepoint(crypto.gen_commitment(mask, amount))
crypto.sc_add_into(state.sumout, state.sumout, mask)
# masking of mask and amount
ecdh_info = _ecdh_encode(
mask, amount, crypto.encodeint(amount_key), state.is_bulletproof_v2()
)
ecdh_info = _ecdh_encode(amount, crypto.encodeint(amount_key))
# Manual ECDH info serialization
ecdh_info_bin = _serialize_ecdh(ecdh_info, state.is_bulletproof_v2())
ecdh_info_bin = _serialize_ecdh(ecdh_info)
gc.collect()
return out_pk_dest, out_pk_commitment, ecdh_info_bin
def _serialize_ecdh(ecdh_info, v2=False):
def _serialize_ecdh(ecdh_info: EcdhTuple) -> bytes:
"""
Serializes ECDH according to the current format defined by the hard fork version
or the signature format respectively.
"""
if v2:
# In HF10 the amount is serialized to 8B and mask is deterministic
ecdh_info_bin = bytearray(8)
ecdh_info_bin[:] = ecdh_info.amount[0:8]
return ecdh_info_bin
else:
ecdh_info_bin = bytearray(64)
utils.memcpy(ecdh_info_bin, 0, ecdh_info.mask, 0, 32)
utils.memcpy(ecdh_info_bin, 32, ecdh_info.amount, 0, 32)
return ecdh_info_bin
# Since HF10 the amount is serialized to 8B and mask is deterministic
ecdh_info_bin = bytearray(8)
ecdh_info_bin[:] = ecdh_info.amount[0:8]
return ecdh_info_bin
def _ecdh_hash(shared_sec):
def _ecdh_hash(shared_sec: bytes) -> bytes:
"""
Generates ECDH hash for amount masking for Bulletproof2
"""
@ -449,45 +452,22 @@ def _ecdh_hash(shared_sec):
return crypto.cn_fast_hash(data)
def _ecdh_encode(mask, amount, amount_key, v2=False):
def _ecdh_encode(amount: int, amount_key: bytes) -> EcdhTuple:
"""
Output recipients need be able to reconstruct the amount commitments.
This means the blinding factor `mask` and `amount` must be communicated
to the receiver somehow.
The mask and amount are stored as:
- mask = mask + Hs(amount_key)
- amount = amount + Hs(Hs(amount_key))
Because the receiver can derive the `amount_key` they can
easily derive both mask and amount as well.
Output recipients decode amounts from EcdhTuple structure.
"""
from apps.monero.xmr.serialize_messages.tx_ecdh import EcdhTuple
ecdh_info = EcdhTuple(mask=mask, amount=crypto.sc_init(amount))
if v2:
amnt = ecdh_info.amount
ecdh_info.mask = crypto.NULL_KEY_ENC
ecdh_info.amount = bytearray(32)
crypto.encodeint_into(ecdh_info.amount, amnt)
crypto.xor8(ecdh_info.amount, _ecdh_hash(amount_key))
return ecdh_info
else:
amount_key_hash_single = crypto.hash_to_scalar(amount_key)
amount_key_hash_double = crypto.hash_to_scalar(
crypto.encodeint(amount_key_hash_single)
)
# Not modifying passed mask, is reused in BP.
ecdh_info.mask = crypto.sc_add(ecdh_info.mask, amount_key_hash_single)
crypto.sc_add_into(ecdh_info.amount, ecdh_info.amount, amount_key_hash_double)
ecdh_info.mask = crypto.encodeint(ecdh_info.mask)
ecdh_info.amount = crypto.encodeint(ecdh_info.amount)
return ecdh_info
ecdh_info = EcdhTuple(mask=crypto.NULL_KEY_ENC, amount=bytearray(32))
amnt = crypto.sc_init(amount)
crypto.encodeint_into(ecdh_info.amount, amnt)
crypto.xor8(ecdh_info.amount, _ecdh_hash(amount_key))
return ecdh_info
def _set_out_additional_keys(state: State, dst_entr):
def _set_out_additional_keys(
state: State, dst_entr: MoneroTransactionDestinationEntry
) -> Sc25519:
"""
If needed (decided in step 1), additional tx keys are calculated
for this particular output.
@ -512,7 +492,11 @@ def _set_out_additional_keys(state: State, dst_entr):
return additional_txkey_priv
def _set_out_derivation(state: State, dst_entr, additional_txkey_priv):
def _set_out_derivation(
state: State,
dst_entr: MoneroTransactionDestinationEntry,
additional_txkey_priv: Sc25519,
) -> Ge25519:
"""
Calculates derivation which is then used in the one-time address as
`P = H(derivation)*G + B`.
@ -543,7 +527,7 @@ def _set_out_derivation(state: State, dst_entr, additional_txkey_priv):
return derivation
def _is_last_in_batch(state: State, idx, bidx):
def _is_last_in_batch(state: State, idx: int, bidx: int) -> bool:
"""
Returns true if the current output is last in the rsig batch
"""
@ -551,7 +535,7 @@ def _is_last_in_batch(state: State, idx, bidx):
return (idx - sum(state.rsig_grouping[:bidx])) + 1 == batch_size
def _get_rsig_batch(state: State, idx):
def _get_rsig_batch(state: State, idx: int) -> int:
"""
Returns index of the current rsig batch
"""

View File

@ -11,11 +11,16 @@ from trezor import utils
from .state import State
from apps.monero.layout import confirms
from apps.monero.signing import get_monero_rct_type
from apps.monero.signing import RctType
from apps.monero.xmr import crypto
if False:
from trezor.messages.MoneroTransactionAllOutSetAck import (
MoneroTransactionAllOutSetAck,
)
async def all_outputs_set(state: State):
async def all_outputs_set(state: State) -> MoneroTransactionAllOutSetAck:
state.mem_trace(0)
await confirms.transaction_step(state, state.STEP_ALL_OUT)
@ -25,17 +30,18 @@ async def all_outputs_set(state: State):
state.is_processing_offloaded = False
state.mem_trace(2)
_set_tx_extra(state)
extra_b = _set_tx_extra(state)
# tx public keys not needed anymore
state.additional_tx_public_keys = None
state.tx_pub = None
state.rsig_grouping = None
state.rsig_offload = None
gc.collect()
state.mem_trace(3)
# Completes the transaction prefix hash by including extra
_set_tx_prefix(state)
extra_b = state.tx.extra
state.tx = None
_set_tx_prefix(state, extra_b)
state.output_change = None
gc.collect()
state.mem_trace(4)
@ -50,18 +56,22 @@ async def all_outputs_set(state: State):
# Initializes RCTsig structure (fee, tx prefix hash, type)
rv_pb = MoneroRingCtSig(
txn_fee=state.fee,
message=state.tx_prefix_hash,
rv_type=get_monero_rct_type(state.bp_version),
txn_fee=state.fee, message=state.tx_prefix_hash, rv_type=RctType.Bulletproof2,
)
_out_pk(state)
state.full_message_hasher.rctsig_base_done()
state.current_output_index = -1
state.current_output_index = None
state.current_input_index = -1
state.full_message = state.full_message_hasher.get_digest()
state.full_message_hasher = None
state.output_pk_commitments = None
state.summary_outs_money = None
state.summary_inputs_money = None
state.fee = None
state.last_ki = None
state.last_step = state.STEP_ALL_OUT
return MoneroTransactionAllOutSetAck(
extra=extra_b,
@ -72,6 +82,8 @@ async def all_outputs_set(state: State):
def _validate(state: State):
if state.last_step != state.STEP_OUT:
raise ValueError("Invalid state transition")
if state.current_output_index + 1 != state.output_count:
raise ValueError("Invalid out num")
@ -93,7 +105,7 @@ def _validate(state: State):
)
def _set_tx_extra(state: State):
def _set_tx_extra(state: State) -> bytes:
"""
Sets tx public keys into transaction's extra.
Extra field is supposed to be sorted (by sort_tx_extra() in the Monero)
@ -136,10 +148,10 @@ def _set_tx_extra(state: State):
utils.memcpy(extra, offset, state.extra_nonce, 0, len(state.extra_nonce))
state.extra_nonce = None
state.tx.extra = extra
return extra
def _set_tx_prefix(state: State):
def _set_tx_prefix(state: State, extra: bytes):
"""
Adds `extra` to the tx_prefix_hash, which is the last needed item,
so the tx_prefix_hash is now complete and can be incorporated
@ -147,8 +159,8 @@ def _set_tx_prefix(state: State):
"""
# Serializing "extra" type as BlobType.
# uvarint(len(extra)) || extra
state.tx_prefix_hasher.uvarint(len(state.tx.extra))
state.tx_prefix_hasher.buffer(state.tx.extra)
state.tx_prefix_hasher.uvarint(len(extra))
state.tx_prefix_hasher.buffer(extra)
state.tx_prefix_hash = state.tx_prefix_hasher.get_digest()
state.tx_prefix_hasher = None

View File

@ -20,9 +20,13 @@ from apps.monero.layout import confirms
from apps.monero.xmr import crypto
if False:
from typing import List
from trezor.messages.MoneroTransactionSourceEntry import (
MoneroTransactionSourceEntry,
)
from trezor.messages.MoneroTransactionSignInputAck import (
MoneroTransactionSignInputAck,
)
async def sign_input(
@ -34,7 +38,8 @@ async def sign_input(
pseudo_out_hmac: bytes,
pseudo_out_alpha_enc: bytes,
spend_enc: bytes,
):
orig_idx: int,
) -> MoneroTransactionSignInputAck:
"""
:param state: transaction state
:param src_entr: Source entry
@ -45,6 +50,7 @@ async def sign_input(
:param pseudo_out_hmac: HMAC for pseudo_out
:param pseudo_out_alpha_enc: alpha mask used in pseudo_out, only applicable for RCTTypeSimple. Encrypted.
:param spend_enc: one time address spending private key. Encrypted.
:param orig_idx: original index of the src_entr before sorting (HMAC check)
:return: Generated signature MGs[i]
"""
await confirms.transaction_step(
@ -52,6 +58,8 @@ async def sign_input(
)
state.current_input_index += 1
if state.last_step not in (state.STEP_ALL_OUT, state.STEP_SIGN):
raise ValueError("Invalid state transition")
if state.current_input_index >= state.input_count:
raise ValueError("Invalid inputs count")
if pseudo_out is None:
@ -59,7 +67,11 @@ async def sign_input(
if pseudo_out_alpha_enc is None:
raise ValueError("SimpleRCT requires pseudo_out's mask but none provided")
input_position = state.source_permutation[state.current_input_index]
input_position = (
state.source_permutation[state.current_input_index]
if state.client_version <= 1
else orig_idx
)
mods = utils.unimport_begin()
# Check input's HMAC
@ -71,6 +83,14 @@ async def sign_input(
if not crypto.ct_equals(vini_hmac_comp, vini_hmac):
raise ValueError("HMAC is not correct")
# Key image sorting check - permutation correctness
cur_ki = offloading_keys.get_ki_from_vini(vini_bin)
if state.current_input_index > 0 and state.last_ki <= cur_ki:
raise ValueError("Key image order invalid")
state.last_ki = cur_ki if state.current_input_index < state.input_count else None
del (cur_ki, vini_bin, vini_hmac, vini_hmac_comp)
gc.collect()
state.mem_trace(1, True)
@ -83,8 +103,8 @@ async def sign_input(
)
)
# Last pseud_out is recomputed so mask sums hold
if state.is_det_mask() and input_position + 1 == state.input_count:
# Last pseudo_out is recomputed so mask sums hold
if input_position + 1 == state.input_count:
# Recompute the lash alpha so the sum holds
state.mem_trace("Correcting alpha")
alpha_diff = crypto.sc_sub(state.sumout, state.sumpouts_alphas)
@ -129,12 +149,11 @@ async def sign_input(
utils.unimport_end(mods)
state.mem_trace(3, True)
from apps.monero.xmr.serialize_messages.ct_keys import CtKey
# Basic setup, sanity check
from apps.monero.xmr.serialize_messages.tx_ct_key import CtKey
index = src_entr.real_output
input_secret_key = CtKey(dest=spend_key, mask=crypto.decodeint(src_entr.mask))
kLRki = None # for multisig: src_entr.multisig_kLRki
input_secret_key = CtKey(spend_key, crypto.decodeint(src_entr.mask))
# Private key correctness test
utils.ensure(
@ -157,27 +176,120 @@ async def sign_input(
from apps.monero.xmr import mlsag
mg_buffer = []
ring_pubkeys = [x.key for x in src_entr.outputs]
ring_pubkeys = [x.key for x in src_entr.outputs if x]
utils.ensure(len(ring_pubkeys) == len(src_entr.outputs), "Invalid ring")
del src_entr
mlsag.generate_mlsag_simple(
state.full_message,
ring_pubkeys,
input_secret_key,
pseudo_out_alpha,
pseudo_out_c,
kLRki,
index,
mg_buffer,
)
del (input_secret_key, pseudo_out_alpha, mlsag, ring_pubkeys)
state.mem_trace(5, True)
if state.hard_fork and state.hard_fork >= 13:
state.mem_trace("CLSAG")
mlsag.generate_clsag_simple(
state.full_message,
ring_pubkeys,
input_secret_key,
pseudo_out_alpha,
pseudo_out_c,
index,
mg_buffer,
)
else:
mlsag.generate_mlsag_simple(
state.full_message,
ring_pubkeys,
input_secret_key,
pseudo_out_alpha,
pseudo_out_c,
index,
mg_buffer,
)
del (CtKey, input_secret_key, pseudo_out_alpha, mlsag, ring_pubkeys)
state.mem_trace(6, True)
from trezor.messages.MoneroTransactionSignInputAck import (
MoneroTransactionSignInputAck,
)
# Encrypt signature, reveal once protocol finishes OK
if state.client_version >= 3:
utils.unimport_end(mods)
state.mem_trace(7, True)
mg_buffer = _protect_signature(state, mg_buffer)
state.mem_trace(8, True)
state.last_step = state.STEP_SIGN
return MoneroTransactionSignInputAck(
signature=mg_buffer, pseudo_out=crypto.encodepoint(pseudo_out_c)
)
def _protect_signature(state: State, mg_buffer: List[bytes]) -> List[bytes]:
"""
Encrypts the signature with keys derived from state.opening_key.
After protocol finishes without error, opening_key is sent to the
host.
"""
from trezor.crypto import random
from trezor.crypto import chacha20poly1305
from apps.monero.signing import offloading_keys
if state.last_step != state.STEP_SIGN:
state.opening_key = random.bytes(32)
nonce = offloading_keys.key_signature(
state.opening_key, state.current_input_index, True
)[:12]
key = offloading_keys.key_signature(
state.opening_key, state.current_input_index, False
)
cipher = chacha20poly1305(key, nonce)
"""
cipher.update() input has to be 512 bit long (besides the last block).
Thus we go over mg_buffer and buffer 512 bit input blocks before
calling cipher.update().
"""
CHACHA_BLOCK = 64 # 512 bit chacha key-stream block size
buff = bytearray(CHACHA_BLOCK)
buff_len = 0 # valid bytes in the block buffer
mg_len = 0
for data in mg_buffer:
mg_len += len(data)
# Preallocate array of ciphertext blocks, ceil, add tag block
mg_res = [None] * (1 + (mg_len + CHACHA_BLOCK - 1) // CHACHA_BLOCK)
mg_res_c = 0
for ix, data in enumerate(mg_buffer):
data_ln = len(data)
data_off = 0
while data_ln > 0:
to_add = min(CHACHA_BLOCK - buff_len, data_ln)
if to_add:
buff[buff_len : buff_len + to_add] = data[data_off : data_off + to_add]
data_ln -= to_add
buff_len += to_add
data_off += to_add
if len(buff) != CHACHA_BLOCK or buff_len > CHACHA_BLOCK:
raise ValueError("Invariant error")
if buff_len == CHACHA_BLOCK:
mg_res[mg_res_c] = cipher.encrypt(buff)
mg_res_c += 1
buff_len = 0
mg_buffer[ix] = None
if ix & 7 == 0:
gc.collect()
# The last block can be incomplete
if buff_len:
mg_res[mg_res_c] = cipher.encrypt(buff[:buff_len])
mg_res_c += 1
mg_res[mg_res_c] = cipher.finish()
return mg_res

View File

@ -16,8 +16,17 @@ from apps.monero import misc
from apps.monero.xmr import crypto
from apps.monero.xmr.crypto import chacha_poly
if False:
from typing import Tuple
from apps.monero.xmr.types import Sc25519
async def final_msg(state: State) -> MoneroTransactionFinalAck:
if state.last_step != state.STEP_SIGN:
raise ValueError("Invalid state transition")
if state.current_input_index != state.input_count - 1:
raise ValueError("Invalid input count")
async def final_msg(state: State):
tx_key, salt, rand_mult = _compute_tx_key(
state.creds.spend_key_private, state.tx_prefix_hash
)
@ -26,13 +35,20 @@ async def final_msg(state: State):
[crypto.encodeint(x) for x in state.additional_tx_private_keys]
)
tx_enc_keys = chacha_poly.encrypt_pack(tx_key, key_buff)
state.last_step = None
return MoneroTransactionFinalAck(
cout_key=None, salt=salt, rand_mult=rand_mult, tx_enc_keys=tx_enc_keys
cout_key=None,
salt=salt,
rand_mult=rand_mult,
tx_enc_keys=tx_enc_keys,
opening_key=state.opening_key,
)
def _compute_tx_key(spend_key_private, tx_prefix_hash):
def _compute_tx_key(
spend_key_private: Sc25519, tx_prefix_hash: bytes
) -> Tuple[bytes, bytes, bytes]:
salt = crypto.random_bytes(32)
rand_mult_num = crypto.random_scalar()

View File

@ -2,17 +2,27 @@ from trezor.crypto import monero as tcry
from apps.monero.xmr.networks import NetworkTypes, net_version
if False:
from typing import List, Tuple, Optional
from apps.monero.xmr.types import Ge25519
from trezor.messages.MoneroAccountPublicAddress import MoneroAccountPublicAddress
from trezor.messages.MoneroTransactionDestinationEntry import (
MoneroTransactionDestinationEntry,
)
def addr_to_hash(addr):
def addr_to_hash(addr: MoneroAccountPublicAddress) -> bytes:
"""
Creates hashable address representation
"""
return bytes(addr.spend_public_key + addr.view_public_key)
def encode_addr(version, spend_pub, view_pub, payment_id=None):
def encode_addr(
version, spend_pub: Ge25519, view_pub: Ge25519, payment_id: Optional[bytes] = None
) -> str:
"""
Encodes public keys as versions
Builds Monero address from public keys
"""
buf = spend_pub + view_pub
if payment_id:
@ -20,7 +30,7 @@ def encode_addr(version, spend_pub, view_pub, payment_id=None):
return tcry.xmr_base58_addr_encode_check(ord(version), bytes(buf))
def decode_addr(addr):
def decode_addr(addr: bytes) -> Tuple[int, bytes, bytes]:
"""
Given address, get version and public spend and view keys.
"""
@ -30,7 +40,9 @@ def decode_addr(addr):
return version, pub_spend_key, pub_view_key
def public_addr_encode(pub_addr, is_sub=False, net=NetworkTypes.MAINNET):
def public_addr_encode(
pub_addr: MoneroAccountPublicAddress, is_sub=False, net=NetworkTypes.MAINNET
):
"""
Encodes public address to Monero address
"""
@ -38,7 +50,10 @@ def public_addr_encode(pub_addr, is_sub=False, net=NetworkTypes.MAINNET):
return encode_addr(net_ver, pub_addr.spend_public_key, pub_addr.view_public_key)
def classify_subaddresses(tx_dests, change_addr):
def classify_subaddresses(
tx_dests: List[MoneroTransactionDestinationEntry],
change_addr: MoneroAccountPublicAddress,
) -> Tuple[int, int, int]:
"""
Classify destination subaddresses
"""
@ -61,14 +76,17 @@ def classify_subaddresses(tx_dests, change_addr):
return num_stdaddresses, num_subaddresses, single_dest_subaddress
def addr_eq(a, b):
def addr_eq(a: MoneroAccountPublicAddress, b: MoneroAccountPublicAddress):
return (
a.spend_public_key == b.spend_public_key
and a.view_public_key == b.view_public_key
)
def get_change_addr_idx(outputs, change_dts):
def get_change_addr_idx(
outputs: List[MoneroTransactionDestinationEntry],
change_dts: MoneroTransactionDestinationEntry,
) -> int:
"""
Returns ID of the change output from the change_dts and outputs
"""

View File

@ -657,7 +657,6 @@ class KeyVPowers(KeyVBase):
raise IndexError("Only linear scan allowed: %s, %s" % (prev, item))
def set_state(self, idx, val):
self.item = idx
self.last_idx = idx
if self.raw:
return crypto.sc_copy(self.cur, val)
@ -1666,8 +1665,8 @@ class BulletProofBuilder:
_sc_mulsub(h_scalar, tmp, yinvpow, h_scalar)
if not is_single: # ph4
_sc_mulsub(m_z4[i], g_scalar, weight_z, m_z4[i])
_sc_mulsub(m_z5[i], h_scalar, weight_z, m_z5[i])
m_z4.read(i, _sc_mulsub(_tmp_bf_0, g_scalar, weight_z, m_z4[i]))
m_z5.read(i, _sc_mulsub(_tmp_bf_0, h_scalar, weight_z, m_z5[i]))
else:
_sc_mul(tmp, g_scalar, weight_z)
_sub_keys(

View File

@ -2,6 +2,10 @@ from apps.monero.xmr import crypto
from apps.monero.xmr.addresses import encode_addr
from apps.monero.xmr.networks import NetworkTypes, net_version
if False:
from typing import Optional
from apps.monero.xmr.types import Sc25519, Ge25519
class AccountCreds:
"""
@ -10,11 +14,11 @@ class AccountCreds:
def __init__(
self,
view_key_private=None,
spend_key_private=None,
view_key_public=None,
spend_key_public=None,
address=None,
view_key_private: Optional[Sc25519] = None,
spend_key_private: Optional[Sc25519] = None,
view_key_public: Optional[Ge25519] = None,
spend_key_public: Optional[Ge25519] = None,
address: Optional[str] = None,
network_type=NetworkTypes.MAINNET,
):
self.view_key_private = view_key_private
@ -26,7 +30,10 @@ class AccountCreds:
@classmethod
def new_wallet(
cls, priv_view_key, priv_spend_key, network_type=NetworkTypes.MAINNET
cls,
priv_view_key: Sc25519,
priv_spend_key: Sc25519,
network_type=NetworkTypes.MAINNET,
):
pub_view_key = crypto.scalarmult_base(priv_view_key)
pub_spend_key = crypto.scalarmult_base(priv_spend_key)

View File

@ -10,6 +10,11 @@
from trezor.crypto import hmac, monero as tcry, random
from trezor.crypto.hashlib import sha3_256
if False:
from typing import Tuple, Optional, Union
from apps.monero.xmr.types import Sc25519, Ge25519
NULL_KEY_ENC = b"\x00" * 32
random_bytes = random.bytes
@ -45,7 +50,7 @@ def compute_hmac(key, msg=None):
new_point = tcry.ge25519_set_neutral
def new_scalar():
def new_scalar() -> Sc25519:
return tcry.init256_modm(0)
@ -81,7 +86,7 @@ INV_EIGHT = b"\x79\x2f\xdc\xe2\x29\xe5\x06\x61\xd0\xda\x1c\x7d\xb3\x9d\xd3\x07\x
INV_EIGHT_SC = decodeint(INV_EIGHT)
def sc_inv_eight():
def sc_inv_eight() -> Sc25519:
return INV_EIGHT_SC
@ -90,21 +95,21 @@ def sc_inv_eight():
#
def sc_0():
def sc_0() -> Sc25519:
return tcry.init256_modm(0)
def sc_0_into(r):
def sc_0_into(r: Sc25519) -> Sc25519:
return tcry.init256_modm(r, 0)
def sc_init(x):
def sc_init(x: int) -> Sc25519:
if x >= (1 << 64):
raise ValueError("Initialization works up to 64-bit only")
return tcry.init256_modm(x)
def sc_init_into(r, x):
def sc_init_into(r: Sc25519, x: int) -> Sc25519:
if x >= (1 << 64):
raise ValueError("Initialization works up to 64-bit only")
return tcry.init256_modm(r, x)
@ -123,7 +128,7 @@ sc_mul = tcry.mul256_modm
sc_mul_into = tcry.mul256_modm
def sc_isnonzero(c):
def sc_isnonzero(c: Sc25519) -> bool:
"""
Returns true if scalar is non-zero
"""
@ -138,7 +143,7 @@ sc_muladd_into = tcry.muladd256_modm
sc_inv_into = tcry.inv256_modm
def random_scalar(r=None):
def random_scalar(r=None) -> Sc25519:
return tcry.xmr_random_scalar(r if r is not None else new_scalar())
@ -147,7 +152,7 @@ def random_scalar(r=None):
#
def ge25519_double_scalarmult_base_vartime(a, A, b):
def ge25519_double_scalarmult_base_vartime(a, A, b) -> Ge25519:
"""
void ge25519_double_scalarmult_vartime(ge25519 *r, const ge25519 *p1, const bignum256modm s1, const bignum256modm s2);
r = a * A + b * B
@ -159,7 +164,7 @@ def ge25519_double_scalarmult_base_vartime(a, A, b):
ge25519_double_scalarmult_vartime2 = tcry.xmr_add_keys3
def identity(byte_enc=False):
def identity(byte_enc=False) -> Union[Ge25519, bytes]:
idd = tcry.ge25519_set_neutral()
return idd if not byte_enc else encodepoint(idd)
@ -180,7 +185,7 @@ http://elligator.cr.yp.to/elligator-20130828.pdf
cn_fast_hash = keccak_hash
def hash_to_scalar(data, length=None):
def hash_to_scalar(data: bytes, length: Optional[int] = None):
"""
H_s(P)
"""
@ -188,7 +193,7 @@ def hash_to_scalar(data, length=None):
return tcry.xmr_hash_to_scalar(dt)
def hash_to_scalar_into(r, data, length=None):
def hash_to_scalar_into(r: Sc25519, data: bytes, length: Optional[int] = None):
dt = data[:length] if length else data
return tcry.xmr_hash_to_scalar(r, dt)
@ -212,7 +217,7 @@ hash_to_point_into = tcry.xmr_hash_to_ec
xmr_H = tcry.ge25519_set_h
def scalarmult_h(i):
def scalarmult_h(i) -> Ge25519:
return scalarmult(xmr_H(), sc_init(i) if isinstance(i, int) else i)
@ -223,7 +228,7 @@ add_keys3_into = tcry.xmr_add_keys3_vartime
gen_commitment = tcry.xmr_gen_c
def generate_key_derivation(pub, sec):
def generate_key_derivation(pub: Ge25519, sec: Sc25519) -> Ge25519:
"""
Key derivation: 8*(key2*key1)
"""
@ -232,7 +237,7 @@ def generate_key_derivation(pub, sec):
return tcry.xmr_generate_key_derivation(pub, sec)
def derivation_to_scalar(derivation, output_index):
def derivation_to_scalar(derivation: Ge25519, output_index: int) -> Sc25519:
"""
H_s(derivation || varint(output_index))
"""
@ -240,7 +245,7 @@ def derivation_to_scalar(derivation, output_index):
return tcry.xmr_derivation_to_scalar(derivation, output_index)
def derive_public_key(derivation, output_index, B):
def derive_public_key(derivation: Ge25519, output_index: int, B: Ge25519) -> Ge25519:
"""
H_s(derivation || varint(output_index))G + B
"""
@ -248,7 +253,7 @@ def derive_public_key(derivation, output_index, B):
return tcry.xmr_derive_public_key(derivation, output_index, B)
def derive_secret_key(derivation, output_index, base):
def derive_secret_key(derivation: Ge25519, output_index: int, base: Sc25519) -> Sc25519:
"""
base + H_s(derivation || varint(output_index))
"""
@ -256,7 +261,9 @@ def derive_secret_key(derivation, output_index, base):
return tcry.xmr_derive_private_key(derivation, output_index, base)
def get_subaddress_secret_key(secret_key, major=0, minor=0):
def get_subaddress_secret_key(
secret_key: Sc25519, major: int = 0, minor: int = 0
) -> Sc25519:
"""
Builds subaddress secret key from the subaddress index
Hs(SubAddr || a || index_major || index_minor)
@ -264,12 +271,7 @@ def get_subaddress_secret_key(secret_key, major=0, minor=0):
return tcry.xmr_get_subaddress_secret_key(major, minor, secret_key)
#
# Repr invariant
#
def generate_signature(data, priv):
def generate_signature(data: bytes, priv: Sc25519) -> Tuple[Sc25519, Sc25519, Ge25519]:
"""
Generate EC signature
crypto_ops::generate_signature(const hash &prefix_hash, const public_key &pub, const secret_key &sec, signature &sig)
@ -285,7 +287,7 @@ def generate_signature(data, priv):
return c, r, pub
def check_signature(data, c, r, pub):
def check_signature(data: bytes, c: Sc25519, r: Sc25519, pub: Ge25519) -> bool:
"""
EC signature verification
"""
@ -300,7 +302,7 @@ def check_signature(data, c, r, pub):
return not sc_isnonzero(res)
def xor8(buff, key):
def xor8(buff: bytes, key: bytes) -> bytes:
for i in range(8):
buff[i] ^= key[i]
return buff

View File

@ -1,8 +1,17 @@
from apps.monero.xmr import crypto, monero
from apps.monero.xmr.serialize.int_serialize import dump_uvarint_b
if False:
from typing import List, Tuple, Optional, Dict
from apps.monero.xmr.types import Ge25519, Sc25519
from apps.monero.xmr.credentials import AccountCreds
from trezor.messages.MoneroTransferDetails import MoneroTransferDetails
def compute_hash(rr):
Subaddresses = Dict[bytes, Tuple[int, int]]
Sig = List[List[Sc25519]]
def compute_hash(rr: MoneroTransferDetails) -> bytes:
kck = crypto.get_keccak()
kck.update(rr.out_key)
kck.update(rr.tx_pub_key)
@ -13,29 +22,59 @@ def compute_hash(rr):
return kck.digest()
def export_key_image(creds, subaddresses, td):
def export_key_image(
creds: AccountCreds, subaddresses: Subaddresses, td: MoneroTransferDetails
) -> Tuple[Ge25519, Sig]:
out_key = crypto.decodepoint(td.out_key)
tx_pub_key = crypto.decodepoint(td.tx_pub_key)
additional_tx_pub_keys = [crypto.decodepoint(x) for x in td.additional_tx_pub_keys]
additional_tx_pub_key = None
if len(td.additional_tx_pub_keys) == 1: # compression
additional_tx_pub_key = crypto.decodepoint(td.additional_tx_pub_keys[0])
elif td.additional_tx_pub_keys:
if td.internal_output_index >= len(td.additional_tx_pub_keys):
raise ValueError("Wrong number of additional derivations")
additional_tx_pub_key = crypto.decodepoint(
td.additional_tx_pub_keys[td.internal_output_index]
)
ki, sig = _export_key_image(
creds,
subaddresses,
out_key,
tx_pub_key,
additional_tx_pub_keys,
additional_tx_pub_key,
td.internal_output_index,
True,
td.sub_addr_major,
td.sub_addr_minor,
)
return ki, sig
def _export_key_image(
creds, subaddresses, pkey, tx_pub_key, additional_tx_pub_keys, out_idx, test=True
):
creds: AccountCreds,
subaddresses: Subaddresses,
pkey: Ge25519,
tx_pub_key: Ge25519,
additional_tx_pub_key: Optional[Ge25519],
out_idx: int,
test: bool = True,
sub_addr_major: int = None,
sub_addr_minor: int = None,
) -> Tuple[Ge25519, Sig]:
"""
Generates key image for the TXO + signature for the key image
"""
r = monero.generate_tx_spend_and_key_image_and_derivation(
creds, subaddresses, pkey, tx_pub_key, additional_tx_pub_keys, out_idx
creds,
subaddresses,
pkey,
tx_pub_key,
additional_tx_pub_key,
out_idx,
sub_addr_major,
sub_addr_minor,
)
xi, ki, recv_derivation = r[:3]
@ -45,7 +84,14 @@ def _export_key_image(
return ki, sig
def generate_ring_signature(prefix_hash, image, pubs, sec, sec_idx, test=False):
def generate_ring_signature(
prefix_hash: bytes,
image: Ge25519,
pubs: List[Ge25519],
sec: Sc25519,
sec_idx: int,
test: bool = False,
) -> Sig:
"""
Generates ring signature with key image.
void crypto_ops::generate_ring_signature()
@ -72,7 +118,7 @@ def generate_ring_signature(prefix_hash, image, pubs, sec, sec_idx, test=False):
k = crypto.sc_0()
sig = []
for i in range(len(pubs)):
for _ in range(len(pubs)):
sig.append([crypto.sc_0(), crypto.sc_0()]) # c, r
for i in range(len(pubs)):

View File

@ -47,8 +47,29 @@ import gc
from apps.monero.xmr import crypto
from apps.monero.xmr.serialize import int_serialize
if False:
from typing import List, Tuple
from apps.monero.xmr.types import Ge25519, Sc25519
from apps.monero.xmr.serialize_messages.tx_ct_key import CtKey
from trezor.messages.MoneroRctKeyPublic import MoneroRctKeyPublic
def generate_mlsag_simple(message, pubs, in_sk, a, cout, kLRki, index, mg_buff):
KeyM = List[List[bytes]]
_HASH_KEY_CLSAG_ROUND = b"CLSAG_round\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
_HASH_KEY_CLSAG_AGG_0 = b"CLSAG_agg_0\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
_HASH_KEY_CLSAG_AGG_1 = b"CLSAG_agg_1\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
def generate_mlsag_simple(
message: bytes,
pubs: List[MoneroRctKeyPublic],
in_sk: CtKey,
a: Sc25519,
cout: Ge25519,
index: int,
mg_buff: List[bytes],
) -> List[bytes]:
"""
MLSAG for RctType.Simple
:param message: the full message to be signed (actually its hash)
@ -56,7 +77,6 @@ def generate_mlsag_simple(message, pubs, in_sk, a, cout, kLRki, index, mg_buff):
:param in_sk: CtKey; spending private key with input commitment mask (original); better_name: input_secret_key
:param a: mask from the pseudo output commitment; better name: pseudo_out_alpha
:param cout: pseudo output commitment; point, decoded; better name: pseudo_out_c
:param kLRki: used only in multisig, currently not implemented
:param index: specifies corresponding public key to the `in_sk` in the pubs array
:param mg_buff: buffer to store the signature to
"""
@ -87,10 +107,10 @@ def generate_mlsag_simple(message, pubs, in_sk, a, cout, kLRki, index, mg_buff):
del pubs
gc.collect()
return generate_mlsag(message, M, sk, kLRki, index, dsRows, mg_buff)
return generate_mlsag(message, M, sk, index, dsRows, mg_buff)
def gen_mlsag_assert(pk, xx, kLRki, index, dsRows):
def gen_mlsag_assert(pk: KeyM, xx: List[Sc25519], index: int, dsRows: int):
"""
Conditions check
"""
@ -111,20 +131,23 @@ def gen_mlsag_assert(pk, xx, kLRki, index, dsRows):
raise ValueError("Bad xx size")
if dsRows > rows:
raise ValueError("Bad dsRows size")
if kLRki and dsRows != 1:
raise ValueError("Multisig requires exactly 1 dsRows")
if kLRki:
raise NotImplementedError("Multisig not implemented")
return rows, cols
def generate_first_c_and_key_images(message, pk, xx, kLRki, index, dsRows, rows, cols):
def generate_first_c_and_key_images(
message: bytes,
pk: KeyM,
xx: List[Sc25519],
index: int,
dsRows: int,
rows: int,
cols: int,
) -> Tuple[Sc25519, List[Ge25519], List[Ge25519]]:
"""
MLSAG computation - the part with secret keys
:param message: the full message to be signed (actually its hash)
:param pk: matrix of public keys and commitments
:param xx: input secret array composed of a private key and commitment mask
:param kLRki: used only in multisig, currently not implemented
:param index: specifies corresponding public key to the `xx`'s private key in the `pk` array
:param dsRows: row number where the pubkeys "end" (and commitments follow)
:param rows: total number of rows
@ -143,24 +166,17 @@ def generate_first_c_and_key_images(message, pk, xx, kLRki, index, dsRows, rows,
# this is somewhat extra as compared to the Ring Confidential Tx paper
# see footnote in From Zero to Monero section 3.3
hasher.update(pk[index][i])
if kLRki:
raise NotImplementedError("Multisig not implemented")
# alpha[i] = kLRki.k
# rv.II[i] = kLRki.ki
# hash_point(hasher, kLRki.L, tmp_buff)
# hash_point(hasher, kLRki.R, tmp_buff)
else:
crypto.hash_to_point_into(Hi, pk[index][i])
alpha[i] = crypto.random_scalar()
# L = alpha_i * G
crypto.scalarmult_base_into(aGi, alpha[i])
# Ri = alpha_i * H(P_i)
crypto.scalarmult_into(aHPi, Hi, alpha[i])
# key image
II[i] = crypto.scalarmult(Hi, xx[i])
_hash_point(hasher, aGi, tmp_buff)
_hash_point(hasher, aHPi, tmp_buff)
crypto.hash_to_point_into(Hi, pk[index][i])
alpha[i] = crypto.random_scalar()
# L = alpha_i * G
crypto.scalarmult_base_into(aGi, alpha[i])
# Ri = alpha_i * H(P_i)
crypto.scalarmult_into(aHPi, Hi, alpha[i])
# key image
II[i] = crypto.scalarmult(Hi, xx[i])
_hash_point(hasher, aGi, tmp_buff)
_hash_point(hasher, aHPi, tmp_buff)
for i in range(dsRows, rows):
alpha[i] = crypto.random_scalar()
@ -178,19 +194,25 @@ def generate_first_c_and_key_images(message, pk, xx, kLRki, index, dsRows, rows,
return c_old, II, alpha
def generate_mlsag(message, pk, xx, kLRki, index, dsRows, mg_buff):
def generate_mlsag(
message: bytes,
pk: KeyM,
xx: List[Sc25519],
index: int,
dsRows: int,
mg_buff: List[bytes],
) -> List[bytes]:
"""
Multilayered Spontaneous Anonymous Group Signatures (MLSAG signatures)
:param message: the full message to be signed (actually its hash)
:param pk: matrix of public keys and commitments
:param xx: input secret array composed of a private key and commitment mask
:param kLRki: used only in multisig, currently not implemented
:param index: specifies corresponding public key to the `xx`'s private key in the `pk` array
:param dsRows: separates pubkeys from commitment
:param mg_buff: mg signature buffer
"""
rows, cols = gen_mlsag_assert(pk, xx, kLRki, index, dsRows)
rows, cols = gen_mlsag_assert(pk, xx, index, dsRows)
rows_b_size = int_serialize.uvarint_size(rows)
# Preallocation of the chunked buffer, len + cols + cc
@ -206,7 +228,7 @@ def generate_mlsag(message, pk, xx, kLRki, index, dsRows, mg_buff):
# calculates the "first" c, key images and random scalars alpha
c_old, II, alpha = generate_first_c_and_key_images(
message, pk, xx, kLRki, index, dsRows, rows, cols
message, pk, xx, index, dsRows, rows, cols
)
i = (index + 1) % cols
@ -271,6 +293,176 @@ def generate_mlsag(message, pk, xx, kLRki, index, dsRows, mg_buff):
# rv.cc
mg_buff[-1] = crypto.encodeint(cc)
return mg_buff
def generate_clsag_simple(
message: bytes,
pubs: List[MoneroRctKeyPublic],
in_sk: CtKey,
a: Sc25519,
cout: Ge25519,
index: int,
mg_buff: List[bytes],
) -> List[bytes]:
"""
CLSAG for RctType.Simple
https://eprint.iacr.org/2019/654.pdf
Corresponds to proveRctCLSAGSimple in rctSigs.cpp
:param message: the full message to be signed (actually its hash)
:param pubs: vector of MoneroRctKey; this forms the ring; point values in encoded form; (dest, mask) = (P, C)
:param in_sk: CtKey; spending private key with input commitment mask (original); better_name: input_secret_key
:param a: mask from the pseudo output commitment; better name: pseudo_out_alpha
:param cout: pseudo output commitment; point, decoded; better name: pseudo_out_c
:param index: specifies corresponding public key to the `in_sk` in the pubs array
:param mg_buff: buffer to store the signature to
"""
cols = len(pubs)
if cols == 0:
raise ValueError("Empty pubs")
P = _key_vector(cols)
C_nonzero = _key_vector(cols)
p = in_sk.dest
z = crypto.sc_sub(in_sk.mask, a)
for i in range(cols):
P[i] = pubs[i].dest
C_nonzero[i] = pubs[i].commitment
pubs[i] = None
del pubs
gc.collect()
return _generate_clsag(message, P, p, C_nonzero, z, cout, index, mg_buff)
def _generate_clsag(
message: bytes,
P: List[bytes],
p: Sc25519,
C_nonzero: List[bytes],
z: Sc25519,
Cout: Ge25519,
index: int,
mg_buff: List[bytes],
) -> List[bytes]:
sI = crypto.new_point() # sig.I
sD = crypto.new_point() # sig.D
sc1 = crypto.new_scalar() # sig.c1
a = crypto.random_scalar()
H = crypto.new_point()
D = crypto.new_point()
Cout_bf = crypto.encodepoint(Cout)
tmp_sc = crypto.new_scalar()
tmp = crypto.new_point()
tmp_bf = bytearray(32)
crypto.hash_to_point_into(H, P[index])
crypto.scalarmult_into(sI, H, p) # I = p*H
crypto.scalarmult_into(D, H, z) # D = z*H
crypto.sc_mul_into(tmp_sc, z, crypto.sc_inv_eight()) # 1/8*z
crypto.scalarmult_into(sD, H, tmp_sc) # sig.D = 1/8*z*H
sD = crypto.encodepoint(sD)
hsh_P = crypto.get_keccak() # domain, I, D, P, C, C_offset
hsh_C = crypto.get_keccak() # domain, I, D, P, C, C_offset
hsh_P.update(_HASH_KEY_CLSAG_AGG_0)
hsh_C.update(_HASH_KEY_CLSAG_AGG_1)
def hsh_PC(x):
nonlocal hsh_P, hsh_C
hsh_P.update(x)
hsh_C.update(x)
for x in P:
hsh_PC(x)
for x in C_nonzero:
hsh_PC(x)
hsh_PC(crypto.encodepoint_into(tmp_bf, sI))
hsh_PC(sD)
hsh_PC(Cout_bf)
mu_P = crypto.decodeint(hsh_P.digest())
mu_C = crypto.decodeint(hsh_C.digest())
del (hsh_PC, hsh_P, hsh_C)
c_to_hash = crypto.get_keccak() # domain, P, C, C_offset, message, aG, aH
c_to_hash.update(_HASH_KEY_CLSAG_ROUND)
for i in range(len(P)):
c_to_hash.update(P[i])
for i in range(len(P)):
c_to_hash.update(C_nonzero[i])
c_to_hash.update(Cout_bf)
c_to_hash.update(message)
chasher = c_to_hash.copy()
crypto.scalarmult_base_into(tmp, a)
chasher.update(crypto.encodepoint_into(tmp_bf, tmp)) # aG
crypto.scalarmult_into(tmp, H, a)
chasher.update(crypto.encodepoint_into(tmp_bf, tmp)) # aH
c = crypto.decodeint(chasher.digest())
del (chasher, H)
L = crypto.new_point()
R = crypto.new_point()
c_p = crypto.new_scalar()
c_c = crypto.new_scalar()
i = (index + 1) % len(P)
if i == 0:
crypto.sc_copy(sc1, c)
mg_buff.append(int_serialize.dump_uvarint_b(len(P)))
for _ in range(len(P)):
mg_buff.append(bytearray(32))
while i != index:
crypto.random_scalar(tmp_sc)
crypto.encodeint_into(mg_buff[i + 1], tmp_sc)
crypto.sc_mul_into(c_p, mu_P, c)
crypto.sc_mul_into(c_c, mu_C, c)
# L = tmp_sc * G + c_P * P[i] + c_c * C[i]
crypto.add_keys2_into(L, tmp_sc, c_p, crypto.decodepoint_into(tmp, P[i]))
crypto.decodepoint_into(tmp, C_nonzero[i]) # C = C_nonzero - Cout
crypto.point_sub_into(tmp, tmp, Cout)
crypto.scalarmult_into(tmp, tmp, c_c)
crypto.point_add_into(L, L, tmp)
# R = tmp_sc * HP + c_p * I + c_c * D
crypto.hash_to_point_into(tmp, P[i])
crypto.add_keys3_into(R, tmp_sc, tmp, c_p, sI)
crypto.point_add_into(R, R, crypto.scalarmult_into(tmp, D, c_c))
chasher = c_to_hash.copy()
chasher.update(crypto.encodepoint_into(tmp_bf, L))
chasher.update(crypto.encodepoint_into(tmp_bf, R))
crypto.decodeint_into(c, chasher.digest())
P[i] = None
C_nonzero[i] = None
i = (i + 1) % len(P)
if i == 0:
crypto.sc_copy(sc1, c)
if i & 3 == 0:
gc.collect()
# Final scalar = a - c * (mu_P * p + mu_c * Z)
crypto.sc_mul_into(tmp_sc, mu_P, p)
crypto.sc_muladd_into(tmp_sc, mu_C, z, tmp_sc)
crypto.sc_mulsub_into(tmp_sc, c, tmp_sc, a)
crypto.encodeint_into(mg_buff[index + 1], tmp_sc)
mg_buff.append(crypto.encodeint(sc1))
mg_buff.append(sD)
return mg_buff
def _key_vector(rows):
@ -287,13 +479,6 @@ def _key_matrix(rows, cols):
return rv
def _generate_random_vector(n):
"""
Generates vector of random scalars
"""
return [crypto.random_scalar() for _ in range(0, n)]
def _hasher_message(message):
"""
Returns incremental hasher for MLSAG

View File

@ -1,6 +1,10 @@
from apps.monero.xmr import crypto
from apps.monero.xmr.keccak_hasher import KeccakXmrArchive
if False:
from typing import List, Union
from apps.monero.xmr.serialize_messages.tx_rsig_bulletproof import Bulletproof
class PreMlsagHasher:
"""
@ -19,23 +23,23 @@ class PreMlsagHasher:
self.state = 1
def set_message(self, message):
def set_message(self, message: bytes):
self.kc_master.update(message)
def set_type_fee(self, rv_type, fee):
def set_type_fee(self, rv_type: int, fee: int):
if self.state != 1:
raise ValueError("State error")
self.state = 2
self.rtcsig_hasher.uint(rv_type, 1) # UInt8
self.rtcsig_hasher.uvarint(fee) # UVarintType
def set_ecdh(self, ecdh):
def set_ecdh(self, ecdh: bytes):
if self.state != 2 and self.state != 3 and self.state != 4:
raise ValueError("State error")
self.state = 4
self.rtcsig_hasher.buffer(ecdh)
def set_out_pk_commitment(self, out_pk_commitment):
def set_out_pk_commitment(self, out_pk_commitment: bytes):
if self.state != 4 and self.state != 5:
raise ValueError("State error")
self.state = 5
@ -50,7 +54,7 @@ class PreMlsagHasher:
self.kc_master.update(c_hash)
self.rtcsig_hasher = None
def rsig_val(self, p, bulletproof, raw=False):
def rsig_val(self, p: Union[bytes, List[bytes], Bulletproof], raw: bool = False):
if self.state == 8:
raise ValueError("State error")
@ -66,31 +70,22 @@ class PreMlsagHasher:
self.rsig_hasher.update(p)
return
if bulletproof:
self.rsig_hasher.update(p.A)
self.rsig_hasher.update(p.S)
self.rsig_hasher.update(p.T1)
self.rsig_hasher.update(p.T2)
self.rsig_hasher.update(p.taux)
self.rsig_hasher.update(p.mu)
for i in range(len(p.L)):
self.rsig_hasher.update(p.L[i])
for i in range(len(p.R)):
self.rsig_hasher.update(p.R[i])
self.rsig_hasher.update(p.a)
self.rsig_hasher.update(p.b)
self.rsig_hasher.update(p.t)
# Hash Bulletproof
self.rsig_hasher.update(p.A)
self.rsig_hasher.update(p.S)
self.rsig_hasher.update(p.T1)
self.rsig_hasher.update(p.T2)
self.rsig_hasher.update(p.taux)
self.rsig_hasher.update(p.mu)
for i in range(len(p.L)):
self.rsig_hasher.update(p.L[i])
for i in range(len(p.R)):
self.rsig_hasher.update(p.R[i])
self.rsig_hasher.update(p.a)
self.rsig_hasher.update(p.b)
self.rsig_hasher.update(p.t)
else:
for i in range(64):
self.rsig_hasher.update(p.asig.s0[i])
for i in range(64):
self.rsig_hasher.update(p.asig.s1[i])
self.rsig_hasher.update(p.asig.ee)
for i in range(64):
self.rsig_hasher.update(p.Ci[i])
def get_digest(self):
def get_digest(self) -> bytes:
if self.state != 6:
raise ValueError("State error")
self.state = 8

View File

@ -1,8 +1,11 @@
from apps.monero.xmr import crypto
if False:
from typing import Tuple, Optional
from typing import List, Tuple, Optional, Dict
from apps.monero.xmr.types import Ge25519, Sc25519
from apps.monero.xmr.credentials import AccountCreds
Subaddresses = Dict[bytes, Tuple[int, int]]
class XmrException(Exception):
@ -13,22 +16,20 @@ class XmrNoSuchAddressException(XmrException):
pass
def get_subaddress_secret_key(secret_key, index=None, major=None, minor=None):
def get_subaddress_secret_key(secret_key: Sc25519, major: int = 0, minor: int = 0):
"""
Builds subaddress secret key from the subaddress index
Hs(SubAddr || a || index_major || index_minor)
"""
if index:
major = index.major
minor = index.minor
if major == 0 and minor == 0:
return secret_key
return crypto.get_subaddress_secret_key(secret_key, major, minor)
def get_subaddress_spend_public_key(view_private, spend_public, major, minor):
def get_subaddress_spend_public_key(
view_private: Sc25519, spend_public: Ge25519, major: int, minor: int
) -> Ge25519:
"""
Generates subaddress spend public key D_{major, minor}
"""
@ -41,7 +42,9 @@ def get_subaddress_spend_public_key(view_private, spend_public, major, minor):
return D
def derive_subaddress_public_key(out_key, derivation, output_index):
def derive_subaddress_public_key(
out_key: Ge25519, derivation: Ge25519, output_index: int
) -> Ge25519:
"""
out_key - H_s(derivation || varint(output_index))G
"""
@ -52,7 +55,7 @@ def derive_subaddress_public_key(out_key, derivation, output_index):
return point4
def generate_key_image(public_key, secret_key):
def generate_key_image(public_key: bytes, secret_key: Sc25519) -> Ge25519:
"""
Key image: secret_key * H_p(pub_key)
"""
@ -62,43 +65,67 @@ def generate_key_image(public_key, secret_key):
def is_out_to_account(
subaddresses: dict,
subaddresses: Subaddresses,
out_key: Ge25519,
derivation: Ge25519,
additional_derivations: list,
additional_derivation: Ge25519,
output_index: int,
creds: Optional[AccountCreds] = None,
sub_addr_major: int = None,
sub_addr_minor: int = None,
):
"""
Checks whether the given transaction is sent to the account.
Searches subaddresses for the computed subaddress_spendkey.
Corresponds to is_out_to_acc_precomp() in the Monero codebase.
If found, returns (major, minor), derivation, otherwise None.
If (creds, sub_addr_major, sub_addr_minor) are specified,
subaddress is checked directly (avoids the need to store
large subaddresses dicts).
"""
subaddress_spendkey = crypto.encodepoint(
derive_subaddress_public_key(out_key, derivation, output_index)
subaddress_spendkey_obj = derive_subaddress_public_key(
out_key, derivation, output_index
)
if subaddress_spendkey in subaddresses:
return subaddresses[subaddress_spendkey], derivation
if additional_derivations and len(additional_derivations) > 0:
if output_index >= len(additional_derivations):
raise ValueError("Wrong number of additional derivations")
subaddress_spendkey = derive_subaddress_public_key(
out_key, additional_derivations[output_index], output_index
sub_pub_key = None
if creds and sub_addr_major is not None and sub_addr_minor is not None:
sub_pub_key = get_subaddress_spend_public_key(
creds.view_key_private,
creds.spend_key_public,
sub_addr_major,
sub_addr_minor,
)
subaddress_spendkey = crypto.encodepoint(subaddress_spendkey)
if crypto.point_eq(subaddress_spendkey_obj, sub_pub_key):
return (sub_addr_major, sub_addr_minor), derivation
if subaddresses:
subaddress_spendkey = crypto.encodepoint(subaddress_spendkey_obj)
if subaddress_spendkey in subaddresses:
return (
subaddresses[subaddress_spendkey],
additional_derivations[output_index],
)
return subaddresses[subaddress_spendkey], derivation
if additional_derivation:
subaddress_spendkey_obj = derive_subaddress_public_key(
out_key, additional_derivation, output_index
)
if sub_pub_key and crypto.point_eq(subaddress_spendkey_obj, sub_pub_key):
return (sub_addr_major, sub_addr_minor), additional_derivation
if subaddresses:
subaddress_spendkey = crypto.encodepoint(subaddress_spendkey_obj)
if subaddress_spendkey in subaddresses:
return subaddresses[subaddress_spendkey], additional_derivation
return None
def generate_tx_spend_and_key_image(
ack, out_key, recv_derivation, real_output_index, received_index: tuple
ack: AccountCreds,
out_key: Ge25519,
recv_derivation: Ge25519,
real_output_index: int,
received_index: Tuple[int, int],
) -> Optional[Tuple[Sc25519, Ge25519]]:
"""
Generates UTXO spending key and key image.
@ -156,12 +183,14 @@ def generate_tx_spend_and_key_image(
def generate_tx_spend_and_key_image_and_derivation(
creds,
subaddresses: dict,
creds: AccountCreds,
subaddresses: Subaddresses,
out_key: Ge25519,
tx_public_key: Ge25519,
additional_tx_public_keys: list,
additional_tx_public_key: Ge25519,
real_output_index: int,
sub_addr_major: int = None,
sub_addr_minor: int = None,
) -> Tuple[Sc25519, Ge25519, Ge25519]:
"""
Generates UTXO spending key and key image and corresponding derivation.
@ -172,26 +201,31 @@ def generate_tx_spend_and_key_image_and_derivation(
:param subaddresses:
:param out_key: real output (from input RCT) destination key
:param tx_public_key: R, transaction public key
:param additional_tx_public_keys: Additional Rs, for subaddress destinations
:param additional_tx_public_key: Additional Rs, for subaddress destinations
:param real_output_index: index of the real output in the RCT
:param sub_addr_major: subaddress major index
:param sub_addr_minor: subaddress minor index
:return:
"""
recv_derivation = crypto.generate_key_derivation(
tx_public_key, creds.view_key_private
)
additional_recv_derivations = []
for add_pub_key in additional_tx_public_keys:
additional_recv_derivations.append(
crypto.generate_key_derivation(add_pub_key, creds.view_key_private)
)
additional_recv_derivation = (
crypto.generate_key_derivation(additional_tx_public_key, creds.view_key_private)
if additional_tx_public_key
else None
)
subaddr_recv_info = is_out_to_account(
subaddresses,
out_key,
recv_derivation,
additional_recv_derivations,
additional_recv_derivation,
real_output_index,
creds,
sub_addr_major,
sub_addr_minor,
)
if subaddr_recv_info is None:
raise XmrNoSuchAddressException("No such addr")
@ -202,7 +236,12 @@ def generate_tx_spend_and_key_image_and_derivation(
return xi, ki, recv_derivation
def compute_subaddresses(creds, account: int, indices, subaddresses=None):
def compute_subaddresses(
creds: AccountCreds,
account: int,
indices: List[int],
subaddresses: Optional[Subaddresses] = None,
) -> Subaddresses:
"""
Computes subaddress public spend key for receiving transactions.
@ -228,12 +267,12 @@ def compute_subaddresses(creds, account: int, indices, subaddresses=None):
return subaddresses
def generate_keys(recovery_key):
def generate_keys(recovery_key: Sc25519) -> Tuple[Sc25519, Ge25519]:
pub = crypto.scalarmult_base(recovery_key)
return recovery_key, pub
def generate_monero_keys(seed):
def generate_monero_keys(seed: bytes) -> Tuple[Sc25519, Ge25519, Sc25519, Ge25519]:
"""
Generates spend key / view key from the seed in the same manner as Monero code does.
@ -246,7 +285,9 @@ def generate_monero_keys(seed):
return spend_sec, spend_pub, view_sec, view_pub
def generate_sub_address_keys(view_sec, spend_pub, major, minor):
def generate_sub_address_keys(
view_sec: Sc25519, spend_pub: Ge25519, major: int, minor: int
) -> Tuple[Ge25519, Ge25519]:
if major == 0 and minor == 0: # special case, Monero-defined
return spend_pub, crypto.scalarmult_base(view_sec)
@ -257,7 +298,7 @@ def generate_sub_address_keys(view_sec, spend_pub, major, minor):
return D, C
def commitment_mask(key, buff=None):
def commitment_mask(key: bytes, buff: Optional[Sc25519] = None) -> Sc25519:
"""
Generates deterministic commitment mask for Bulletproof2
"""

View File

@ -12,8 +12,13 @@ import gc
from apps.monero.xmr import crypto
if False:
from typing import List
from apps.monero.xmr.types import Sc25519
from apps.monero.xmr.serialize_messages.tx_rsig_bulletproof import Bulletproof
def prove_range_bp_batch(amounts, masks):
def prove_range_bp_batch(amounts: List[int], masks: List[Sc25519]) -> Bulletproof:
"""Calculates Bulletproof in batches"""
from apps.monero.xmr import bulletproof as bp
@ -25,7 +30,7 @@ def prove_range_bp_batch(amounts, masks):
return bp_proof
def verify_bp(bp_proof, amounts, masks):
def verify_bp(bp_proof: Bulletproof, amounts: List[int], masks: List[Sc25519]) -> bool:
"""Verifies Bulletproof"""
from apps.monero.xmr import bulletproof as bp

View File

@ -5,7 +5,6 @@ from apps.monero.xmr.serialize.message_types import BlobType
_c0 = const(0)
_c1 = const(1)
_c32 = const(32)
_c64 = const(64)
#
# cryptonote_basic.h

View File

@ -1,24 +0,0 @@
from micropython import const
from apps.monero.xmr.serialize.message_types import ContainerType, MessageType
from apps.monero.xmr.serialize_messages.base import ECKey
_c0 = const(0)
class KeyV(ContainerType):
FIX_SIZE = _c0
ELEM_TYPE = ECKey
class KeyM(ContainerType):
FIX_SIZE = _c0
ELEM_TYPE = KeyV
class CtKey(MessageType):
__slots__ = ("dest", "mask")
@classmethod
def f_specs(cls):
return (("dest", ECKey), ("mask", ECKey))

View File

@ -0,0 +1,6 @@
class CtKey:
__slots__ = ("dest", "mask")
def __init__(self, dest, mask):
self.dest = dest
self.mask = mask

View File

@ -1,68 +1,13 @@
from micropython import const
from apps.monero.xmr.serialize.base_types import UInt8, UVarintType
from apps.monero.xmr.serialize.message_types import (
ContainerType,
MessageType,
VariantType,
)
from apps.monero.xmr.serialize_messages.base import ECPublicKey, Hash, KeyImage
_c0 = const(0)
_c1 = const(1)
_c32 = const(32)
_c64 = const(64)
class TxoutToScript(MessageType):
__slots__ = ("keys", "script")
VARIANT_CODE = 0x0
@classmethod
def f_specs(cls):
return (("keys", ContainerType, ECPublicKey), ("script", ContainerType, UInt8))
class TxoutToKey(MessageType):
__slots__ = ("key",)
VARIANT_CODE = 0x2
@classmethod
def f_specs(cls):
return (("key", ECPublicKey),)
class TxoutToScriptHash(MessageType):
__slots__ = ("hash",)
VARIANT_CODE = 0x1
@classmethod
def f_specs(cls):
return (("hash", Hash),)
class TxoutTargetV(VariantType):
@classmethod
def f_specs(cls):
return (
("txout_to_script", TxoutToScript),
("txout_to_scripthash", TxoutToScriptHash),
("txout_to_key", TxoutToKey),
)
class TxinGen(MessageType):
__slots__ = ("height",)
VARIANT_CODE = 0xFF
@classmethod
def f_specs(cls):
return (("height", UVarintType),)
from apps.monero.xmr.serialize.base_types import UVarintType
from apps.monero.xmr.serialize.message_types import ContainerType, MessageType
from apps.monero.xmr.serialize_messages.base import KeyImage
class TxinToKey(MessageType):
__slots__ = ("amount", "key_offsets", "k_image")
VARIANT_CODE = 0x2
VARIANT_CODE = const(0x2)
@classmethod
def f_specs(cls):
@ -71,32 +16,3 @@ class TxinToKey(MessageType):
("key_offsets", ContainerType, UVarintType),
("k_image", KeyImage),
)
class TxinToScript(MessageType):
__slots__ = ()
VARIANT_CODE = _c0
class TxinToScriptHash(MessageType):
__slots__ = ()
VARIANT_CODE = _c1
class TxInV(VariantType):
@classmethod
def f_specs(cls):
return (
("txin_gen", TxinGen),
("txin_to_script", TxinToScript),
("txin_to_scripthash", TxinToScriptHash),
("txin_to_key", TxinToKey),
)
class TxOut(MessageType):
__slots__ = ("amount", "target")
@classmethod
def f_specs(cls):
return (("amount", UVarintType), ("target", TxoutTargetV))

View File

@ -1,9 +1,17 @@
from apps.monero.xmr.serialize.message_types import MessageType
from micropython import const
from apps.monero.xmr.serialize.message_types import ContainerType, MessageType
from apps.monero.xmr.serialize_messages.base import ECKey
from apps.monero.xmr.serialize_messages.ct_keys import KeyV
class _KeyV(ContainerType):
FIX_SIZE = const(0)
ELEM_TYPE = ECKey
class Bulletproof(MessageType):
__slots__ = ("A", "S", "T1", "T2", "taux", "mu", "L", "R", "a", "b", "t")
@classmethod
def f_specs(cls):
return (
@ -13,8 +21,8 @@ class Bulletproof(MessageType):
("T2", ECKey),
("taux", ECKey),
("mu", ECKey),
("L", KeyV),
("R", KeyV),
("L", _KeyV),
("R", _KeyV),
("a", ECKey),
("b", ECKey),
("t", ECKey),

View File

@ -0,0 +1,308 @@
from common import *
if not utils.BITCOIN_ONLY:
from apps.monero.xmr import crypto, mlsag
from apps.monero.xmr.serialize_messages.tx_ct_key import CtKey
from trezor.crypto import random
import ubinascii
class TmpKey:
def __init__(self, d, c):
self.dest = d
self.commitment = c
@unittest.skipUnless(not utils.BITCOIN_ONLY, "altcoin")
class TestMoneroClsag(unittest.TestCase):
def verify_clsag(self, msg, ss, sc1, sI, sD, pubs, C_offset):
n = len(pubs)
c = crypto.new_scalar()
D_8 = crypto.new_point()
tmp_bf = bytearray(32)
C_offset_bf = crypto.encodepoint(C_offset)
crypto.sc_copy(c, sc1)
crypto.point_mul8_into(D_8, sD)
hsh_P = crypto.get_keccak() # domain, I, D, P, C, C_offset
hsh_C = crypto.get_keccak() # domain, I, D, P, C, C_offset
hsh_P.update(mlsag._HASH_KEY_CLSAG_AGG_0)
hsh_C.update(mlsag._HASH_KEY_CLSAG_AGG_1)
def hsh_PC(x):
hsh_P.update(x)
hsh_C.update(x)
for x in pubs:
hsh_PC(x.dest)
for x in pubs:
hsh_PC(x.commitment)
hsh_PC(crypto.encodepoint_into(tmp_bf, sI))
hsh_PC(crypto.encodepoint_into(tmp_bf, sD))
hsh_PC(C_offset_bf)
mu_P = crypto.decodeint(hsh_P.digest())
mu_C = crypto.decodeint(hsh_C.digest())
c_to_hash = crypto.get_keccak() # domain, P, C, C_offset, message, L, R
c_to_hash.update(mlsag._HASH_KEY_CLSAG_ROUND)
for i in range(len(pubs)):
c_to_hash.update(pubs[i].dest)
for i in range(len(pubs)):
c_to_hash.update(pubs[i].commitment)
c_to_hash.update(C_offset_bf)
c_to_hash.update(msg)
c_p = crypto.new_scalar()
c_c = crypto.new_scalar()
L = crypto.new_point()
R = crypto.new_point()
tmp_pt = crypto.new_point()
i = 0
while i < n:
crypto.sc_mul_into(c_p, mu_P, c)
crypto.sc_mul_into(c_c, mu_C, c)
C_P = crypto.point_sub(
crypto.decodepoint_into(tmp_pt, pubs[i].commitment), C_offset
)
crypto.add_keys2_into(
L, ss[i], c_p, crypto.decodepoint_into(tmp_pt, pubs[i].dest)
)
crypto.point_add_into(L, L, crypto.scalarmult_into(tmp_pt, C_P, c_c))
HP = crypto.hash_to_point(pubs[i].dest)
crypto.add_keys3_into(R, ss[i], HP, c_p, sI)
crypto.point_add_into(R, R, crypto.scalarmult_into(tmp_pt, D_8, c_c))
chasher = c_to_hash.copy()
chasher.update(crypto.encodepoint_into(tmp_bf, L))
chasher.update(crypto.encodepoint_into(tmp_bf, R))
crypto.decodeint_into(c, chasher.digest())
i += 1
res = crypto.sc_sub(c, sc1)
if not crypto.sc_eq(res, crypto.sc_0()):
raise ValueError("Signature error")
def gen_clsag_test(self, ring_size=11, index=None):
res = self.gen_clsag_sig(ring_size=11, index=index)
msg, scalars, sc1, sI, sD, ring2, Cp = res
self.verify_clsag(msg, scalars, sc1, sI, sD, ring2, Cp)
def gen_clsag_sig(self, ring_size=11, index=None):
msg = random.bytes(32)
amnt = crypto.sc_init(random.uniform(0xFFFFFF) + 12)
priv = crypto.random_scalar()
msk = crypto.random_scalar()
alpha = crypto.random_scalar()
P = crypto.scalarmult_base(priv)
C = crypto.add_keys2(msk, amnt, crypto.xmr_H())
Cp = crypto.add_keys2(alpha, amnt, crypto.xmr_H())
ring = []
for i in range(ring_size - 1):
tk = TmpKey(
crypto.encodepoint(crypto.scalarmult_base(crypto.random_scalar())),
crypto.encodepoint(crypto.scalarmult_base(crypto.random_scalar())),
)
ring.append(tk)
index = index if index is not None else random.uniform(len(ring))
ring.insert(index, TmpKey(crypto.encodepoint(P), crypto.encodepoint(C)))
ring2 = list(ring)
mg_buffer = []
self.assertTrue(
crypto.point_eq(
crypto.scalarmult_base(priv), crypto.decodepoint(ring[index].dest)
)
)
self.assertTrue(
crypto.point_eq(
crypto.scalarmult_base(crypto.sc_sub(msk, alpha)),
crypto.point_sub(crypto.decodepoint(ring[index].commitment), Cp),
)
)
mlsag.generate_clsag_simple(
msg, ring, CtKey(priv, msk), alpha, Cp, index, mg_buffer,
)
sD = crypto.decodepoint(mg_buffer[-1])
sc1 = crypto.decodeint(mg_buffer[-2])
scalars = [crypto.decodeint(x) for x in mg_buffer[1:-2]]
H = crypto.new_point()
sI = crypto.new_point()
crypto.hash_to_point_into(H, crypto.encodepoint(P))
crypto.scalarmult_into(sI, H, priv) # I = p*H
return msg, scalars, sc1, sI, sD, ring2, Cp
def verify_monero_generated(self, clsag):
msg = ubinascii.unhexlify(clsag["msg"])
sI = crypto.decodepoint(ubinascii.unhexlify(clsag["sI"]))
sD = crypto.decodepoint(ubinascii.unhexlify(clsag["sD"]))
sc1 = crypto.decodeint(ubinascii.unhexlify(clsag["sc1"]))
Cout = crypto.decodepoint(ubinascii.unhexlify(clsag["cout"]))
scalars = [crypto.decodeint(ubinascii.unhexlify(x)) for x in clsag["ss"]]
ring = []
for e in clsag["ring"]:
ring.append(TmpKey(ubinascii.unhexlify(e[0]), ubinascii.unhexlify(e[1])))
self.verify_clsag(msg, scalars, sc1, sI, sD, ring, Cout)
def test_monero_generated_clsag_01(self):
clsag = {
"msg": "0100000000000000000000000000000000000000000000000000000000000000",
"cout": "8e3afb92d8ae1264417489259e38f7205a62baea86ae9592cd91988b9cc48102",
"sI": "a1c7f4a316ddd16374fe495d402be60566047ae5a1352554e98ebff118705303",
"sD": "cd80b5c7f3f597de6e20bcef669a4ba9eb3eb89ead12ab1c24c92acd609afcb2",
"sc1": "cf4f48ed60771d4e8d02e9e0af37281ceeb66573bd528ac256a7e17794a75602",
"ss":
["aaeffa564b5b0ff1e4ed72c9b595cd0241ac64eeb41b902a35688e369922d704"
, "1defc134a853252d734d19b29d8f2fabc85a8ae24ebcf8f050d4daf8a335e901"
, "cdf9ac576f0c7ceb7eb22c1a1254a801d0d2915e59870be8b1ab68cd1281120d"
, "d1973493d8224aaa9732878b9a88d448ea16185f94e5bafd82816277682fa108"
, "a130e076845e512687575942bf3694bcb44eb19eb1181af9a1fc2254949b7c0f"
, "26f5b6ea154d6bd4a969c742563d75f1bfcd5ded3af78669e45ba95e76c48605"
, "5b695d3be46b826fd11e043028dee2aa25cf36910e86537fcd1cd3f5cb49650e"
, "37e811ebb4a2b9c35556b4af911a03a93468f599956c034092c3ece9e1169208"
, "a361ceec9aacd65da6d3e686fbcd0c1aef26096321be7f01653157ee6096a201"
, "f9b762ef1df69bb12ca76a97dce11f7840b8ec63c3dc2683f7ae71cb79c49103"
, "ea010fa6a35f3bd3d7899a7a2a8df4d3ef9c9dfbbd56fe43ff5c7442821d3508"
]
, "ring": [
["241c0295b4c3a149e5ac7997963e125d0fc6cc8adad9349df3b01ff611936c87",
"3a24a4c418ccb2ceb83672d01534a73ff1e9f548937d5ddd7f1971c9b398868c"],
["ec432ccfbf730077cb2d8c59968e2796148a590eec7928ecf268d883ced0de5b",
"2973d6e9c27538fd0f7c003e014311e9403dcb6e7d86b66df65176a579943bda"],
["0cfeafc313a6a2e60110778d53d61fa1705e9049b8afba0f51c1127f6855c07f",
"ffa4d4c77202907832294243a96886920017b67fbe5b3800bcc1457c4a4a1ff0"],
["bd4eca22dc010a214524901b88bdda27e427217ff784c47520ee76743caba036",
"e07135f8398459133c2969184e70610b9b995f73e44acf54b6eaed6227e68bbc"],
["73c8d57d0128c99fc2ab0be8cee5fe5c1288b98e51822a6681846035fcc53fea",
"2987499fde3f4353013206d89fe2d7c6ad3cd9a66c9a36d17749e39112513572"],
["385c538901b79c6bd2ddea5191e808b1414c9dfdcaf424841d843dd788cb89ad",
"ec5f987fe138c6cb1d47ff75d77852b7c0a94ba1f0b93d22c0463f75986605bd"],
["fed06cb761745a6f087d1af13f84670ecbf1523d72b46e8bd0698d1cdfb398bc",
"5d81df981fb885f947b9404cb63cb06fe4e001be281f2bdfb3c638d54ec6e49e"],
["667d1edfb83a17bd81fcf7831362b6c9038f26340ee1fe56d41f62cb0b32e989",
"e9ceba97867b43cd5420c94fa61cc5f11e440e261df74dfc8b1c07ec4b13aa3c"],
["e1e76da5bd52fc065f9af40efde5f733f9673974d14c6af8d200d8576ac3a90d",
"97358d6ddad38b2707fb864bfcaaab935851af66d50bcbac569d159d740bdf71"],
["4fd5d0db88283c63905d5095a76b11a75337e43f403f8469175ba9c49741552e",
"af0ab85872a6355d5c82c1f9a2a41488146e19b272887a1f7385cc26bef3f1d8"],
["37e1a4c49a22340fa5ac2c22c1b7a891e7191cdc53911700a317c0d8b92bbf4e",
"5c89d29dad77de7d76ece8bb81c7c8cd15008f63c5a14ab1c984b3833e7bbce3"]
]
}
self.verify_monero_generated(clsag)
def test_monero_generated_clsag_02(self):
clsag = {
"msg": "0100000000000000000000000000000000000000000000000000000000000000",
"cout": "fdf2503d3217dbf73ababd16f5ab5a63d64c047db1d02b0888a50d2570f3a793",
"sI": "917fdd3086c056503ffdb1840f03c78d48bfe6d9d60b4efb194bd9798d03acaa",
"sD": "769d0ca9b272ac02c5efad7df6b5c00f2995c99ca80f4597136decba9a0dd36f",
"sc1": "fe5c7eb39a32d2aea12e6d127d847b72ea810bfbf3d5bbe23c40e7abdd12900e",
"ss":
["da2940c66cc2405032d959325c8804e216f76b36e71b2ae6b76417ed9c10a80a"
, "ca763505c2e5ebacf72098f8cba89ea6826aa448501f03d439c7a838a88bba0e"
, "b2eadee4c121e85b0c2a09d56c665ba19ee8ebc451f1e9e96cf72c874f945104"
, "5a79523fdc0df9a54ab3937c878bd5a02e62bff77efc338728deb060ecda4509"
, "dfadddc51866cde5206269270f44ca2f6350ca0b1328a968773fcacf57031502"
, "a964f3549a10fc8bdb2f8217df0e9b08e90477be19a665b94b73ce417622450b"
, "48e805427109268b04bf378c869501dbebb79c0cbe664bf7eb0ca222376d1c0f"
, "33f36d9a699e92a66d4b9fdf6c1123ae99701b117fbe8f0af9faec51e45eb409"
, "25ef746a03aaf59701d1d47ea3b9e9f092662cebc9d44902ce18e81cc5035f01"
, "2ba3022d4f9b57da7429499715592073f1608cf270318840a5fd3890bbf5950a"
, "8149ec0d965c9881d6a4adedca7d3c9090359dbfae56dbab526be102722aab09"
]
, "ring": [
["081b048be784e1ff6f3b7ebe602690c27723b5d9952405bcdcbed31d16125067",
"6090eccb73d2e1fc7bc7644a4fad04e5fe93d953a1258307c44d5b23cd636bf9"],
["e2f0f100f1634d7c7dd5a09bc6dd7ee53506d73536aa743e8ea049528e4cb2aa",
"632438f9aeda72eb9c6c434391cf9fa2f71788bea598a5d5729a5d502865932a"],
["6744197cfde37ad1901d518f112c0f4d820c23122a016949e300eec2ab88916c",
"1b251d5b32e22de29a4f99a0ed1de32754636175075e21b25d7283036eb85541"],
["0e86bb7ee0b4728f2fedde7ac5019b54de7b2bb19b44d1864e6346dac6c171ab",
"5a3c85e93890f802d4148140733dcdcd676353fce1bd774ce28034fc2ec00253"],
["1847ce49d9552651395b2fa80637c131a31036f0bfc5abb63526701cd1a32320",
"a9cb55bc24e6e1fb894c511f2edd4b7bda4c75a608657d952e85bab83ec98a52"],
["5c5d0b678f5045b0304e3c48027bd7e9ccaee1dac4449ed1f34b204868ca5651",
"badf83ccba38f2194f924a4f7fb7c2fd966b1e16c1fddeb3658033aa009febe0"],
["81961aa4c241a91d498d8f3057b31373d9fc72b6e7d7f98bf497e3dfe705eeaa",
"a0e632fbb801d6bce99ef97d7bb6acd945aff5cd7fab56c0e6fec6900a3babd7"],
["cbd89f10ddf152bd9c756d145ef4cda1d56a31f1e1936759bee04b7a8a815c76",
"8b835b8180f36e79ba79528e0d3401f439cc1c7f99e4bcfb3cb4aa2b60b1afc1"],
["a7bc55e955a825730f5dcdc3f8126717d7647cbca8a6b90e08b77269aeed3533",
"8da31e80698c9b5181b2e8d9773136083a34e3e72c92134d8201d9c368d89284"],
["a7902cec90d3f2de25c8ddc87075159fd00f219a51a1e7dcac17c2b8a91887e9",
"2b1e848b6649abefbd6b399504a169252358e7ff6bde8fa7a773b9cf0a167069"],
["9fc3d5fb7de8cfc59982f7b20f3f5c145ad191088e7f59c10908dc5d55863bee",
"b8de2bc9bb46d475007230a92af14afb6f9dd2804b5c31355a282b40ccdadc92"]
]
}
self.verify_monero_generated(clsag)
def test_clsag(self):
self.gen_clsag_test(ring_size=11, index=None)
self.gen_clsag_test(ring_size=11, index=None)
self.gen_clsag_test(ring_size=11, index=None)
self.gen_clsag_test(ring_size=11, index=0)
self.gen_clsag_test(ring_size=11, index=9)
self.gen_clsag_test(ring_size=11, index=10)
self.gen_clsag_test(ring_size=2, index=0)
def test_clsag_invalid_sI(self):
res = self.gen_clsag_sig(ring_size=11, index=5)
msg, scalars, sc1, sI, sD, ring2, Cp = res
with self.assertRaises(ValueError):
sI = crypto.point_mul8(sI)
self.verify_clsag(msg, scalars, sc1, sI, sD, ring2, Cp)
def test_clsag_invalid_sD(self):
res = self.gen_clsag_sig(ring_size=11, index=5)
msg, scalars, sc1, sI, sD, ring2, Cp = res
with self.assertRaises(ValueError):
sD = crypto.scalarmult_base(crypto.random_scalar())
self.verify_clsag(msg, scalars, sc1, sI, sD, ring2, Cp)
def test_clsag_invalid_P(self):
res = self.gen_clsag_sig(ring_size=11, index=5)
msg, scalars, sc1, sI, sD, ring2, Cp = res
with self.assertRaises(ValueError):
ring2[5].dest = crypto.encodepoint(
crypto.point_mul8(crypto.decodepoint(ring2[5].dest))
)
self.verify_clsag(msg, scalars, sc1, sI, sD, ring2, Cp)
def test_clsag_invalid_P(self):
res = self.gen_clsag_sig(ring_size=11, index=5)
msg, scalars, sc1, sI, sD, ring2, Cp = res
with self.assertRaises(ValueError):
ring2[5].commitment = crypto.encodepoint(
crypto.point_mul8(crypto.decodepoint(ring2[5].dest))
)
self.verify_clsag(msg, scalars, sc1, sI, sD, ring2, Cp)
def test_clsag_invalid_Cp(self):
res = self.gen_clsag_sig(ring_size=11, index=5)
msg, scalars, sc1, sI, sD, ring2, Cp = res
with self.assertRaises(ValueError):
Cp = crypto.point_add(Cp, crypto.scalarmult_base(crypto.sc_init(1)))
self.verify_clsag(msg, scalars, sc1, sI, sD, ring2, Cp)
def test_clsag_invalid_index(self):
res = self.gen_clsag_sig(ring_size=11, index=5)
msg, scalars, sc1, sI, sD, ring2, Cp = res
with self.assertRaises(ValueError):
ring2[5], ring2[6] = ring2[6], ring2[5]
self.verify_clsag(msg, scalars, sc1, sI, sD, ring2, Cp)
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,64 @@
from common import *
if not utils.BITCOIN_ONLY:
from trezor.crypto import chacha20poly1305
from apps.monero.signing import offloading_keys
from apps.monero.signing import step_09_sign_input
from apps.monero.signing.state import State
import ubinascii
@unittest.skipUnless(not utils.BITCOIN_ONLY, "altcoin")
class TestMoneroProto(unittest.TestCase):
def test_sign_keys(self):
mst = ubinascii.unhexlify(b"ca3bbe08a178a4508c3992a47ba775799e7626a365ed136e803fe5f2df2ce01c")
self.assertEqual(offloading_keys.key_signature(mst, 0, True)[:12], ubinascii.unhexlify(b'bb665d97ac7c77995578e352'))
self.assertEqual(offloading_keys.key_signature(mst, 0, False), ubinascii.unhexlify(b'87bb70af81bb7325f73e8b962167579454d126ff8ee51472922d7c103fc60f5f'))
self.assertEqual(offloading_keys.key_signature(mst, 3, True)[:12], ubinascii.unhexlify(b'b2ef8e4e4eec72ce3096622a'))
self.assertEqual(offloading_keys.key_signature(mst, 3, False), ubinascii.unhexlify(b'e4331602a83a68c892a83693a1b961564048d9349111b85b8b4b52a1adcf36da'))
def test_sig_seal(self):
mst = ubinascii.unhexlify(b"ca3bbe08a178a4508c3992a47ba775799e7626a365ed136e803fe5f2df2ce01c")
st = State(None)
st.last_step = st.STEP_SIGN
st.opening_key = mst
st.current_input_index = 3
mg_buff = [
'0b',
'02fe9ee789007254215b41351109f186620624a3c1ad2ba89628194528672adf04f900ebf9ad3b0cc1ac9ae1f03167f74d6e04175df5001c91d09d29dbefd6bc0b',
'021d46f6db8a349caca48a4dfee155b9dee927d0f25cdf5bcd724358c611b47906de6cedad47fd26070927f3954bcaf7a0e126699bf961ca4e8124abefe8aaeb05',
'02ae933994effe2b348b09bfab783bf9adb58b09659d8f5bd058cca252d763b600541807dcb0ea9fe253e59f23ce36cc811d627acae5e2abdc00b7ed155f3e6b0f',
'0203dd7138c7378444fe3c1b1572a351f88505aeab2d9f8ed4a8f67d66e76983072d8ae6e496b3953a8603543c2dc64749ee15fe3575e4505b502bfe696f06690e',
'0287b572b6c096bc11a8c10fe1fc4ba2085633f8e1bdd2e39df8f46c9bf733ca068261d8006f22ee2bfaf4366e26d42b00befdddd9058a5c87a0f39c757f121909',
'021e2ea38aa07601e07a3d7623a97e68d3251525304d2a748548c7b46d07c20b0c78506b19cae49d569d0a8c4979c74f7d8d19f7e595d307ddf00faf3d8f621c0d',
'0214f758c8fb4a521a1e3d25b9fb535974f6aab1c1dda5988e986dda7e17140909a7b7bdb3d5e17a2ebd5deb3530d10c6f5d6966f525c1cbca408059949ff65304',
'02f707c4a37066a692986ddfdd2ca71f68c6f45a956d45eaf6e8e7a2e5272ac3033eb26ca2b55bf86e90ab8ddcdbad88a82ded88deb552614190440169afcee004',
'02edb8a5b8cc02a2e03b95ea068084ae2496f21d4dfd0842c63836137e37047b06d5a0160994396c98630d8b47878e9c18fea4fb824588c143e05c4b18bfea2301',
'02aa59c2ef76ac97c261279a1c6ed3724d66a437fe8df0b85e8858703947a2b10f04e49912a0626c09849c3b4a3ea46166cd909b9fd561257730c91cbccf4abe07',
'02c64a98c59c4a3d7c583de65404c5a54b350a25011dfca70cd84e3f6e570428026236028fce31bfd8d9fc5401867ab5349eb0859c65df05b380899a7bdfee9003',
'03da465e27f7feec31353cb668f0e8965391f983b06c0684b35b00af38533603',
]
mg_buff = [ubinascii.unhexlify(x) for x in mg_buff]
mg_buff_b = list(mg_buff)
mg_res = step_09_sign_input._protect_signature(st, mg_buff)
iv = offloading_keys.key_signature(mst, st.current_input_index, True)[:12]
key = offloading_keys.key_signature(mst, st.current_input_index, False)
cipher = chacha20poly1305(key, iv)
ciphertext = cipher.encrypt(b"".join(mg_buff_b))
ciphertext += cipher.finish()
self.assertEqual(b"".join(mg_res), ciphertext)
cipher = chacha20poly1305(key, iv)
ciphertext = b"".join(mg_res)
exp_tag, ciphertext = ciphertext[-16:], ciphertext[:-16]
plaintext = cipher.decrypt(ciphertext)
tag = cipher.finish()
self.assertEqual(tag, exp_tag)
self.assertEqual(plaintext, b"".join(mg_buff_b))
if __name__ == "__main__":
unittest.main()

View File

@ -11,72 +11,14 @@ if not utils.BITCOIN_ONLY:
from apps.monero.xmr.serialize.readwriter import MemoryReaderWriter
from apps.monero.xmr.serialize_messages.base import ECPoint
from apps.monero.xmr.serialize_messages.tx_prefix import (
TxinGen,
TxinToKey,
TxInV,
TxOut,
TxoutToKey,
)
if not utils.BITCOIN_ONLY:
class XmrTstData(object):
"""Simple tests data generator"""
def __init__(self, *args, **kwargs):
super(XmrTstData, self).__init__()
self.ec_offset = 0
def reset(self):
self.ec_offset = 0
def generate_ec_key(self, use_offset=True):
"""
Returns test EC key, 32 element byte array
:param use_offset:
:return:
"""
offset = 0
if use_offset:
offset = self.ec_offset
self.ec_offset += 1
return bytearray(range(offset, offset + 32))
def gen_transaction_prefix(self):
"""
Returns test transaction prefix
:return:
"""
vin = [
TxinToKey(
amount=123, key_offsets=[1, 2, 3, 2 ** 76], k_image=bytearray(range(32))
),
TxinToKey(
amount=456, key_offsets=[9, 8, 7, 6], k_image=bytearray(range(32, 64))
),
TxinGen(height=99),
]
vout = [
TxOut(amount=11, target=TxoutToKey(key=bytearray(range(32)))),
TxOut(amount=34, target=TxoutToKey(key=bytearray(range(64, 96)))),
]
msg = TransactionPrefix(
version=2, unlock_time=10, vin=vin, vout=vout, extra=list(range(31))
)
return msg
@unittest.skipUnless(not utils.BITCOIN_ONLY, "altcoin")
class TestMoneroSerializer(unittest.TestCase):
def __init__(self, *args, **kwargs):
super(TestMoneroSerializer, self).__init__(*args, **kwargs)
self.tdata = XmrTstData()
def setUp(self):
self.tdata.reset()
def test_varint(self):
"""
@ -110,19 +52,6 @@ class TestMoneroSerializer(unittest.TestCase):
test_deser = ECPoint.load(MemoryReaderWriter(writer.get_buffer()))
self.assertEqual(ec_data, test_deser)
def test_simple_msg(self):
"""
TxinGen
:return:
"""
msg = TxinGen(height=42)
writer = MemoryReaderWriter()
TxinGen.dump(writer, msg)
test_deser = TxinGen.load(MemoryReaderWriter(writer.get_buffer()))
self.assertEqual(msg.height, test_deser.height)
def test_txin_to_key(self):
"""
TxinToKey
@ -139,22 +68,6 @@ class TestMoneroSerializer(unittest.TestCase):
self.assertEqual(msg.amount, test_deser.amount)
self.assertEqual(msg, test_deser)
def test_txin_variant(self):
"""
TxInV
:return:
"""
msg1 = TxinToKey(
amount=123, key_offsets=[1, 2, 3, 2 ** 76], k_image=bytearray(range(32))
)
writer = MemoryReaderWriter()
TxInV.dump(writer, msg1)
test_deser = TxInV.load(MemoryReaderWriter(writer.get_buffer()))
self.assertEqual(test_deser.__class__, TxinToKey)
self.assertEqual(msg1, test_deser)
if __name__ == "__main__":
unittest.main()