chore(core): decrease monero size by 2630 bytes

pull/2633/head
grdddj 2 years ago committed by matejcik
parent 16f1d3da86
commit 45b4b609db

@ -45,28 +45,31 @@ if __debug__:
return Failure(**kwargs)
async def diag(ctx, msg, **kwargs) -> Failure:
log.debug(__name__, "----diagnostics")
ins = msg.ins # local_cache_attribute
debug = log.debug # local_cache_attribute
debug(__name__, "----diagnostics")
gc.collect()
if msg.ins == 0:
if ins == 0:
check_mem(0)
return retit()
elif msg.ins == 1:
elif ins == 1:
check_mem(1)
micropython.mem_info(1)
return retit()
elif msg.ins == 2:
log.debug(__name__, "_____________________________________________")
log.debug(__name__, "_____________________________________________")
log.debug(__name__, "_____________________________________________")
elif ins == 2:
debug(__name__, "_____________________________________________")
debug(__name__, "_____________________________________________")
debug(__name__, "_____________________________________________")
return retit()
elif msg.ins == 3:
elif ins == 3:
pass
elif msg.ins == 4:
elif ins == 4:
total = 0
monero = 0
@ -78,7 +81,7 @@ if __debug__:
log.info(__name__, "Total modules: %s, Monero modules: %s", total, monero)
return retit()
elif msg.ins in [5, 6, 7]:
elif ins in [5, 6, 7]:
check_mem()
from apps.monero.xmr import bulletproof as bp
@ -95,19 +98,19 @@ if __debug__:
masks = [crypto.random_scalar(), crypto.random_scalar()]
check_mem("BP pre input")
if msg.ins == 5:
if ins == 5:
bp_res = bpi.prove_testnet(vals[0], masks[0])
check_mem("BP post prove")
bpi.verify_testnet(bp_res)
check_mem("BP post verify")
elif msg.ins == 6:
elif ins == 6:
bp_res = bpi.prove(vals[0], masks[0])
check_mem("BP post prove")
bpi.verify(bp_res)
check_mem("BP post verify")
elif msg.ins == 7:
elif ins == 7:
bp_res = bpi.prove_batch(vals, masks)
check_mem("BP post prove")
bpi.verify(bp_res)

@ -1,60 +1,64 @@
from typing import TYPE_CHECKING
from trezor import wire
from trezor.messages import MoneroAddress
from trezor.ui.layouts import show_address
from apps.common import paths
from apps.common.keychain import auto_keychain
from apps.monero import misc
from apps.monero.xmr import addresses, crypto_helpers, monero
from apps.monero.xmr.networks import net_version
if TYPE_CHECKING:
from trezor.messages import MoneroGetAddress
from trezor.messages import MoneroGetAddress, MoneroAddress
from trezor.wire import Context
from apps.common.keychain import Keychain
@auto_keychain(__name__)
async def get_address(
ctx: wire.Context, msg: MoneroGetAddress, keychain: Keychain
ctx: Context, msg: MoneroGetAddress, keychain: Keychain
) -> MoneroAddress:
from trezor import wire
from trezor.messages import MoneroAddress
from trezor.ui.layouts import show_address
from apps.common import paths
from apps.monero import misc
from apps.monero.xmr import addresses, crypto_helpers, monero
from apps.monero.xmr.networks import net_version
account = msg.account # local_cache_attribute
minor = msg.minor # local_cache_attribute
payment_id = msg.payment_id # local_cache_attribute
await paths.validate_path(ctx, keychain, msg.address_n)
creds = misc.get_creds(keychain, msg.address_n, msg.network_type)
addr = creds.address
have_subaddress = (
msg.account is not None
and msg.minor is not None
and (msg.account, msg.minor) != (0, 0)
account is not None and minor is not None and (account, minor) != (0, 0)
)
have_payment_id = msg.payment_id is not None
have_payment_id = payment_id is not None
if (msg.account is None) != (msg.minor is None):
if (account is None) != (minor is None):
raise wire.ProcessError("Invalid subaddress indexes")
if have_payment_id and have_subaddress:
raise wire.DataError("Subaddress cannot be integrated")
if have_payment_id:
assert msg.payment_id is not None
if len(msg.payment_id) != 8:
assert payment_id is not None
if len(payment_id) != 8:
raise ValueError("Invalid payment ID length")
addr = addresses.encode_addr(
net_version(msg.network_type, False, True),
crypto_helpers.encodepoint(creds.spend_key_public),
crypto_helpers.encodepoint(creds.view_key_public),
msg.payment_id,
payment_id,
)
if have_subaddress:
assert msg.account is not None
assert msg.minor is not None
assert account is not None
assert minor is not None
pub_spend, pub_view = monero.generate_sub_address_keys(
creds.view_key_private, creds.spend_key_public, msg.account, msg.minor
creds.view_key_private, creds.spend_key_public, account, minor
)
addr = addresses.encode_addr(
@ -67,7 +71,7 @@ async def get_address(
title = paths.address_n_to_str(msg.address_n)
await show_address(
ctx,
address=addr,
addr,
address_qr="monero:" + addr,
title=title,
)

@ -17,25 +17,27 @@ using the view key, which the host possess.
from micropython import const
from typing import TYPE_CHECKING
from trezor import utils, wire
from trezor.messages import MoneroGetTxKeyAck
from apps.common import paths
from apps.common.keychain import auto_keychain
from apps.monero import layout, misc
from apps.monero.xmr import chacha_poly, crypto, crypto_helpers
_GET_TX_KEY_REASON_TX_DERIVATION = const(1)
if TYPE_CHECKING:
from trezor.messages import MoneroGetTxKeyRequest
from trezor.messages import MoneroGetTxKeyRequest, MoneroGetTxKeyAck
from apps.common.keychain import Keychain
from trezor.wire import Context
@auto_keychain(__name__)
async def get_tx_keys(
ctx: wire.Context, msg: MoneroGetTxKeyRequest, keychain: Keychain
ctx: Context, msg: MoneroGetTxKeyRequest, keychain: Keychain
) -> MoneroGetTxKeyAck:
from trezor import utils, wire
from trezor.messages import MoneroGetTxKeyAck
from apps.common import paths
from apps.monero import layout, misc
from apps.monero.xmr import chacha_poly, crypto, crypto_helpers
await paths.validate_path(ctx, keychain, msg.address_n)
do_deriv = msg.reason == _GET_TX_KEY_REASON_TX_DERIVATION

@ -1,15 +1,10 @@
from typing import TYPE_CHECKING
from trezor.messages import MoneroWatchKey
from apps.common import paths
from apps.common.keychain import auto_keychain
from apps.monero import layout, misc
from apps.monero.xmr import crypto_helpers
if TYPE_CHECKING:
from trezor.wire import Context
from trezor.messages import MoneroGetWatchKey
from trezor.messages import MoneroGetWatchKey, MoneroWatchKey
from apps.common.keychain import Keychain
@ -18,6 +13,11 @@ if TYPE_CHECKING:
async def get_watch_only(
ctx: Context, msg: MoneroGetWatchKey, keychain: Keychain
) -> MoneroWatchKey:
from apps.common import paths
from apps.monero import layout, misc
from apps.monero.xmr import crypto_helpers
from trezor.messages import MoneroWatchKey
await paths.validate_path(ctx, keychain, msg.address_n)
await layout.require_confirm_watchkey(ctx)

@ -1,24 +1,19 @@
import gc
from typing import TYPE_CHECKING
from trezor import log, wire
from trezor.crypto import random
from trezor.messages import (
MoneroExportedKeyImage,
MoneroKeyImageExportInitAck,
MoneroKeyImageSyncFinalAck,
MoneroKeyImageSyncFinalRequest,
MoneroKeyImageSyncStepAck,
MoneroKeyImageSyncStepRequest,
)
from apps.common import paths
from trezor.wire import DataError
from apps.common.keychain import auto_keychain
from apps.monero import layout, misc
from apps.monero.xmr import chacha_poly, crypto, crypto_helpers, key_image, monero
from apps.monero import layout
if TYPE_CHECKING:
from trezor.messages import MoneroKeyImageExportInitRequest
from trezor.messages import (
MoneroKeyImageExportInitRequest,
MoneroKeyImageSyncFinalAck,
MoneroKeyImageExportInitAck,
MoneroKeyImageSyncStepAck,
MoneroKeyImageSyncStepRequest,
)
from trezor.wire import Context
from apps.common.keychain import Keychain
@ -27,8 +22,15 @@ if TYPE_CHECKING:
@auto_keychain(__name__)
async def key_image_sync(
ctx: wire.Context, msg: MoneroKeyImageExportInitRequest, keychain: Keychain
ctx: Context, msg: MoneroKeyImageExportInitRequest, keychain: Keychain
) -> MoneroKeyImageSyncFinalAck:
import gc
from trezor.messages import (
MoneroKeyImageSyncFinalAck,
MoneroKeyImageSyncFinalRequest,
MoneroKeyImageSyncStepRequest,
)
state = KeyImageSync()
res = await _init_step(state, ctx, msg, keychain)
@ -37,13 +39,20 @@ async def key_image_sync(
res = await _sync_step(state, ctx, step)
gc.collect()
await ctx.call(res, MoneroKeyImageSyncFinalRequest)
res = await _final_step(state, ctx)
return res
# _final_step
if state.current_output + 1 != state.num_outputs:
raise DataError("Invalid number of outputs")
final_hash = state.hasher.digest()
if final_hash != state.expected_hash:
raise DataError("Invalid number of outputs")
return MoneroKeyImageSyncFinalAck(enc_key=state.enc_key)
class KeyImageSync:
def __init__(self):
from apps.monero.xmr import crypto_helpers
self.current_output = -1
self.num_outputs = 0
self.expected_hash = b""
@ -55,10 +64,16 @@ class KeyImageSync:
async def _init_step(
s: KeyImageSync,
ctx: wire.Context,
ctx: Context,
msg: MoneroKeyImageExportInitRequest,
keychain: Keychain,
) -> MoneroKeyImageExportInitAck:
from trezor.messages import MoneroKeyImageExportInitAck
from trezor.crypto import random
from apps.common import paths
from apps.monero.xmr import monero
from apps.monero import misc
await paths.validate_path(ctx, keychain, msg.address_n)
s.creds = misc.get_creds(keychain, msg.address_n, msg.network_type)
@ -78,12 +93,19 @@ async def _init_step(
async def _sync_step(
s: KeyImageSync, ctx: wire.Context, tds: MoneroKeyImageSyncStepRequest
s: KeyImageSync, ctx: Context, tds: MoneroKeyImageSyncStepRequest
) -> MoneroKeyImageSyncStepAck:
from trezor import log
from trezor.messages import (
MoneroExportedKeyImage,
MoneroKeyImageSyncStepAck,
)
from apps.monero.xmr import chacha_poly, crypto, key_image
assert s.creds is not None
if not tds.tdis:
raise wire.DataError("Empty")
raise DataError("Empty")
kis = []
buff = bytearray(32 * 3)
@ -94,7 +116,7 @@ async def _sync_step(
for td in tds.tdis:
s.current_output += 1
if s.current_output >= s.num_outputs:
raise wire.DataError("Too many outputs")
raise DataError("Too many outputs")
if __debug__:
log.debug(__name__, "ki_sync, step i: %d", s.current_output)
@ -116,14 +138,3 @@ async def _sync_step(
kis.append(MoneroExportedKeyImage(iv=nonce, blob=ciph))
return MoneroKeyImageSyncStepAck(kis=kis)
async def _final_step(s, ctx: wire.Context) -> MoneroKeyImageSyncFinalAck:
if s.current_output + 1 != s.num_outputs:
raise wire.DataError("Invalid number of outputs")
final_hash = s.hasher.digest()
if final_hash != s.expected_hash:
raise wire.DataError("Invalid number of outputs")
return MoneroKeyImageSyncFinalAck(enc_key=s.enc_key)

@ -1,13 +1,8 @@
from typing import TYPE_CHECKING
from trezor import strings, ui
from trezor import ui
from trezor.enums import ButtonRequestType
from trezor.ui.layouts import (
confirm_action,
confirm_blob,
confirm_metadata,
confirm_output,
)
from trezor.ui.layouts import confirm_action, confirm_metadata
from trezor.ui.popup import Popup
DUMMY_PAYMENT_ID = b"\x00\x00\x00\x00\x00\x00\x00\x00"
@ -24,7 +19,12 @@ if TYPE_CHECKING:
from .signing.state import State
BRT_SignTx = ButtonRequestType.SignTx # global_import_cache
def _format_amount(value: int) -> str:
from trezor import strings
return f"{strings.format_amount(value, 12)} XMR"
@ -36,7 +36,7 @@ async def require_confirm_watchkey(ctx: Context) -> None:
description="Do you really want to export watch-only credentials?",
icon=ui.ICON_SEND,
icon_color=ui.GREEN,
br_code=ButtonRequestType.SignTx,
br_code=BRT_SignTx,
)
@ -48,7 +48,7 @@ async def require_confirm_keyimage_sync(ctx: Context) -> None:
description="Do you really want to\nsync key images?",
icon=ui.ICON_SEND,
icon_color=ui.GREEN,
br_code=ButtonRequestType.SignTx,
br_code=BRT_SignTx,
)
@ -60,15 +60,16 @@ async def require_confirm_live_refresh(ctx: Context) -> None:
description="Do you really want to\nstart refresh?",
icon=ui.ICON_SEND,
icon_color=ui.GREEN,
br_code=ButtonRequestType.SignTx,
br_code=BRT_SignTx,
)
async def require_confirm_tx_key(ctx: Context, export_key: bool = False) -> None:
if export_key:
description = "Do you really want to export tx_key?"
else:
description = "Do you really want to export tx_der\nfor tx_proof?"
description = (
"Do you really want to export tx_key?"
if export_key
else "Do you really want to export tx_der\nfor tx_proof?"
)
await confirm_action(
ctx,
"export_tx_key",
@ -76,7 +77,7 @@ async def require_confirm_tx_key(ctx: Context, export_key: bool = False) -> None
description=description,
icon=ui.ICON_SEND,
icon_color=ui.GREEN,
br_code=ButtonRequestType.SignTx,
br_code=BRT_SignTx,
)
@ -93,6 +94,7 @@ async def require_confirm_transaction(
outputs = tsx_data.outputs
change_idx = get_change_addr_idx(outputs, tsx_data.change_dts)
payment_id = tsx_data.payment_id # local_cache_attribute
if tsx_data.unlock_time != 0:
await _require_confirm_unlock_time(ctx, tsx_data.unlock_time)
@ -105,17 +107,17 @@ async def require_confirm_transaction(
if is_dummy:
continue # Dummy output does not need confirmation
if tsx_data.integrated_indices and idx in tsx_data.integrated_indices:
cur_payment = tsx_data.payment_id
cur_payment = payment_id
else:
cur_payment = None
await _require_confirm_output(ctx, dst, network_type, cur_payment)
if (
tsx_data.payment_id
payment_id
and not tsx_data.integrated_indices
and tsx_data.payment_id != DUMMY_PAYMENT_ID
and payment_id != DUMMY_PAYMENT_ID
):
await _require_confirm_payment_id(ctx, tsx_data.payment_id)
await _require_confirm_payment_id(ctx, payment_id)
await _require_confirm_fee(ctx, tsx_data.fee)
await transaction_step(state, 0)
@ -132,6 +134,7 @@ async def _require_confirm_output(
"""
from apps.monero.xmr.addresses import encode_addr
from apps.monero.xmr.networks import net_version
from trezor.ui.layouts import confirm_output
version = net_version(network_type, dst.is_subaddress, payment_id is not None)
addr = encode_addr(
@ -140,20 +143,22 @@ async def _require_confirm_output(
await confirm_output(
ctx,
address=addr,
amount=_format_amount(dst.amount),
font_amount=ui.BOLD,
br_code=ButtonRequestType.SignTx,
addr,
_format_amount(dst.amount),
ui.BOLD,
br_code=BRT_SignTx,
)
async def _require_confirm_payment_id(ctx: Context, payment_id: bytes) -> None:
from trezor.ui.layouts import confirm_blob
await confirm_blob(
ctx,
"confirm_payment_id",
title="Payment ID",
data=payment_id,
br_code=ButtonRequestType.SignTx,
"Payment ID",
payment_id,
br_code=BRT_SignTx,
)
@ -161,9 +166,9 @@ async def _require_confirm_fee(ctx: Context, fee: int) -> None:
await confirm_metadata(
ctx,
"confirm_final",
title="Confirm fee",
content="{}",
param=_format_amount(fee),
"Confirm fee",
"{}",
_format_amount(fee),
hide_continue=True,
hold=True,
)
@ -176,7 +181,7 @@ async def _require_confirm_unlock_time(ctx: Context, unlock_time: int) -> None:
"Confirm unlock time",
"Unlock time for this transaction is set to {}",
str(unlock_time),
br_code=ButtonRequestType.SignTx,
BRT_SignTx,
)
@ -187,6 +192,8 @@ class TransactionStep(ui.Component):
self.info = info
def on_render(self) -> None:
from trezor import ui # local_cache_global
state = self.state
info = self.info
ui.header("Signing transaction", ui.ICON_SEND, ui.TITLE_GREY, ui.BG, ui.BLUE)
@ -217,6 +224,8 @@ class LiveRefreshStep(ui.Component):
self.current = current
def on_render(self) -> None:
from trezor import ui # local_cache_global
current = self.current
ui.header("Refreshing", ui.ICON_SEND, ui.TITLE_GREY, ui.BG, ui.BLUE)
p = (1000 * current // 8) % 1000

@ -1,23 +1,16 @@
import gc
from typing import TYPE_CHECKING
import storage.cache
from trezor import log
from trezor.enums import MessageType
from trezor.messages import (
MoneroLiveRefreshFinalAck,
MoneroLiveRefreshStartAck,
MoneroLiveRefreshStepAck,
MoneroLiveRefreshStepRequest,
)
from apps.common import paths
from apps.common.keychain import auto_keychain
from apps.monero import layout, misc
from apps.monero.xmr import chacha_poly, crypto, crypto_helpers, key_image, monero
if TYPE_CHECKING:
from trezor.messages import MoneroLiveRefreshStartRequest
from trezor.messages import (
MoneroLiveRefreshStepAck,
MoneroLiveRefreshStepRequest,
MoneroLiveRefreshStartRequest,
MoneroLiveRefreshFinalAck,
MoneroLiveRefreshStartAck,
)
from trezor.wire import Context
from apps.common.keychain import Keychain
@ -28,6 +21,10 @@ if TYPE_CHECKING:
async def live_refresh(
ctx: Context, msg: MoneroLiveRefreshStartRequest, keychain: Keychain
) -> MoneroLiveRefreshFinalAck:
import gc
from trezor.enums import MessageType
from trezor.messages import MoneroLiveRefreshFinalAck, MoneroLiveRefreshStepRequest
state = LiveRefreshState()
res = await _init_step(state, ctx, msg, keychain)
@ -57,11 +54,15 @@ async def _init_step(
msg: MoneroLiveRefreshStartRequest,
keychain: Keychain,
) -> MoneroLiveRefreshStartAck:
import storage.cache as storage_cache
from apps.common import paths
from trezor.messages import MoneroLiveRefreshStartAck
await paths.validate_path(ctx, keychain, msg.address_n)
if not storage.cache.get(storage.cache.APP_MONERO_LIVE_REFRESH):
if not storage_cache.get(storage_cache.APP_MONERO_LIVE_REFRESH):
await layout.require_confirm_live_refresh(ctx)
storage.cache.set(storage.cache.APP_MONERO_LIVE_REFRESH, b"\x01")
storage_cache.set(storage_cache.APP_MONERO_LIVE_REFRESH, b"\x01")
s.creds = misc.get_creds(keychain, msg.address_n, msg.network_type)
@ -71,6 +72,10 @@ async def _init_step(
async def _refresh_step(
s: LiveRefreshState, ctx: Context, msg: MoneroLiveRefreshStepRequest
) -> MoneroLiveRefreshStepAck:
from trezor.messages import MoneroLiveRefreshStepAck
from trezor import log
from apps.monero.xmr import chacha_poly, crypto, crypto_helpers, key_image, monero
assert s.creds is not None
buff = bytearray(32 * 3)

@ -1,21 +1,22 @@
import gc
from typing import TYPE_CHECKING
from trezor import log, utils, wire
from trezor.enums import MessageType
from apps.common.keychain import auto_keychain
from apps.monero.signing.state import State
if TYPE_CHECKING:
from trezor.messages import MoneroTransactionFinalAck
from apps.common.keychain import Keychain
from apps.monero.signing.state import State
from trezor.wire import Context
@auto_keychain(__name__)
async def sign_tx(
ctx: wire.Context, received_msg, keychain: Keychain
ctx: Context, received_msg, keychain: Keychain
) -> MoneroTransactionFinalAck:
import gc
from trezor import log, utils
from apps.monero.signing.state import State
state = State(ctx)
mods = utils.unimport_begin()
@ -27,7 +28,7 @@ async def sign_tx(
gc.collect()
gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())
result_msg, accept_msgs = await sign_tx_dispatch(state, received_msg, keychain)
result_msg, accept_msgs = await _sign_tx_dispatch(state, received_msg, keychain)
if accept_msgs is None:
break
@ -41,8 +42,13 @@ async def sign_tx(
return result_msg
async def sign_tx_dispatch(state: State, msg, keychain: Keychain) -> tuple:
if msg.MESSAGE_WIRE_TYPE == MessageType.MoneroTransactionInitRequest:
async def _sign_tx_dispatch(state: State, msg, keychain: Keychain) -> tuple:
from trezor.enums import MessageType
from trezor import wire
MESSAGE_WIRE_TYPE = msg.MESSAGE_WIRE_TYPE # local_cache_attribute
if MESSAGE_WIRE_TYPE == MessageType.MoneroTransactionInitRequest:
from apps.monero.signing import step_01_init_transaction
return (
@ -52,7 +58,7 @@ async def sign_tx_dispatch(state: State, msg, keychain: Keychain) -> tuple:
(MessageType.MoneroTransactionSetInputRequest,),
)
elif msg.MESSAGE_WIRE_TYPE == MessageType.MoneroTransactionSetInputRequest:
elif MESSAGE_WIRE_TYPE == MessageType.MoneroTransactionSetInputRequest:
from apps.monero.signing import step_02_set_input
return (
@ -63,7 +69,7 @@ async def sign_tx_dispatch(state: State, msg, keychain: Keychain) -> tuple:
),
)
elif msg.MESSAGE_WIRE_TYPE == MessageType.MoneroTransactionInputViniRequest:
elif MESSAGE_WIRE_TYPE == MessageType.MoneroTransactionInputViniRequest:
from apps.monero.signing import step_04_input_vini
return (
@ -76,7 +82,7 @@ async def sign_tx_dispatch(state: State, msg, keychain: Keychain) -> tuple:
),
)
elif msg.MESSAGE_WIRE_TYPE == MessageType.MoneroTransactionAllInputsSetRequest:
elif MESSAGE_WIRE_TYPE == MessageType.MoneroTransactionAllInputsSetRequest:
from apps.monero.signing import step_05_all_inputs_set
return (
@ -84,7 +90,7 @@ async def sign_tx_dispatch(state: State, msg, keychain: Keychain) -> tuple:
(MessageType.MoneroTransactionSetOutputRequest,),
)
elif msg.MESSAGE_WIRE_TYPE == MessageType.MoneroTransactionSetOutputRequest:
elif MESSAGE_WIRE_TYPE == MessageType.MoneroTransactionSetOutputRequest:
from apps.monero.signing import step_06_set_output
is_offloaded_bp = bool(msg.is_offloaded_bp)
@ -101,7 +107,7 @@ async def sign_tx_dispatch(state: State, msg, keychain: Keychain) -> tuple:
),
)
elif msg.MESSAGE_WIRE_TYPE == MessageType.MoneroTransactionAllOutSetRequest:
elif MESSAGE_WIRE_TYPE == MessageType.MoneroTransactionAllOutSetRequest:
from apps.monero.signing import step_07_all_outputs_set
return (
@ -109,7 +115,7 @@ async def sign_tx_dispatch(state: State, msg, keychain: Keychain) -> tuple:
(MessageType.MoneroTransactionSignInputRequest,),
)
elif msg.MESSAGE_WIRE_TYPE == MessageType.MoneroTransactionSignInputRequest:
elif MESSAGE_WIRE_TYPE == MessageType.MoneroTransactionSignInputRequest:
from apps.monero.signing import step_09_sign_input
return (
@ -130,7 +136,7 @@ async def sign_tx_dispatch(state: State, msg, keychain: Keychain) -> tuple:
),
)
elif msg.MESSAGE_WIRE_TYPE == MessageType.MoneroTransactionFinalRequest:
elif MESSAGE_WIRE_TYPE == MessageType.MoneroTransactionFinalRequest:
from apps.monero.signing import step_10_sign_final
return step_10_sign_final.final_msg(state), None

@ -1,15 +1,15 @@
from trezor import wire
from trezor.wire import DataError
class Error(wire.DataError):
class Error(DataError):
pass
class ChangeAddressError(wire.DataError):
class ChangeAddressError(DataError):
pass
class NotEnoughOutputsError(wire.DataError):
class NotEnoughOutputsError(DataError):
pass

@ -1,9 +1,7 @@
from micropython import const
from typing import TYPE_CHECKING
from trezor import utils
from apps.monero.xmr import crypto_helpers
from apps.monero.xmr.crypto_helpers import compute_hmac
if TYPE_CHECKING:
from trezor.messages import (
@ -27,6 +25,8 @@ def _build_key(
"""
Creates an unique-purpose key
"""
from trezor import utils
from apps.monero.xmr import crypto_helpers
key_buff = _BUILD_KEY_BUFFER
utils.ensure(len(secret) == _SECRET_LENGTH, "Invalid key length")
@ -67,14 +67,14 @@ def hmac_key_txin_comm(key_hmac: bytes, idx: int) -> bytes:
return _build_key(key_hmac, b"txin-comm", idx)
def hmac_key_txdst(key_hmac: bytes, idx: int) -> bytes:
def _hmac_key_txdst(key_hmac: bytes, idx: int) -> bytes:
"""
TxDestinationEntry[i] hmac key
"""
return _build_key(key_hmac, b"txdest", idx)
def hmac_key_txout(key_hmac: bytes, idx: int) -> bytes:
def _hmac_key_txout(key_hmac: bytes, idx: int) -> bytes:
"""
(TxDestinationEntry[i] || tx.vout[i]) hmac key
"""
@ -132,7 +132,7 @@ def gen_hmac_vini(
kwriter.write(vini_bin)
hmac_key_vini = hmac_key_txin(key, idx)
hmac_vini = crypto_helpers.compute_hmac(hmac_key_vini, kwriter.get_digest())
hmac_vini = compute_hmac(hmac_key_vini, kwriter.get_digest())
return hmac_vini
@ -149,8 +149,8 @@ def gen_hmac_vouti(
kwriter.write(protobuf.dump_message_buffer(dst_entr))
kwriter.write(tx_out_bin)
hmac_key_vouti = hmac_key_txout(key, idx)
hmac_vouti = crypto_helpers.compute_hmac(hmac_key_vouti, kwriter.get_digest())
hmac_key_vouti = _hmac_key_txout(key, idx)
hmac_vouti = compute_hmac(hmac_key_vouti, kwriter.get_digest())
return hmac_vouti
@ -166,8 +166,8 @@ def gen_hmac_tsxdest(
kwriter = get_keccak_writer()
kwriter.write(protobuf.dump_message_buffer(dst_entr))
hmac_key = hmac_key_txdst(key, idx)
hmac_tsxdest = crypto_helpers.compute_hmac(hmac_key, kwriter.get_digest())
hmac_key = _hmac_key_txdst(key, idx)
hmac_tsxdest = compute_hmac(hmac_key, kwriter.get_digest())
return hmac_tsxdest

@ -1,11 +1,5 @@
import gc
from micropython import const
from typing import TYPE_CHECKING
from trezor import log
from apps.monero.xmr import crypto
if TYPE_CHECKING:
from trezor.wire import Context
from apps.monero.xmr.crypto import Point, Scalar
@ -17,17 +11,18 @@ if TYPE_CHECKING:
class State:
STEP_INIT = const(0)
STEP_INP = const(100)
STEP_VINI = const(300)
STEP_ALL_IN = const(350)
STEP_OUT = const(400)
STEP_ALL_OUT = const(500)
STEP_SIGN = const(600)
STEP_INIT = 0
STEP_INP = 100
STEP_VINI = 300
STEP_ALL_IN = 350
STEP_OUT = 400
STEP_ALL_OUT = 500
STEP_SIGN = 600
def __init__(self, ctx: Context) -> None:
from apps.monero.xmr.keccak_hasher import KeccakXmrArchive
from apps.monero.xmr.mlsag_hasher import PreMlsagHasher
from apps.monero.xmr import crypto
self.ctx = ctx
@ -140,6 +135,9 @@ class State:
self.full_message: bytes | None = None
def mem_trace(self, x=None, collect: bool = False) -> None:
import gc
from trezor import log
if __debug__:
log.debug(
__name__,

@ -2,11 +2,10 @@
Initializes a new transaction.
"""
import gc
from typing import TYPE_CHECKING
from apps.monero import layout, misc, signing
from apps.monero.signing.state import State
from apps.monero import signing
from apps.monero.xmr import crypto, crypto_helpers, monero
if TYPE_CHECKING:
@ -18,6 +17,8 @@ if TYPE_CHECKING:
MoneroTransactionRsigData,
)
from apps.monero.signing.state import State
async def init_transaction(
state: State,
@ -26,8 +27,13 @@ async def init_transaction(
tsx_data: MoneroTransactionData,
keychain,
) -> MoneroTransactionInitAck:
import gc
from apps.monero.signing import offloading_keys
from apps.common import paths
from apps.monero import layout, misc
mem_trace = state.mem_trace # local_cache_attribute
outputs = tsx_data.outputs # local_cache_attribute
await paths.validate_path(state.ctx, keychain, address_n)
@ -39,10 +45,10 @@ async def init_transaction(
state.fee = state.fee if state.fee > 0 else 0
state.tx_priv = crypto.random_scalar()
state.tx_pub = crypto.scalarmult_base_into(None, state.tx_priv)
state.mem_trace(1)
mem_trace(1)
state.input_count = tsx_data.num_inputs
state.output_count = len(tsx_data.outputs)
state.output_count = len(outputs)
assert state.input_count is not None
state.progress_total = 4 + 3 * state.input_count + state.output_count
state.progress_cur = 0
@ -54,7 +60,7 @@ async def init_transaction(
state.creds.address = None
state.creds.network_type = None
gc.collect()
state.mem_trace(3)
mem_trace(3)
# Basic transaction parameters
state.output_change = tsx_data.change_dts
@ -67,7 +73,7 @@ async def init_transaction(
raise ValueError("Unsupported hard-fork version")
# Ensure change is correct
_check_change(state, tsx_data.outputs)
_check_change(state, outputs)
# At least two outputs are required, this applies also for sweep txs
# where one fake output is added. See _check_change for more info
@ -75,7 +81,7 @@ async def init_transaction(
raise signing.NotEnoughOutputsError("At least two outputs are required")
_check_rsig_data(state, tsx_data.rsig_data)
_check_subaddresses(state, tsx_data.outputs)
_check_subaddresses(state, outputs)
# Extra processing, payment id
_process_payment_id(state, tsx_data)
@ -86,7 +92,7 @@ async def init_transaction(
state.tx_prefix_hasher.uvarint(2) # current Monero transaction format (RingCT = 2)
state.tx_prefix_hasher.uvarint(tsx_data.unlock_time)
state.tx_prefix_hasher.uvarint(state.input_count) # ContainerType, size
state.mem_trace(10, True)
mem_trace(10, True)
# Final message hasher
state.full_message_hasher.init()
@ -94,21 +100,25 @@ async def init_transaction(
# Sub address precomputation
if tsx_data.account is not None and tsx_data.minor_indices:
_precompute_subaddr(state, tsx_data.account, tsx_data.minor_indices)
state.mem_trace(5, True)
# _precompute_subaddr
# Precomputes subaddresses for account (major) and list of indices (minors)
# Subaddresses have to be stored in encoded form - unique representation.
# Single point can have multiple extended coordinates representation - would not match during subaddress search.
monero.compute_subaddresses(
state.creds, tsx_data.account, tsx_data.minor_indices, state.subaddresses
)
mem_trace(5, True)
# HMACs all outputs to disallow tampering.
# Each HMAC is then sent alongside the output
# and trezor validates it.
hmacs = []
for idx in range(state.output_count):
c_hmac = offloading_keys.gen_hmac_tsxdest(
state.key_hmac, tsx_data.outputs[idx], idx
)
c_hmac = offloading_keys.gen_hmac_tsxdest(state.key_hmac, outputs[idx], idx)
hmacs.append(c_hmac)
gc.collect()
state.mem_trace(6)
mem_trace(6)
from trezor.messages import (
MoneroTransactionInitAck,
@ -214,16 +224,12 @@ def _check_rsig_data(state: State, rsig_data: MoneroTransactionRsigData) -> None
if state.output_count > 2:
state.rsig_offload = True
_check_grouping(state)
def _check_grouping(state: State) -> None:
# _check_grouping
acc = 0
for x in state.rsig_grouping:
if x is None or x <= 0:
raise ValueError("Invalid grouping batch")
acc += x
if acc != state.output_count:
raise ValueError("Invalid grouping")
@ -295,15 +301,6 @@ def _compute_sec_keys(state: State, tsx_data: MoneroTransactionData) -> None:
state.key_enc = crypto_helpers.keccak_2hash(b"enc" + master_key)
def _precompute_subaddr(state: State, account: int, indices: list[int]) -> None:
"""
Precomputes subaddresses for account (major) and list of indices (minors)
Subaddresses have to be stored in encoded form - unique representation.
Single point can have multiple extended coordinates representation - would not match during subaddress search.
"""
monero.compute_subaddresses(state.creds, account, indices, state.subaddresses)
def _process_payment_id(state: State, tsx_data: MoneroTransactionData) -> None:
"""
Writes payment id to the `extra` field under the TX_EXTRA_NONCE = 0x02 tag.
@ -319,29 +316,29 @@ def _process_payment_id(state: State, tsx_data: MoneroTransactionData) -> None:
See:
- https://github.com/monero-project/monero/blob/ff7dc087ae5f7de162131cea9dbcf8eac7c126a1/src/cryptonote_basic/tx_extra.h
"""
payment_id = tsx_data.payment_id # local_cache_attribute
# encrypted payment id / dummy payment ID
view_key_pub_enc = None
if not tsx_data.payment_id or len(tsx_data.payment_id) == 8:
if not payment_id or len(payment_id) == 8:
view_key_pub_enc = _get_key_for_payment_id_encryption(
tsx_data, state.change_address(), True
)
if not tsx_data.payment_id:
if not payment_id:
return
elif len(tsx_data.payment_id) == 8:
elif len(payment_id) == 8:
view_key_pub = crypto_helpers.decodepoint(view_key_pub_enc)
payment_id_encr = _encrypt_payment_id(
tsx_data.payment_id, view_key_pub, state.tx_priv
)
payment_id_encr = _encrypt_payment_id(payment_id, view_key_pub, state.tx_priv)
extra_nonce = payment_id_encr
extra_prefix = 1 # TX_EXTRA_NONCE_ENCRYPTED_PAYMENT_ID
# plain text payment id
elif len(tsx_data.payment_id) == 32:
extra_nonce = tsx_data.payment_id
elif len(payment_id) == 32:
extra_nonce = payment_id
extra_prefix = 0 # TX_EXTRA_NONCE_PAYMENT_ID
else:

@ -13,38 +13,43 @@ key derived for exactly this purpose.
"""
from typing import TYPE_CHECKING
from apps.monero import layout
from apps.monero.xmr import crypto, crypto_helpers, monero, serialize
from .state import State
from apps.monero.xmr import crypto_helpers
if TYPE_CHECKING:
from trezor.messages import MoneroTransactionSourceEntry
from trezor.messages import MoneroTransactionSetInputAck
from .state import State
from trezor.messages import (
MoneroTransactionSourceEntry,
MoneroTransactionSetInputAck,
)
from apps.monero.xmr import crypto
async def set_input(
state: State, src_entr: MoneroTransactionSourceEntry
) -> MoneroTransactionSetInputAck:
from trezor.messages import MoneroTransactionSetInputAck
from apps.monero.xmr import chacha_poly
from apps.monero.xmr.serialize_messages.tx_prefix import TxinToKey
from apps.monero.xmr import chacha_poly, monero, serialize
from apps.monero.signing import offloading_keys
from apps.monero import layout
state.current_input_index += 1
current_input_index = state.current_input_index # local_cache_attribute
amount = src_entr.amount # local_cache_attribute
outputs = src_entr.outputs # local_cache_attribute
await layout.transaction_step(state, state.STEP_INP, state.current_input_index)
await layout.transaction_step(state, state.STEP_INP, current_input_index)
if state.last_step > state.STEP_INP:
raise ValueError("Invalid state transition")
if state.current_input_index >= state.input_count:
if current_input_index >= state.input_count:
raise ValueError("Too many inputs")
# real_output denotes which output in outputs is the real one (ours)
if src_entr.real_output >= len(src_entr.outputs):
if src_entr.real_output >= len(outputs):
raise ValueError(
f"real_output index {src_entr.real_output} bigger than output_keys.size() {len(src_entr.outputs)}"
f"real_output index {src_entr.real_output} bigger than output_keys.size() {len(outputs)}"
)
state.summary_inputs_money += src_entr.amount
state.summary_inputs_money += amount
# Secrets derivation
# the UTXO's one-time address P
@ -71,10 +76,8 @@ async def set_input(
# Construct tx.vin
# If multisig is used then ki in vini should be src_entr.multisig_kLRki.ki
vini = TxinToKey(amount=src_entr.amount, k_image=crypto_helpers.encodepoint(ki))
vini.key_offsets = _absolute_output_offsets_to_relative(
[x.idx for x in src_entr.outputs]
)
vini = TxinToKey(amount=amount, k_image=crypto_helpers.encodepoint(ki))
vini.key_offsets = _absolute_output_offsets_to_relative([x.idx for x in outputs])
if src_entr.rct:
vini.amount = 0
@ -87,36 +90,36 @@ async def set_input(
# HMAC(T_in,i || vin_i)
hmac_vini = offloading_keys.gen_hmac_vini(
state.key_hmac, src_entr, vini_bin, state.current_input_index
state.key_hmac, src_entr, vini_bin, current_input_index
)
state.mem_trace(3, True)
# PseudoOuts commitment, alphas stored to state
alpha, pseudo_out = _gen_commitment(state, src_entr.amount)
alpha, pseudo_out = _gen_commitment(state, amount)
pseudo_out = crypto_helpers.encodepoint(pseudo_out)
# The alpha is encrypted and passed back for storage
pseudo_out_hmac = crypto_helpers.compute_hmac(
offloading_keys.hmac_key_txin_comm(state.key_hmac, state.current_input_index),
offloading_keys.hmac_key_txin_comm(state.key_hmac, current_input_index),
pseudo_out,
)
alpha_enc = chacha_poly.encrypt_pack(
offloading_keys.enc_key_txin_alpha(state.key_enc, state.current_input_index),
offloading_keys.enc_key_txin_alpha(state.key_enc, current_input_index),
crypto_helpers.encodeint(alpha),
)
spend_enc = chacha_poly.encrypt_pack(
offloading_keys.enc_key_spend(state.key_enc, state.current_input_index),
offloading_keys.enc_key_spend(state.key_enc, current_input_index),
crypto_helpers.encodeint(xi),
)
state.last_step = state.STEP_INP
if state.current_input_index + 1 == state.input_count:
if current_input_index + 1 == state.input_count:
# When we finish the inputs processing, we no longer need
# the precomputed subaddresses so we clear them to save memory.
state.subaddresses = None
state.input_last_amount = src_entr.amount
state.input_last_amount = amount
return MoneroTransactionSetInputAck(
vini=vini_bin,
@ -139,6 +142,8 @@ def _gen_commitment(state: State, in_amount: int) -> tuple[crypto.Scalar, crypto
and the last A mask is computed in this special way.
Returns pseudo_out
"""
from apps.monero.xmr import crypto
alpha = crypto.random_scalar()
state.sumpouts_alphas = crypto.sc_add_into(None, state.sumpouts_alphas, alpha)
return alpha, crypto.gen_commitment_into(None, alpha, in_amount)
@ -163,17 +168,15 @@ def _absolute_output_offsets_to_relative(off: list[int]) -> list[int]:
def _get_additional_public_key(
src_entr: MoneroTransactionSourceEntry,
) -> crypto.Point | None:
additional_tx_keys = src_entr.real_out_additional_tx_keys # local_cache_attribute
additional_tx_pub_key = None
if len(src_entr.real_out_additional_tx_keys) == 1: # compression
additional_tx_pub_key = crypto_helpers.decodepoint(
src_entr.real_out_additional_tx_keys[0]
)
elif src_entr.real_out_additional_tx_keys:
if src_entr.real_output_in_tx_index >= len(
src_entr.real_out_additional_tx_keys
):
if len(additional_tx_keys) == 1: # compression
additional_tx_pub_key = crypto_helpers.decodepoint(additional_tx_keys[0])
elif additional_tx_keys:
if src_entr.real_output_in_tx_index >= len(additional_tx_keys):
raise ValueError("Wrong number of additional derivations")
additional_tx_pub_key = crypto_helpers.decodepoint(
src_entr.real_out_additional_tx_keys[src_entr.real_output_in_tx_index]
additional_tx_keys[src_entr.real_output_in_tx_index]
)
return additional_tx_pub_key

@ -5,17 +5,12 @@ after the sorting on tx.vin[i].ki. The sorting order was received in the previou
from typing import TYPE_CHECKING
from apps.monero import layout
from apps.monero.signing import offloading_keys
from apps.monero.xmr import crypto
from .state import State
if TYPE_CHECKING:
from trezor.messages import (
MoneroTransactionInputViniAck,
MoneroTransactionSourceEntry,
)
from .state import State
async def input_vini(
@ -25,15 +20,21 @@ async def input_vini(
vini_hmac: bytes,
orig_idx: int,
) -> MoneroTransactionInputViniAck:
from apps.monero import layout
from apps.monero.signing import offloading_keys
from apps.monero.xmr import crypto
from trezor.messages import MoneroTransactionInputViniAck
await layout.transaction_step(state, state.STEP_VINI, state.current_input_index + 1)
if state.last_step not in (state.STEP_INP, state.STEP_VINI):
STEP_VINI = state.STEP_VINI # local_cache_attribute
await layout.transaction_step(state, STEP_VINI, state.current_input_index + 1)
if state.last_step not in (state.STEP_INP, STEP_VINI):
raise ValueError("Invalid state transition")
if state.current_input_index >= state.input_count:
raise ValueError("Too many inputs")
if state.last_step < state.STEP_VINI:
if state.last_step < STEP_VINI:
state.current_input_index = -1
state.last_ki = None
@ -56,6 +57,6 @@ async def input_vini(
# Incremental hasing of tx.vin[i]
state.tx_prefix_hasher.buffer(vini_bin)
state.last_step = state.STEP_VINI
state.last_step = STEP_VINI
state.last_ki = cur_ki if state.current_input_index < state.input_count else None
return MoneroTransactionInputViniAck()

@ -5,22 +5,20 @@ If in the applicable offloading mode, generate commitment masks.
from typing import TYPE_CHECKING
from apps.monero import layout
from apps.monero.xmr import crypto
from .state import State
if TYPE_CHECKING:
from trezor.messages import MoneroTransactionAllInputsSetAck
from .state import State
async def all_inputs_set(state: State) -> MoneroTransactionAllInputsSetAck:
from apps.monero import layout
from apps.monero.xmr import crypto
from trezor.messages import MoneroTransactionAllInputsSetAck
state.mem_trace(0)
await layout.transaction_step(state, state.STEP_ALL_IN)
from trezor.messages import MoneroTransactionAllInputsSetAck
if state.last_step != state.STEP_VINI:
raise ValueError("Invalid state transition")
if state.current_input_index != state.input_count - 1:

@ -2,16 +2,12 @@
Output destinations are streamed one by one.
Computes destination one-time address, amount key, range proof + HMAC, out_pk, ecdh_info.
"""
import gc
from typing import TYPE_CHECKING
from trezor import utils
from apps.monero import layout, signing
from apps.monero.signing import offloading_keys
from apps.monero.xmr import crypto, crypto_helpers, serialize
from .state import State
from apps.monero.xmr import crypto, crypto_helpers
if TYPE_CHECKING:
from apps.monero.xmr.serialize_messages.tx_ecdh import EcdhTuple
@ -21,6 +17,7 @@ if TYPE_CHECKING:
MoneroTransactionSetOutputAck,
MoneroTransactionRsigData,
)
from .state import State
async def set_output(
@ -30,7 +27,11 @@ async def set_output(
rsig_data: MoneroTransactionRsigData,
is_offloaded_bp=False,
) -> MoneroTransactionSetOutputAck:
state.mem_trace(0, True)
from apps.monero import layout
mem_trace = state.mem_trace # local_cache_attribute
mem_trace(0, True)
mods = utils.unimport_begin()
# Progress update only for master message (skip for offloaded BP msg)
@ -39,32 +40,32 @@ async def set_output(
state, state.STEP_OUT, state.current_output_index + 1
)
state.mem_trace(1, True)
mem_trace(1, True)
dst_entr = _validate(state, dst_entr, dst_entr_hmac, is_offloaded_bp)
state.mem_trace(2, True)
mem_trace(2, True)
if not state.is_processing_offloaded:
# First output - we include the size of the container into the tx prefix hasher
if state.current_output_index == 0:
state.tx_prefix_hasher.uvarint(state.output_count)
state.mem_trace(4, True)
mem_trace(4, True)
state.output_amounts.append(dst_entr.amount)
state.summary_outs_money += dst_entr.amount
utils.unimport_end(mods)
state.mem_trace(5, True)
mem_trace(5, True)
# Compute tx keys and masks if applicable
tx_out_key, amount_key, derivation = _compute_tx_keys(state, dst_entr)
utils.unimport_end(mods)
state.mem_trace(6, True)
mem_trace(6, True)
# Range proof first, memory intensive (fragmentation)
rsig_data_new, mask = _range_proof(state, rsig_data)
utils.unimport_end(mods)
state.mem_trace(7, True)
mem_trace(7, True)
# If det masks & offloading, return as we are handling offloaded BP.
if state.is_processing_offloaded:
@ -80,29 +81,29 @@ async def set_output(
derivation,
)
del (derivation,)
state.mem_trace(11, True)
mem_trace(11, True)
out_pk_dest, out_pk_commitment, ecdh_info_bin = _get_ecdh_info_and_out_pk(
state=state,
tx_out_key=tx_out_key,
amount=dst_entr.amount,
mask=mask,
amount_key=amount_key,
state,
tx_out_key,
dst_entr.amount,
mask,
amount_key,
)
del (dst_entr, mask, amount_key, tx_out_key)
state.mem_trace(12, True)
mem_trace(12, True)
# Incremental hashing of the ECDH info.
# RctSigBase allows to hash only one of the (ecdh, out_pk) as they are serialized
# as whole vectors. We choose to hash ECDH first, because it saves state space.
state.full_message_hasher.set_ecdh(ecdh_info_bin)
state.mem_trace(13, True)
mem_trace(13, True)
# output_pk_commitment is stored to the state as it is used during the signature and hashed to the
# RctSigBase later. No need to store amount, it was already stored.
state.output_pk_commitments.append(out_pk_commitment)
state.last_step = state.STEP_OUT
state.mem_trace(14, True)
mem_trace(14, True)
from trezor.messages import MoneroTransactionSetOutputAck
@ -125,6 +126,8 @@ def _validate(
dst_entr_hmac: bytes,
is_offloaded_bp: bool,
) -> MoneroTransactionDestinationEntry:
ensure = utils.ensure # local_cache_attribute
if state.last_step not in (state.STEP_ALL_IN, state.STEP_OUT):
raise ValueError("Invalid state transition")
if is_offloaded_bp and (not state.rsig_offload):
@ -134,7 +137,7 @@ def _validate(
bidx = _get_rsig_batch(state, state.current_output_index)
last_in_batch = _is_last_in_batch(state, state.current_output_index, bidx)
utils.ensure(
ensure(
not last_in_batch or state.is_processing_offloaded != is_offloaded_bp,
"Offloaded BP out of order",
)
@ -143,15 +146,11 @@ def _validate(
if not state.is_processing_offloaded:
state.current_output_index += 1
utils.ensure(
not dst_entr or dst_entr.amount >= 0, "Destination with negative amount"
)
utils.ensure(
ensure(not dst_entr or dst_entr.amount >= 0, "Destination with negative amount")
ensure(
state.current_input_index + 1 == state.input_count, "Invalid number of inputs"
)
utils.ensure(
state.current_output_index < state.output_count, "Invalid output index"
)
ensure(state.current_output_index < state.output_count, "Invalid output index")
if not state.is_processing_offloaded:
# HMAC check of the destination
@ -159,9 +158,7 @@ def _validate(
state.key_hmac, dst_entr, state.current_output_index
)
utils.ensure(
crypto.ct_equals(dst_entr_hmac, dst_entr_hmac_computed), "HMAC failed"
)
ensure(crypto.ct_equals(dst_entr_hmac, dst_entr_hmac_computed), "HMAC failed")
del dst_entr_hmac_computed
else:
@ -247,29 +244,29 @@ def _range_proof(
Since HF10 the commitments are deterministic.
The range proof is incrementally hashed to the final_message.
"""
from apps.monero.signing import Error
rsig_offload = state.rsig_offload # local_cache_attribute
is_processing_offloaded = state.is_processing_offloaded # local_cache_attribute
provided_rsig = None
if rsig_data and rsig_data.rsig and len(rsig_data.rsig) > 0:
provided_rsig = rsig_data.rsig
if not state.rsig_offload and provided_rsig:
raise signing.Error("Provided unexpected rsig")
if not rsig_offload and provided_rsig:
raise Error("Provided unexpected rsig")
# Batching & validation
bidx = _get_rsig_batch(state, state.current_output_index)
last_in_batch = _is_last_in_batch(state, state.current_output_index, bidx)
if state.rsig_offload and provided_rsig and not last_in_batch:
raise signing.Error("Provided rsig too early")
if (
state.rsig_offload
and last_in_batch
and not provided_rsig
and state.is_processing_offloaded
):
raise signing.Error("Rsig expected, not provided")
if rsig_offload and provided_rsig and not last_in_batch:
raise Error("Provided rsig too early")
if rsig_offload and last_in_batch and not provided_rsig and is_processing_offloaded:
raise Error("Rsig expected, not provided")
# Batch not finished, skip range sig generation now
mask = state.output_masks[-1] if not state.is_processing_offloaded else None
offload_mask = mask and state.rsig_offload
mask = state.output_masks[-1] if not is_processing_offloaded else None
offload_mask = mask and rsig_offload
# If not last, do not proceed to the BP processing.
if not last_in_batch:
@ -285,11 +282,11 @@ def _range_proof(
rsig = None
state.mem_trace("pre-rproof" if __debug__ else None, collect=True)
if not state.rsig_offload:
if not rsig_offload:
# Bulletproof calculation in Trezor
rsig = _rsig_bp(state)
elif not state.is_processing_offloaded:
elif not is_processing_offloaded:
# Bulletproof offloaded to the host, deterministic masks. Nothing here, waiting for offloaded BP.
pass
@ -305,7 +302,7 @@ def _range_proof(
)
if state.current_output_index + 1 == state.output_count and (
not state.rsig_offload or state.is_processing_offloaded
not rsig_offload or is_processing_offloaded
):
# output masks and amounts are not needed anymore
state.output_amounts = None
@ -344,6 +341,7 @@ def _rsig_bp(state: State) -> bytes:
def _rsig_process_bp(state: State, rsig_data: MoneroTransactionRsigData):
from apps.monero.xmr import range_signatures
from apps.monero.xmr.serialize_messages.tx_rsig_bulletproof import BulletproofPlus
from apps.monero.xmr import serialize
bp_obj = serialize.parse_msg(rsig_data.rsig, BulletproofPlus)
rsig_data.rsig = None
@ -375,12 +373,17 @@ def _dump_rsig_bp_plus(rsig: BulletproofPlus) -> bytes:
buff_size = 32 * (6 + 2 * (len(rsig.L))) + 2
buff = bytearray(buff_size)
utils.memcpy(buff, 0, rsig.A, 0, 32)
utils.memcpy(buff, 32, rsig.A1, 0, 32)
utils.memcpy(buff, 32 * 2, rsig.B, 0, 32)
utils.memcpy(buff, 32 * 3, rsig.r1, 0, 32)
utils.memcpy(buff, 32 * 4, rsig.s1, 0, 32)
utils.memcpy(buff, 32 * 5, rsig.d1, 0, 32)
for i, var in enumerate(
(
rsig.A,
rsig.A1,
rsig.B,
rsig.r1,
rsig.s1,
rsig.d1,
)
):
utils.memcpy(buff, 32 * i, var, 0, 32)
_dump_rsig_lr(buff, 32 * 6, rsig)
return buff
@ -434,6 +437,8 @@ def _get_ecdh_info_and_out_pk(
Also encodes the two items - `mask` and `amount` - into ecdh info,
so the recipient is able to reconstruct the commitment.
"""
import gc
out_pk_dest = crypto_helpers.encodepoint(tx_out_key)
out_pk_commitment = crypto.gen_commitment_into(None, mask, amount)
@ -442,31 +447,14 @@ def _get_ecdh_info_and_out_pk(
ecdh_info = _ecdh_encode(amount, crypto_helpers.encodeint(amount_key))
# Manual ECDH info serialization
ecdh_info_bin = _serialize_ecdh(ecdh_info)
gc.collect()
return out_pk_dest, out_pk_commitment, ecdh_info_bin
def _serialize_ecdh(ecdh_info: EcdhTuple) -> bytes:
"""
Serializes ECDH according to the current format defined by the hard fork version
or the signature format respectively.
"""
# Since HF10 the amount is serialized to 8B and mask is deterministic
# Serializes ECDH according to the current format defined by the hard fork version
# or the signature format respectively.
ecdh_info_bin = bytearray(8)
ecdh_info_bin[:] = ecdh_info.amount[0:8]
return ecdh_info_bin
gc.collect()
def _ecdh_hash(shared_sec: bytes) -> bytes:
"""
Generates ECDH hash for amount masking for Bulletproof2
"""
data = bytearray(38)
data[0:6] = b"amount"
data[6:] = shared_sec
return crypto.fast_hash_into(None, data)
return out_pk_dest, out_pk_commitment, ecdh_info_bin
def _ecdh_encode(amount: int, amount_key: bytes) -> EcdhTuple:
@ -478,7 +466,15 @@ def _ecdh_encode(amount: int, amount_key: bytes) -> EcdhTuple:
ecdh_info = EcdhTuple(mask=crypto_helpers.NULL_KEY_ENC, amount=bytearray(32))
amnt = crypto.Scalar(amount)
crypto.encodeint_into(ecdh_info.amount, amnt)
crypto_helpers.xor8(ecdh_info.amount, _ecdh_hash(amount_key))
# _ecdh_hash
# Generates ECDH hash for amount masking for Bulletproof2
data = bytearray(38)
data[0:6] = b"amount"
data[6:] = amount_key
ecdh_hash = crypto.fast_hash_into(None, data)
crypto_helpers.xor8(ecdh_info.amount, ecdh_hash)
return ecdh_info

@ -4,29 +4,27 @@ into the tx extra field and then hashes it into the prefix hash.
The prefix hash is then complete.
"""
import gc
from typing import TYPE_CHECKING
from trezor import utils
from apps.monero import layout
from apps.monero.xmr import crypto
from .state import State
if TYPE_CHECKING:
from trezor.messages import MoneroTransactionAllOutSetAck
from .state import State
async def all_outputs_set(state: State) -> MoneroTransactionAllOutSetAck:
state.mem_trace(0)
import gc
from apps.monero import layout
mem_trace = state.mem_trace # local_cache_attribute
mem_trace(0)
await layout.transaction_step(state, state.STEP_ALL_OUT)
state.mem_trace(1)
mem_trace(1)
_validate(state)
state.is_processing_offloaded = False
state.mem_trace(2)
mem_trace(2)
extra_b = _set_tx_extra(state)
# tx public keys not needed anymore
@ -35,13 +33,24 @@ async def all_outputs_set(state: State) -> MoneroTransactionAllOutSetAck:
state.rsig_grouping = None
state.rsig_offload = None
gc.collect()
state.mem_trace(3)
mem_trace(3)
# Completes the transaction prefix hash by including extra
_set_tx_prefix(state, extra_b)
# _set_tx_prefix
# Adds `extra` to the tx_prefix_hash, which is the last needed item,
# so the tx_prefix_hash is now complete and can be incorporated
# into full_message_hash.
# Serializing "extra" type as BlobType.
# uvarint(len(extra)) || extra
state.tx_prefix_hasher.uvarint(len(extra_b))
state.tx_prefix_hasher.buffer(extra_b)
state.tx_prefix_hash = state.tx_prefix_hasher.get_digest()
state.tx_prefix_hasher = None
state.full_message_hasher.set_message(state.tx_prefix_hash)
state.output_change = None
gc.collect()
state.mem_trace(4)
mem_trace(4)
# In the multisig mode here needs to be a check whether currently computed
# transaction prefix matches expected transaction prefix sent in the
@ -57,7 +66,13 @@ async def all_outputs_set(state: State) -> MoneroTransactionAllOutSetAck:
rv_type=state.tx_type,
)
_out_pk(state)
# _out_pk
# Hashes out_pk into the full message.
if state.output_count != len(state.output_pk_commitments):
raise ValueError("Invalid number of ecdh")
for out in state.output_pk_commitments:
state.full_message_hasher.set_out_pk_commitment(out)
state.full_message_hasher.rctsig_base_done()
state.current_output_index = None
state.current_input_index = -1
@ -81,20 +96,23 @@ async def all_outputs_set(state: State) -> MoneroTransactionAllOutSetAck:
def _validate(state: State) -> None:
out_money = state.summary_outs_money # local_cache_attribute
in_money = state.summary_inputs_money # local_cache_attribute
if state.last_step != state.STEP_OUT:
raise ValueError("Invalid state transition")
if state.current_output_index + 1 != state.output_count:
raise ValueError("Invalid out num")
# Fee test
if state.fee != (state.summary_inputs_money - state.summary_outs_money):
if state.fee != (in_money - out_money):
raise ValueError(
f"Fee invalid {state.fee} vs {state.summary_inputs_money - state.summary_outs_money}, out: {state.summary_outs_money}"
f"Fee invalid {state.fee} vs {in_money - out_money}, out: {out_money}"
)
if state.summary_outs_money > state.summary_inputs_money:
if out_money > in_money:
raise ValueError(
f"Transaction inputs money ({state.summary_inputs_money}) less than outputs money ({state.summary_outs_money})"
f"Transaction inputs money ({in_money}) less than outputs money ({out_money})"
)
@ -104,8 +122,12 @@ def _set_tx_extra(state: State) -> bytes:
Extra field is supposed to be sorted (by sort_tx_extra() in the Monero)
Tag ordering: TX_EXTRA_TAG_PUBKEY, TX_EXTRA_TAG_ADDITIONAL_PUBKEYS, TX_EXTRA_NONCE
"""
from trezor import utils
from apps.monero.xmr import crypto
from apps.monero.xmr.serialize import int_serialize
extra_nonce = state.extra_nonce # local_cache_attribute
# Extra buffer length computation
# TX_EXTRA_TAG_PUBKEY (1B) | tx_pub_key (32B)
extra_size = 33
@ -120,8 +142,8 @@ def _set_tx_extra(state: State) -> bytes:
# TX_EXTRA_TAG_ADDITIONAL_PUBKEYS (1B) | varint | keys
extra_size += 1 + len_size + 32 * num_keys
if state.extra_nonce:
extra_size += len(state.extra_nonce)
if extra_nonce:
extra_size += len(extra_nonce)
extra = bytearray(extra_size)
extra[0] = 1 # TX_EXTRA_TAG_PUBKEY
@ -137,36 +159,8 @@ def _set_tx_extra(state: State) -> bytes:
extra[offset : offset + 32] = state.additional_tx_public_keys[idx]
offset += 32
if state.extra_nonce:
utils.memcpy(extra, offset, state.extra_nonce, 0, len(state.extra_nonce))
if extra_nonce:
utils.memcpy(extra, offset, extra_nonce, 0, len(extra_nonce))
state.extra_nonce = None
return extra
def _set_tx_prefix(state: State, extra: bytes) -> None:
"""
Adds `extra` to the tx_prefix_hash, which is the last needed item,
so the tx_prefix_hash is now complete and can be incorporated
into full_message_hash.
"""
# Serializing "extra" type as BlobType.
# uvarint(len(extra)) || extra
state.tx_prefix_hasher.uvarint(len(extra))
state.tx_prefix_hasher.buffer(extra)
state.tx_prefix_hash = state.tx_prefix_hasher.get_digest()
state.tx_prefix_hasher = None
state.full_message_hasher.set_message(state.tx_prefix_hash)
def _out_pk(state: State) -> None:
"""
Hashes out_pk into the full message.
"""
if state.output_count != len(state.output_pk_commitments):
raise ValueError("Invalid number of ecdh")
for out in state.output_pk_commitments:
state.full_message_hasher.set_out_pk_commitment(out)

@ -13,16 +13,10 @@ on output masks as pseudo outputs have to remain same.
import gc
from typing import TYPE_CHECKING
from trezor import utils
from apps.monero import layout
from apps.monero.xmr import crypto, crypto_helpers
from .state import State
if TYPE_CHECKING:
from trezor.messages import MoneroTransactionSourceEntry
from trezor.messages import MoneroTransactionSignInputAck
from .state import State
async def sign_input(
@ -49,12 +43,22 @@ async def sign_input(
:param orig_idx: original index of the src_entr before sorting (HMAC check)
:return: Generated signature MGs[i]
"""
from trezor import utils
from apps.monero import layout
from apps.monero.xmr import crypto_helpers
from apps.monero.xmr import crypto
ensure = utils.ensure # local_cache_attribute
mem_trace = state.mem_trace # local_cache_attribute
input_count = state.input_count # local_cache_attribute
outputs = src_entr.outputs # local_cache_attribute
await layout.transaction_step(state, state.STEP_SIGN, state.current_input_index + 1)
state.current_input_index += 1
if state.last_step not in (state.STEP_ALL_OUT, state.STEP_SIGN):
raise ValueError("Invalid state transition")
if state.current_input_index >= state.input_count:
if state.current_input_index >= input_count:
raise ValueError("Invalid inputs count")
if pseudo_out is None:
raise ValueError("SimpleRCT requires pseudo_out but none provided")
@ -78,11 +82,11 @@ async def sign_input(
if state.current_input_index > 0 and state.last_ki <= cur_ki:
raise ValueError("Key image order invalid")
state.last_ki = cur_ki if state.current_input_index < state.input_count else None
state.last_ki = cur_ki if state.current_input_index < input_count else None
del (cur_ki, vini_bin, vini_hmac, vini_hmac_comp)
gc.collect()
state.mem_trace(1, True)
mem_trace(1, True)
from apps.monero.xmr import chacha_poly
@ -94,9 +98,9 @@ async def sign_input(
)
# Last pseudo_out is recomputed so mask sums hold
if input_position + 1 == state.input_count:
if input_position + 1 == input_count:
# Recompute the lash alpha so the sum holds
state.mem_trace("Correcting alpha")
mem_trace("Correcting alpha")
alpha_diff = crypto.sc_sub_into(None, state.sumout, state.sumpouts_alphas)
crypto.sc_add_into(pseudo_out_alpha, pseudo_out_alpha, alpha_diff)
pseudo_out_c = crypto.gen_commitment_into(
@ -104,8 +108,8 @@ async def sign_input(
)
else:
if input_position + 1 == state.input_count:
utils.ensure(
if input_position + 1 == input_count:
ensure(
crypto.sc_eq(state.sumpouts_alphas, state.sumout) != 0, "Sum eq error"
)
@ -120,7 +124,7 @@ async def sign_input(
pseudo_out_c = crypto_helpers.decodepoint(pseudo_out)
state.mem_trace(2, True)
mem_trace(2, True)
# Spending secret
spend_key = crypto_helpers.decodeint(
@ -139,7 +143,7 @@ async def sign_input(
spend_enc,
)
utils.unimport_end(mods)
state.mem_trace(3, True)
mem_trace(3, True)
# Basic setup, sanity check
from apps.monero.xmr.serialize_messages.tx_ct_key import CtKey
@ -148,14 +152,14 @@ async def sign_input(
input_secret_key = CtKey(spend_key, crypto_helpers.decodeint(src_entr.mask))
# Private key correctness test
utils.ensure(
ensure(
crypto.point_eq(
crypto_helpers.decodepoint(src_entr.outputs[src_entr.real_output].key.dest),
crypto.scalarmult_base_into(None, input_secret_key.dest),
),
"Real source entry's destination does not equal spend key's",
)
utils.ensure(
ensure(
crypto.point_eq(
crypto_helpers.decodepoint(
src_entr.outputs[src_entr.real_output].key.commitment
@ -165,19 +169,19 @@ async def sign_input(
"Real source entry's mask does not equal spend key's",
)
state.mem_trace(4, True)
mem_trace(4, True)
from apps.monero.xmr import clsag
mg_buffer = []
ring_pubkeys = [x.key for x in src_entr.outputs if x]
utils.ensure(len(ring_pubkeys) == len(src_entr.outputs), "Invalid ring")
ring_pubkeys = [x.key for x in outputs if x]
ensure(len(ring_pubkeys) == len(outputs), "Invalid ring")
del src_entr
state.mem_trace(5, True)
mem_trace(5, True)
assert state.full_message is not None
state.mem_trace("CLSAG")
mem_trace("CLSAG")
clsag.generate_clsag_simple(
state.full_message,
ring_pubkeys,
@ -189,16 +193,16 @@ async def sign_input(
)
del (CtKey, input_secret_key, pseudo_out_alpha, clsag, ring_pubkeys)
state.mem_trace(6, True)
mem_trace(6, True)
from trezor.messages import MoneroTransactionSignInputAck
# Encrypt signature, reveal once protocol finishes OK
utils.unimport_end(mods)
state.mem_trace(7, True)
mem_trace(7, True)
mg_buffer = _protect_signature(state, mg_buffer)
state.mem_trace(8, True)
mem_trace(8, True)
state.last_step = state.STEP_SIGN
return MoneroTransactionSignInputAck(
signature=mg_buffer, pseudo_out=crypto_helpers.encodepoint(pseudo_out_c)

@ -8,23 +8,31 @@ so we can recover it just from the transaction and the spend key.
The private tx keys are used in other numerous Monero features.
"""
from trezor.crypto import random
from trezor.messages import MoneroTransactionFinalAck
from typing import TYPE_CHECKING
from apps.monero import misc
from apps.monero.xmr import chacha_poly, crypto, crypto_helpers
from .state import State
if TYPE_CHECKING:
from trezor.messages import MoneroTransactionFinalAck
from .state import State
def final_msg(state: State) -> MoneroTransactionFinalAck:
from trezor.messages import MoneroTransactionFinalAck
from apps.monero.xmr import chacha_poly
from trezor.crypto import random
from apps.monero import misc
from apps.monero.xmr import crypto, crypto_helpers
if state.last_step != state.STEP_SIGN:
raise ValueError("Invalid state transition")
if state.current_input_index != state.input_count - 1:
raise ValueError("Invalid input count")
tx_key, salt, rand_mult = _compute_tx_key(
state.creds.spend_key_private, state.tx_prefix_hash
# _compute_tx_key
salt = random.bytes(32)
rand_mult_num = crypto.random_scalar()
rand_mult = crypto_helpers.encodeint(rand_mult_num)
tx_key = misc.compute_tx_key(
state.creds.spend_key_private, state.tx_prefix_hash, salt, rand_mult_num
)
key_buff = crypto_helpers.encodeint(state.tx_priv) + b"".join(
@ -40,15 +48,3 @@ def final_msg(state: State) -> MoneroTransactionFinalAck:
tx_enc_keys=tx_enc_keys,
opening_key=state.opening_key,
)
def _compute_tx_key(
spend_key_private: crypto.Scalar, tx_prefix_hash: bytes
) -> tuple[bytes, bytes, bytes]:
salt = random.bytes(32)
rand_mult_num = crypto.random_scalar()
rand_mult = crypto_helpers.encodeint(rand_mult_num)
tx_key = misc.compute_tx_key(spend_key_private, tx_prefix_hash, salt, rand_mult_num)
return tx_key, salt, rand_mult

@ -1,25 +1,18 @@
from typing import TYPE_CHECKING
from trezor.crypto import monero as tcry
if TYPE_CHECKING:
from trezor.messages import MoneroAccountPublicAddress
from trezor.messages import MoneroTransactionDestinationEntry
def addr_to_hash(addr: MoneroAccountPublicAddress) -> bytes:
"""
Creates hashable address representation
"""
return bytes(addr.spend_public_key + addr.view_public_key)
def encode_addr(
version, spend_pub: bytes, view_pub: bytes, payment_id: bytes | None = None
) -> str:
"""
Builds Monero address from public keys
"""
from trezor.crypto import monero as tcry
buf = spend_pub + view_pub
if payment_id:
buf += bytes(payment_id)
@ -38,15 +31,18 @@ def classify_subaddresses(
single_dest_subaddress: MoneroAccountPublicAddress | None = None
addr_set = set()
for tx in tx_dests:
if change_addr and addr_eq(change_addr, tx.addr):
addr = tx.addr # local_cache_attribute
if change_addr and addr_eq(change_addr, addr):
continue
addr_hashed = addr_to_hash(tx.addr)
# addr_to_hash
# Creates hashable address representation
addr_hashed = bytes(addr.spend_public_key + addr.view_public_key)
if addr_hashed in addr_set:
continue
addr_set.add(addr_hashed)
if tx.is_subaddress:
num_subaddresses += 1
single_dest_subaddress = tx.addr
single_dest_subaddress = addr
else:
num_stdaddresses += 1
return num_stdaddresses, num_subaddresses, single_dest_subaddress

File diff suppressed because it is too large Load Diff

@ -1,10 +1,12 @@
from trezor.crypto import chacha20poly1305 as ChaCha20Poly1305, monero, random
from trezor.crypto import chacha20poly1305 as ChaCha20Poly1305
def encrypt(key: bytes, plaintext: bytes, associated_data: bytes | None = None):
"""
Uses ChaCha20Poly1305 for encryption
"""
from trezor.crypto import random
nonce = random.bytes(12)
cipher = ChaCha20Poly1305(key, nonce)
if associated_data:
@ -14,7 +16,7 @@ def encrypt(key: bytes, plaintext: bytes, associated_data: bytes | None = None):
return nonce, ciphertext + tag, b""
def decrypt(
def _decrypt(
key: bytes,
iv: bytes,
ciphertext: bytes,
@ -24,6 +26,8 @@ def decrypt(
"""
ChaCha20Poly1305 decryption
"""
from trezor.crypto import monero
cipher = ChaCha20Poly1305(key, iv)
if associated_data:
cipher.auth(associated_data)
@ -43,4 +47,4 @@ def encrypt_pack(key: bytes, plaintext: bytes, associated_data: bytes | None = N
def decrypt_pack(key: bytes, ciphertext: bytes):
cp = memoryview(ciphertext)
return decrypt(key, cp[:12], cp[12:], None)
return _decrypt(key, cp[:12], cp[12:], None)

@ -45,18 +45,16 @@ Author: Dusan Klinec, ph4r05, 2018
import gc
from typing import TYPE_CHECKING
from apps.monero.xmr import crypto, crypto_helpers
from apps.monero.xmr.serialize import int_serialize
if TYPE_CHECKING:
from typing import Any, TypeGuard, TypeVar
from .serialize_messages.tx_ct_key import CtKey
from trezor.messages import MoneroRctKeyPublic
from apps.monero.xmr import crypto
T = TypeVar("T")
def list_of_type(lst: list[Any], typ: type[T]) -> TypeGuard[list[T]]:
def _list_of_type(lst: list[Any], typ: type[T]) -> TypeGuard[list[T]]:
...
@ -88,6 +86,8 @@ def generate_clsag_simple(
:param index: specifies corresponding public key to the `in_sk` in the pubs array
:param mg_buff: buffer to store the signature to
"""
from apps.monero.xmr import crypto
cols = len(pubs)
if cols == 0:
raise ValueError("Empty pubs")
@ -118,23 +118,32 @@ def _generate_clsag(
index: int,
mg_buff: list[bytearray],
) -> list[bytes]:
sI = crypto.Point() # sig.I
sD = crypto.Point() # sig.D
sc1 = crypto.Scalar() # sig.c1
from apps.monero.xmr import crypto, crypto_helpers
from apps.monero.xmr.serialize import int_serialize
Point = crypto.Point # local_cache_attribute
Scalar = crypto.Scalar # local_cache_attribute
encodepoint_into = crypto.encodepoint_into # local_cache_attribute
sc_mul_into = crypto.sc_mul_into # local_cache_attribute
scalarmult_into = crypto.scalarmult_into # local_cache_attribute
sI = Point() # sig.I
sD = Point() # sig.D
sc1 = Scalar() # sig.c1
a = crypto.random_scalar()
H = crypto.Point()
D = crypto.Point()
H = Point()
D = Point()
Cout_bf = crypto_helpers.encodepoint(Cout)
tmp_sc = crypto.Scalar()
tmp = crypto.Point()
tmp_sc = Scalar()
tmp = Point()
tmp_bf = bytearray(32)
crypto.hash_to_point_into(H, P[index])
crypto.scalarmult_into(sI, H, p) # I = p*H
crypto.scalarmult_into(D, H, z) # D = z*H
crypto.sc_mul_into(tmp_sc, z, crypto_helpers.INV_EIGHT_SC) # 1/8*z
crypto.scalarmult_into(sD, H, tmp_sc) # sig.D = 1/8*z*H
scalarmult_into(sI, H, p) # I = p*H
scalarmult_into(D, H, z) # D = z*H
sc_mul_into(tmp_sc, z, crypto_helpers.INV_EIGHT_SC) # 1/8*z
scalarmult_into(sD, H, tmp_sc) # sig.D = 1/8*z*H
sD = crypto_helpers.encodepoint(sD)
hsh_P = crypto_helpers.get_keccak() # domain, I, D, P, C, C_offset
@ -153,7 +162,7 @@ def _generate_clsag(
for x in C_nonzero:
hsh_PC(x)
hsh_PC(crypto.encodepoint_into(tmp_bf, sI))
hsh_PC(encodepoint_into(tmp_bf, sI))
hsh_PC(sD)
hsh_PC(Cout_bf)
mu_P = crypto_helpers.decodeint(hsh_P.digest())
@ -161,26 +170,27 @@ def _generate_clsag(
del (hsh_PC, hsh_P, hsh_C)
c_to_hash = crypto_helpers.get_keccak() # domain, P, C, C_offset, message, aG, aH
c_to_hash.update(_HASH_KEY_CLSAG_ROUND)
update = c_to_hash.update # local_cache_attribute
update(_HASH_KEY_CLSAG_ROUND)
for i in range(len(P)):
c_to_hash.update(P[i])
update(P[i])
for i in range(len(P)):
c_to_hash.update(C_nonzero[i])
c_to_hash.update(Cout_bf)
c_to_hash.update(message)
update(C_nonzero[i])
update(Cout_bf)
update(message)
chasher = c_to_hash.copy()
crypto.scalarmult_base_into(tmp, a)
chasher.update(crypto.encodepoint_into(tmp_bf, tmp)) # aG
crypto.scalarmult_into(tmp, H, a)
chasher.update(crypto.encodepoint_into(tmp_bf, tmp)) # aH
chasher.update(encodepoint_into(tmp_bf, tmp)) # aG
scalarmult_into(tmp, H, a)
chasher.update(encodepoint_into(tmp_bf, tmp)) # aH
c = crypto_helpers.decodeint(chasher.digest())
del (chasher, H)
L = crypto.Point()
R = crypto.Point()
c_p = crypto.Scalar()
c_c = crypto.Scalar()
L = Point()
R = Point()
c_p = Scalar()
c_c = Scalar()
i = (index + 1) % len(P)
if i == 0:
crypto.sc_copy(sc1, c)
@ -193,24 +203,24 @@ def _generate_clsag(
crypto.random_scalar(tmp_sc)
crypto.encodeint_into(mg_buff[i + 1], tmp_sc)
crypto.sc_mul_into(c_p, mu_P, c)
crypto.sc_mul_into(c_c, mu_C, c)
sc_mul_into(c_p, mu_P, c)
sc_mul_into(c_c, mu_C, c)
# L = tmp_sc * G + c_P * P[i] + c_c * C[i]
crypto.add_keys2_into(L, tmp_sc, c_p, crypto.decodepoint_into(tmp, P[i]))
crypto.decodepoint_into(tmp, C_nonzero[i]) # C = C_nonzero - Cout
crypto.point_sub_into(tmp, tmp, Cout)
crypto.scalarmult_into(tmp, tmp, c_c)
scalarmult_into(tmp, tmp, c_c)
crypto.point_add_into(L, L, tmp)
# R = tmp_sc * HP + c_p * I + c_c * D
crypto.hash_to_point_into(tmp, P[i])
crypto.add_keys3_into(R, tmp_sc, tmp, c_p, sI)
crypto.point_add_into(R, R, crypto.scalarmult_into(tmp, D, c_c))
crypto.point_add_into(R, R, scalarmult_into(tmp, D, c_c))
chasher = c_to_hash.copy()
chasher.update(crypto.encodepoint_into(tmp_bf, L))
chasher.update(crypto.encodepoint_into(tmp_bf, R))
chasher.update(encodepoint_into(tmp_bf, L))
chasher.update(encodepoint_into(tmp_bf, R))
crypto.decodeint_into(c, chasher.digest())
P[i] = None # type: ignore
@ -224,13 +234,13 @@ def _generate_clsag(
gc.collect()
# Final scalar = a - c * (mu_P * p + mu_c * Z)
crypto.sc_mul_into(tmp_sc, mu_P, p)
sc_mul_into(tmp_sc, mu_P, p)
crypto.sc_muladd_into(tmp_sc, mu_C, z, tmp_sc)
crypto.sc_mulsub_into(tmp_sc, c, tmp_sc, a)
crypto.encodeint_into(mg_buff[index + 1], tmp_sc)
if TYPE_CHECKING:
assert list_of_type(mg_buff, bytes)
assert _list_of_type(mg_buff, bytes)
mg_buff.append(crypto_helpers.encodeint(sc1))
mg_buff.append(sD)

@ -1,8 +1,8 @@
from trezor.enums import MoneroNetworkType
from typing import TYPE_CHECKING
from apps.monero.xmr import crypto, crypto_helpers
from apps.monero.xmr.addresses import encode_addr
from apps.monero.xmr.networks import net_version
if TYPE_CHECKING:
from trezor.enums import MoneroNetworkType
from apps.monero.xmr import crypto
class AccountCreds:
@ -31,8 +31,16 @@ class AccountCreds:
cls,
priv_view_key: crypto.Scalar,
priv_spend_key: crypto.Scalar,
network_type: MoneroNetworkType = MoneroNetworkType.MAINNET,
network_type: MoneroNetworkType | None = None,
) -> "AccountCreds":
from apps.monero.xmr import crypto, crypto_helpers
from apps.monero.xmr.addresses import encode_addr
from apps.monero.xmr.networks import net_version
from trezor.enums import MoneroNetworkType
if network_type is None:
network_type = MoneroNetworkType.MAINNET
pub_view_key = crypto.scalarmult_base_into(None, priv_view_key)
pub_spend_key = crypto.scalarmult_base_into(None, priv_spend_key)
addr = encode_addr(
@ -41,10 +49,10 @@ class AccountCreds:
crypto_helpers.encodepoint(pub_view_key),
)
return cls(
view_key_private=priv_view_key,
spend_key_private=priv_spend_key,
view_key_public=pub_view_key,
spend_key_public=pub_spend_key,
address=addr,
network_type=network_type,
priv_view_key,
priv_spend_key,
pub_view_key,
pub_spend_key,
addr,
network_type,
)

@ -7,13 +7,20 @@
# https://tools.ietf.org/html/draft-josefsson-eddsa-ed25519-00#section-4
# https://github.com/monero-project/research-lab
from typing import TYPE_CHECKING
from trezor.crypto import monero as tcry
from trezor.crypto.hashlib import sha3_256
if TYPE_CHECKING:
from trezor.crypto.hashlib import sha3_256
NULL_KEY_ENC = b"\x00" * 32
def get_keccak(data: bytes | None = None) -> sha3_256:
from trezor.crypto.hashlib import sha3_256
return sha3_256(data=data, keccak=True)

@ -1,12 +1,9 @@
from typing import TYPE_CHECKING
from trezor.utils import HashWriter
from apps.monero.xmr import crypto_helpers
from apps.monero.xmr.serialize import int_serialize
if TYPE_CHECKING:
from trezor.utils import HashContext
from trezor.utils import HashContext, HashWriter
class KeccakXmrArchive:
@ -27,6 +24,9 @@ class KeccakXmrArchive:
def get_keccak_writer(ctx: HashContext | None = None) -> HashWriter:
from trezor.utils import HashWriter
from apps.monero.xmr import crypto_helpers
if ctx is None:
ctx = crypto_helpers.get_keccak()
return HashWriter(ctx)

@ -1,85 +1,66 @@
from typing import TYPE_CHECKING
from apps.monero.xmr import crypto, crypto_helpers, monero
from apps.monero.xmr.serialize.int_serialize import dump_uvarint_b
from apps.monero.xmr import crypto_helpers, monero
if TYPE_CHECKING:
from apps.monero.xmr.credentials import AccountCreds
from trezor.messages import MoneroTransferDetails
from apps.monero.xmr import crypto
Subaddresses = dict[bytes, tuple[int, int]]
Sig = list[list[crypto.Scalar]]
def compute_hash(rr: MoneroTransferDetails) -> bytes:
from apps.monero.xmr.serialize.int_serialize import dump_uvarint_b
kck = crypto_helpers.get_keccak()
kck.update(rr.out_key)
kck.update(rr.tx_pub_key)
update = kck.update # local_cache_attribute
update(rr.out_key)
update(rr.tx_pub_key)
if rr.additional_tx_pub_keys:
for x in rr.additional_tx_pub_keys:
kck.update(x)
kck.update(dump_uvarint_b(rr.internal_output_index))
update(x)
update(dump_uvarint_b(rr.internal_output_index))
return kck.digest()
def export_key_image(
creds: AccountCreds, subaddresses: Subaddresses, td: MoneroTransferDetails
) -> tuple[crypto.Point, Sig]:
out_key = crypto_helpers.decodepoint(td.out_key)
tx_pub_key = crypto_helpers.decodepoint(td.tx_pub_key)
decodepoint = crypto_helpers.decodepoint # local_cache_attribute
out_key = decodepoint(td.out_key)
tx_pub_key = decodepoint(td.tx_pub_key)
additional_tx_pub_keys = td.additional_tx_pub_keys # local_cache_attribute
additional_tx_pub_key = None
if len(td.additional_tx_pub_keys) == 1: # compression
additional_tx_pub_key = crypto_helpers.decodepoint(td.additional_tx_pub_keys[0])
elif td.additional_tx_pub_keys:
if td.internal_output_index >= len(td.additional_tx_pub_keys):
if len(additional_tx_pub_keys) == 1: # compression
additional_tx_pub_key = decodepoint(additional_tx_pub_keys[0])
elif additional_tx_pub_keys:
if td.internal_output_index >= len(additional_tx_pub_keys):
raise ValueError("Wrong number of additional derivations")
additional_tx_pub_key = crypto_helpers.decodepoint(
td.additional_tx_pub_keys[td.internal_output_index]
additional_tx_pub_key = decodepoint(
additional_tx_pub_keys[td.internal_output_index]
)
ki, sig = _export_key_image(
# _export_key_image
# Generates key image for the TXO + signature for the key image
r = monero.generate_tx_spend_and_key_image_and_derivation(
creds,
subaddresses,
out_key,
tx_pub_key,
additional_tx_pub_key,
td.internal_output_index,
True,
td.sub_addr_major,
td.sub_addr_minor,
)
return ki, sig
def _export_key_image(
creds: AccountCreds,
subaddresses: Subaddresses,
pkey: crypto.Point,
tx_pub_key: crypto.Point,
additional_tx_pub_key: crypto.Point | None,
out_idx: int,
test: bool = True,
sub_addr_major: int | None = None,
sub_addr_minor: int | None = None,
) -> tuple[crypto.Point, Sig]:
"""
Generates key image for the TXO + signature for the key image
"""
r = monero.generate_tx_spend_and_key_image_and_derivation(
creds,
subaddresses,
pkey,
tx_pub_key,
additional_tx_pub_key,
out_idx,
sub_addr_major,
sub_addr_minor,
)
xi, ki, _ = r[:3]
phash = crypto_helpers.encodepoint(ki)
sig = generate_ring_signature(phash, ki, [pkey], xi, 0, test)
sig = generate_ring_signature(phash, ki, [out_key], xi, 0, True)
return ki, sig
@ -97,6 +78,10 @@ def generate_ring_signature(
void crypto_ops::generate_ring_signature()
"""
from trezor.utils import memcpy
from apps.monero.xmr import crypto
Scalar = crypto.Scalar # local_cache_attribute
encodepoint_into = crypto.encodepoint_into # local_cache_attribute
if test:
t = crypto.scalarmult_base_into(None, sec)
@ -114,23 +99,23 @@ def generate_ring_signature(
memcpy(buff, 0, prefix_hash, 0, buff_off)
mvbuff = memoryview(buff)
sum = crypto.Scalar(0)
k = crypto.Scalar(0)
sum = Scalar(0)
k = Scalar(0)
sig = []
for _ in range(len(pubs)):
sig.append([crypto.Scalar(0), crypto.Scalar(0)]) # c, r
sig.append([Scalar(0), Scalar(0)]) # c, r
for i in range(len(pubs)):
if i == sec_idx:
k = crypto.random_scalar()
tmp3 = crypto.scalarmult_base_into(None, k)
crypto.encodepoint_into(mvbuff[buff_off : buff_off + 32], tmp3)
encodepoint_into(mvbuff[buff_off : buff_off + 32], tmp3)
buff_off += 32
tmp3 = crypto.hash_to_point_into(None, crypto_helpers.encodepoint(pubs[i]))
tmp2 = crypto.scalarmult_into(None, tmp3, k)
crypto.encodepoint_into(mvbuff[buff_off : buff_off + 32], tmp2)
encodepoint_into(mvbuff[buff_off : buff_off + 32], tmp2)
buff_off += 32
else:
@ -139,12 +124,12 @@ def generate_ring_signature(
tmp2 = crypto.ge25519_double_scalarmult_vartime_into(
None, tmp3, sig[i][0], sig[i][1]
)
crypto.encodepoint_into(mvbuff[buff_off : buff_off + 32], tmp2)
encodepoint_into(mvbuff[buff_off : buff_off + 32], tmp2)
buff_off += 32
tmp3 = crypto.hash_to_point_into(None, crypto_helpers.encodepoint(tmp3))
tmp2 = crypto.add_keys3_into(None, sig[i][1], tmp3, sig[i][0], image)
crypto.encodepoint_into(mvbuff[buff_off : buff_off + 32], tmp2)
encodepoint_into(mvbuff[buff_off : buff_off + 32], tmp2)
buff_off += 32
crypto.sc_add_into(sum, sum, sig[i][0])

@ -1,8 +1,5 @@
from typing import TYPE_CHECKING
from apps.monero.xmr import crypto_helpers
from apps.monero.xmr.keccak_hasher import KeccakXmrArchive
if TYPE_CHECKING:
from trezor.utils import HashContext
from .serialize_messages.tx_rsig_bulletproof import BulletproofPlus
@ -14,6 +11,9 @@ class PreMlsagHasher:
"""
def __init__(self) -> None:
from apps.monero.xmr import crypto_helpers
from apps.monero.xmr.keccak_hasher import KeccakXmrArchive
self.state = 0
self.kc_master: HashContext = crypto_helpers.get_keccak()
self.rsig_hasher: HashContext = crypto_helpers.get_keccak()
@ -61,6 +61,7 @@ class PreMlsagHasher:
) -> None:
if self.state == 8:
raise ValueError("State error")
update = self.rsig_hasher.update # local_cache_attribute
if raw:
# Avoiding problem with the memory fragmentation.
@ -69,22 +70,22 @@ class PreMlsagHasher:
# preserving the byte ordering
if isinstance(p, list):
for x in p:
self.rsig_hasher.update(x)
update(x)
else:
assert isinstance(p, bytes)
self.rsig_hasher.update(p)
update(p)
return
# Hash Bulletproof
fields = (p.A, p.A1, p.B, p.r1, p.s1, p.d1)
for fld in fields:
self.rsig_hasher.update(fld)
update(fld)
del (fields,)
for i in range(len(p.L)):
self.rsig_hasher.update(p.L[i])
update(p.L[i])
for i in range(len(p.R)):
self.rsig_hasher.update(p.R[i])
update(p.R[i])
def get_digest(self) -> bytes:
if self.state != 6:

@ -38,7 +38,7 @@ def get_subaddress_spend_public_key(
if major == 0 and minor == 0:
return spend_public
m = get_subaddress_secret_key(view_private, major=major, minor=minor)
m = get_subaddress_secret_key(view_private, major, minor)
M = crypto.scalarmult_base_into(None, m)
D = crypto.point_add_into(None, spend_public, M)
return D
@ -66,7 +66,7 @@ def generate_key_image(public_key: bytes, secret_key: crypto.Scalar) -> crypto.P
return point2
def is_out_to_account(
def _is_out_to_account(
subaddresses: Subaddresses,
out_key: crypto.Point,
derivation: crypto.Point,
@ -157,7 +157,7 @@ def generate_tx_spend_and_key_image(
scalar_step2 = scalar_step1
else:
subaddr_sk = get_subaddress_secret_key(
ack.view_key_private, major=received_index[0], minor=received_index[1]
ack.view_key_private, received_index[0], received_index[1]
)
scalar_step2 = crypto.sc_add_into(None, scalar_step1, subaddr_sk)
@ -223,7 +223,7 @@ def generate_tx_spend_and_key_image_and_derivation(
else None
)
subaddr_recv_info = is_out_to_account(
subaddr_recv_info = _is_out_to_account(
subaddresses,
out_key,
recv_derivation,
@ -266,7 +266,7 @@ def compute_subaddresses(
continue
pub = get_subaddress_spend_public_key(
creds.view_key_private, creds.spend_key_public, major=account, minor=idx
creds.view_key_private, creds.spend_key_public, account, idx
)
pub = crypto_helpers.encodepoint(pub)
subaddresses[pub] = (account, idx)
@ -299,7 +299,7 @@ def generate_sub_address_keys(
if major == 0 and minor == 0: # special case, Monero-defined
return spend_pub, crypto.scalarmult_base_into(None, view_sec)
m = get_subaddress_secret_key(view_sec, major=major, minor=minor)
m = get_subaddress_secret_key(view_sec, major, minor)
M = crypto.scalarmult_base_into(None, m)
D = crypto.point_add_into(None, spend_pub, M)
C = crypto.scalarmult_into(None, D, view_sec)

@ -1,4 +1,7 @@
from trezor.enums import MoneroNetworkType
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from trezor.enums import MoneroNetworkType
class MainNet:
@ -20,13 +23,18 @@ class StageNet:
def net_version(
network_type: MoneroNetworkType = MoneroNetworkType.MAINNET,
network_type: MoneroNetworkType = None,
is_subaddr: bool = False,
is_integrated: bool = False,
) -> bytes:
"""
Network version bytes used for address construction
"""
from trezor.enums import MoneroNetworkType
if network_type is None:
network_type = MoneroNetworkType.MAINNET
if is_integrated and is_subaddr:
raise ValueError("Subaddress cannot be integrated")

@ -11,7 +11,7 @@ Author: Dusan Klinec, ph4r05, 2018
import gc
from typing import TYPE_CHECKING
from apps.monero.xmr import crypto, crypto_helpers
from apps.monero.xmr import crypto
if TYPE_CHECKING:
from apps.monero.xmr.serialize_messages.tx_rsig_bulletproof import (
@ -40,6 +40,7 @@ def verify_bp(
) -> bool:
"""Verifies Bulletproof"""
from apps.monero.xmr import bulletproof as bp
from apps.monero.xmr import crypto_helpers
if amounts:
bp_proof.V = []

@ -1,7 +1,5 @@
from typing import TYPE_CHECKING
from apps.monero.xmr.serialize.int_serialize import dump_uvarint, load_uvarint
if TYPE_CHECKING:
from typing import Protocol, TypeVar, Union
@ -40,8 +38,12 @@ if TYPE_CHECKING:
class UVarintType:
@staticmethod
def load(reader: Reader) -> int:
from apps.monero.xmr.serialize.int_serialize import load_uvarint
return load_uvarint(reader)
@staticmethod
def dump(writer: Writer, n: int) -> None:
from apps.monero.xmr.serialize.int_serialize import dump_uvarint
return dump_uvarint(writer, n)

@ -59,15 +59,17 @@ class MemoryReaderWriter:
return nread
def write(self, buf: bytes) -> None:
assert isinstance(self.buffer, bytearray)
buffer = self.buffer # local_cache_attribute
assert isinstance(buffer, bytearray)
nwritten = len(buf)
nall = len(self.buffer)
nall = len(buffer)
towrite = nwritten
bufoff = 0
# Fill existing place in the buffer
while towrite > 0 and nall - self.woffset > 0:
self.buffer[self.woffset] = buf[bufoff]
buffer[self.woffset] = buf[bufoff]
self.woffset += 1
bufoff += 1
towrite -= 1
@ -83,7 +85,7 @@ class MemoryReaderWriter:
bufoff += 1
towrite -= 1
self.buffer.extend(chunk)
buffer.extend(chunk)
if self.do_gc:
chunk = None # dereference
gc.collect()

@ -1,6 +1,4 @@
from apps.monero.xmr.serialize.base_types import UVarintType
from apps.monero.xmr.serialize.message_types import ContainerType, MessageType
from apps.monero.xmr.serialize_messages.base import KeyImage
from apps.monero.xmr.serialize.message_types import MessageType
class TxinToKey(MessageType):
@ -8,6 +6,10 @@ class TxinToKey(MessageType):
@classmethod
def f_specs(cls) -> tuple:
from apps.monero.xmr.serialize.base_types import UVarintType
from apps.monero.xmr.serialize.message_types import ContainerType
from apps.monero.xmr.serialize_messages.base import KeyImage
return (
("amount", UVarintType),
("key_offsets", ContainerType, UVarintType),

@ -1,4 +1,3 @@
from micropython import const
from typing import TYPE_CHECKING
from apps.monero.xmr.serialize.message_types import ContainerType, MessageType
@ -9,7 +8,7 @@ if TYPE_CHECKING:
class _KeyV(ContainerType):
FIX_SIZE = const(0)
FIX_SIZE = 0
ELEM_TYPE: XmrType[bytes] = ECKey

Loading…
Cancel
Save