feat(core): Implement SLIP-0025 CoinJoin accounts.

andrewkozlik/slip25b
Andrew Kozlik 2 years ago
parent 5aa356f044
commit 3529748be1

@ -0,0 +1 @@
Add SLIP-0025 CoinJoin accounts.

@ -4,7 +4,7 @@ import storage.cache
import storage.device
from trezor import config, utils, wire, workflow
from trezor.enums import MessageType
from trezor.messages import Success
from trezor.messages import Success, UnlockPath
from . import workflow_handlers
@ -183,6 +183,58 @@ async def handle_DoPreauthorized(
return await handler(ctx, req, authorization.get()) # type: ignore [Expected 2 positional arguments]
async def handle_UnlockPath(ctx: wire.Context, msg: UnlockPath) -> protobuf.MessageType:
from trezor.crypto import hmac
from trezor.messages import UnlockedPathRequest
from trezor.ui.layouts import confirm_action
from apps.common.paths import SLIP25_PURPOSE
from apps.common.seed import Slip21Node, get_seed
from apps.common.writers import write_uint32_le
_KEYCHAIN_MAC_KEY_PATH = [b"TREZOR", b"Keychain MAC key"]
# UnlockPath is relevant only for SLIP-25 paths.
# Note: Currently we only allow unlocking the entire SLIP-25 purpose subtree instead of
# per-coin or per-account unlocking in order to avoid UI complexity and because the keychain
# pattern handling would need to account for this.
if len(msg.address_n) != 1 or msg.address_n[0] != SLIP25_PURPOSE:
raise wire.DataError("Invalid path")
seed = await get_seed(ctx)
node = Slip21Node(seed)
node.derive_path(_KEYCHAIN_MAC_KEY_PATH)
mac = utils.HashWriter(hmac(hmac.SHA256, node.key()))
for i in msg.address_n:
write_uint32_le(mac, i)
expected_mac = mac.get_digest()
# Require confirmation to access SLIP25 paths unless already authorized.
if msg.mac:
if len(msg.mac) != len(expected_mac) or not utils.consteq(
expected_mac, msg.mac
):
raise wire.DataError("Invalid MAC")
else:
await confirm_action(
ctx,
"confirm_coinjoin_access",
title="CoinJoin account",
description="Do you want to allow access to CoinJoin accounts?",
)
wire_types = (MessageType.GetAddress, MessageType.GetPublicKey, MessageType.SignTx)
req = await ctx.call_any(UnlockedPathRequest(mac=expected_mac), *wire_types)
assert req.MESSAGE_WIRE_TYPE is not None
handler = workflow_handlers.find_registered_handler(
ctx.iface, req.MESSAGE_WIRE_TYPE
)
if handler is None:
return wire.unexpected_message()
return await handler(ctx, req, msg) # type: ignore [Expected 2 positional arguments]
async def handle_CancelAuthorization(
ctx: wire.Context, msg: CancelAuthorization
) -> protobuf.MessageType:
@ -293,6 +345,7 @@ def boot() -> None:
workflow_handlers.register(MessageType.EndSession, handle_EndSession)
workflow_handlers.register(MessageType.Ping, handle_Ping)
workflow_handlers.register(MessageType.DoPreauthorized, handle_DoPreauthorized)
workflow_handlers.register(MessageType.UnlockPath, handle_UnlockPath)
workflow_handlers.register(
MessageType.CancelAuthorization, handle_CancelAuthorization
)

@ -2,7 +2,7 @@ from micropython import const
from typing import TYPE_CHECKING
from trezor import utils, wire
from trezor.messages import AuthorizeCoinJoin
from trezor.messages import AuthorizeCoinJoin, GetPublicKey
from apps.common import authorization
@ -26,6 +26,10 @@ class CoinJoinAuthorization:
def __init__(self, params: AuthorizeCoinJoin) -> None:
self.params = params
def check_get_public_key(self, msg: GetPublicKey) -> bool:
# Check whether the current params matches the parameters of the XPUB request.
return self.params.address_n == msg.address_n
def check_get_ownership_proof(self, msg: GetOwnershipProof) -> bool:
# Check whether the current params matches the parameters of the request.
coordinator = utils.empty_bytearray(1 + len(self.params.coordinator.encode()))

@ -8,7 +8,8 @@ from trezor.strings import format_amount
from trezor.ui.layouts import confirm_action, confirm_coinjoin, confirm_metadata
from apps.common import authorization, safety_checks
from apps.common.paths import validate_path
from apps.common.keychain import FORBIDDEN_KEY_PATH
from apps.common.paths import SLIP25_PURPOSE, validate_path
from .authorization import FEE_RATE_DECIMALS
from .common import BIP32_WALLET_DEPTH
@ -47,6 +48,9 @@ async def authorize_coinjoin(
if not msg.address_n:
raise wire.DataError("Empty path not allowed.")
if msg.address_n[0] != SLIP25_PURPOSE and safety_checks.is_strict():
raise FORBIDDEN_KEY_PATH
await confirm_action(
ctx,
"coinjoin_coordinator",
@ -57,19 +61,19 @@ async def authorize_coinjoin(
icon=ui.ICON_RECOVERY,
)
max_fee_per_vbyte = format_amount(msg.max_fee_per_kvbyte, 3)
await confirm_coinjoin(ctx, coin.coin_name, msg.max_rounds, max_fee_per_vbyte)
validation_path = msg.address_n + [0] * BIP32_WALLET_DEPTH
await validate_path(
ctx,
keychain,
validation_path,
msg.address_n[0] == SLIP25_PURPOSE,
validate_path_against_script_type(
coin, address_n=validation_path, script_type=msg.script_type
),
)
max_fee_per_vbyte = format_amount(msg.max_fee_per_kvbyte, 3)
if msg.max_fee_per_kvbyte > coin.maxfee_kb:
await confirm_metadata(
ctx,
@ -80,6 +84,8 @@ async def authorize_coinjoin(
ButtonRequestType.FeeOverThreshold,
)
await confirm_coinjoin(ctx, coin.coin_name, msg.max_rounds, max_fee_per_vbyte)
authorization.set(msg)
return Success(message="CoinJoin authorized")

@ -2,21 +2,39 @@ from typing import TYPE_CHECKING
from trezor import wire
from trezor.enums import InputScriptType
from trezor.messages import HDNodeType, PublicKey
from trezor.messages import AuthorizeCoinJoin, HDNodeType, PublicKey, UnlockPath
from apps.common import coininfo, paths
from apps.common.keychain import get_keychain
from apps.common.keychain import FORBIDDEN_KEY_PATH, get_keychain
from . import authorization
if TYPE_CHECKING:
from trezor.messages import GetPublicKey
from trezor.protobuf import MessageType
async def get_public_key(ctx: wire.Context, msg: GetPublicKey) -> PublicKey:
async def get_public_key(
ctx: wire.Context, msg: GetPublicKey, auth_msg: MessageType | None = None
) -> PublicKey:
coin_name = msg.coin_name or "Bitcoin"
script_type = msg.script_type or InputScriptType.SPENDADDRESS
coin = coininfo.by_name(coin_name)
curve_name = msg.ecdsa_curve_name or coin.curve_name
# Require preauthorization to access SLIP25 paths.
if msg.address_n and msg.address_n[0] == paths.SLIP25_PURPOSE:
if auth_msg is not None and AuthorizeCoinJoin.is_type_of(auth_msg):
if not authorization.from_cached_message(auth_msg).check_get_public_key(
msg
):
raise wire.ProcessError("Unauthorized operation")
elif auth_msg is not None and UnlockPath.is_type_of(auth_msg):
if auth_msg.address_n != msg.address_n[: len(auth_msg.address_n)]:
raise FORBIDDEN_KEY_PATH
else:
raise FORBIDDEN_KEY_PATH
keychain = await get_keychain(ctx, curve_name, [paths.AlwaysMatchingSchema])
node = keychain.derive(msg.address_n)

@ -4,10 +4,11 @@ from typing import TYPE_CHECKING
from trezor import wire
from trezor.enums import InputScriptType
from trezor.messages import AuthorizeCoinJoin, GetAddress, UnlockPath
from apps.common import coininfo
from apps.common.keychain import get_keychain
from apps.common.paths import PATTERN_BIP44, PathSchema
from apps.common.paths import PATTERN_BIP44, SLIP25_PURPOSE, PathSchema
from . import authorization
from .common import BITCOIN_NAMES
@ -19,8 +20,6 @@ if TYPE_CHECKING:
from trezor.protobuf import MessageType
from trezor.messages import (
AuthorizeCoinJoin,
GetAddress,
GetOwnershipId,
GetOwnershipProof,
GetPublicKey,
@ -66,6 +65,10 @@ PATTERN_BIP49 = "m/49'/coin_type'/account'/change/address_index"
PATTERN_BIP84 = "m/84'/coin_type'/account'/change/address_index"
# BIP-86 for taproot: https://github.com/bitcoin/bips/blob/master/bip-0086.mediawiki
PATTERN_BIP86 = "m/86'/coin_type'/account'/change/address_index"
# SLIP-25 for CoinJoin: https://github.com/satoshilabs/slips/blob/master/slip-0025.md
# Only account=0 and script_type=1 are supported for now.
PATTERN_SLIP25_TAPROOT = "m/10025'/coin_type'/0'/1'/change/address_index"
PATTERN_SLIP25_TAPROOT_EXTERNAL = "m/10025'/coin_type'/0'/1'/0/address_index"
# compatibility patterns, will be removed in the future
PATTERN_GREENADDRESS_A = "m/[1,4]/address_index"
@ -151,13 +154,16 @@ def validate_path_against_script_type(
elif coin.taproot and script_type == InputScriptType.SPENDTAPROOT:
patterns.append(PATTERN_BIP86)
patterns.append(PATTERN_SLIP25_TAPROOT)
return any(
PathSchema.parse(pattern, coin.slip44).match(address_n) for pattern in patterns
)
def get_schemas_for_coin(coin: coininfo.CoinInfo) -> Iterable[PathSchema]:
def get_schemas_for_coin(
coin: coininfo.CoinInfo, unlock_pattern: str | None = None
) -> Iterable[PathSchema]:
# basic patterns
patterns = [
PATTERN_BIP44,
@ -206,6 +212,9 @@ def get_schemas_for_coin(coin: coininfo.CoinInfo) -> Iterable[PathSchema]:
if coin.taproot:
patterns.append(PATTERN_BIP86)
if unlock_pattern:
patterns.append(unlock_pattern)
schemas = [PathSchema.parse(pattern, coin.slip44) for pattern in patterns]
# Some wallets such as Electron-Cash (BCH) store coins on Bitcoin paths.
@ -234,23 +243,44 @@ def get_coin_by_name(coin_name: str | None) -> coininfo.CoinInfo:
async def get_keychain_for_coin(
ctx: wire.Context, coin_name: str | None
ctx: wire.Context, coin_name: str | None, unlock_pattern: str | None = None
) -> tuple[Keychain, coininfo.CoinInfo]:
coin = get_coin_by_name(coin_name)
schemas = get_schemas_for_coin(coin)
schemas = get_schemas_for_coin(coin, unlock_pattern)
slip21_namespaces = [[b"SLIP-0019"], [b"SLIP-0024"]]
keychain = await get_keychain(ctx, coin.curve_name, schemas, slip21_namespaces)
return keychain, coin
def _get_unlock_pattern(
msg: MessageType, auth_msg: MessageType | None = None
) -> str | None:
if AuthorizeCoinJoin.is_type_of(msg):
return PATTERN_SLIP25_TAPROOT
if (
auth_msg is not None
and (AuthorizeCoinJoin.is_type_of(auth_msg) or UnlockPath.is_type_of(auth_msg))
and auth_msg.address_n
and auth_msg.address_n[0] == SLIP25_PURPOSE
):
if GetAddress.is_type_of(msg):
return PATTERN_SLIP25_TAPROOT_EXTERNAL
else:
return PATTERN_SLIP25_TAPROOT
return None
def with_keychain(func: HandlerWithCoinInfo[MsgOut]) -> Handler[MsgIn, MsgOut]:
async def wrapper(
ctx: wire.Context,
msg: MsgIn,
auth_msg: MessageType | None = None,
) -> MsgOut:
keychain, coin = await get_keychain_for_coin(ctx, msg.coin_name)
if auth_msg:
unlock_pattern = _get_unlock_pattern(msg, auth_msg)
keychain, coin = await get_keychain_for_coin(ctx, msg.coin_name, unlock_pattern)
if auth_msg is not None and AuthorizeCoinJoin.is_type_of(auth_msg):
auth_obj = authorization.from_cached_message(auth_msg)
return await func(ctx, msg, keychain, coin, auth_obj)
else:

@ -6,7 +6,11 @@ from trezor.enums import MessageType
from trezor.utils import ensure
WIRE_TYPES: dict[int, tuple[int, ...]] = {
MessageType.AuthorizeCoinJoin: (MessageType.SignTx, MessageType.GetOwnershipProof),
MessageType.AuthorizeCoinJoin: (
MessageType.GetOwnershipProof,
MessageType.GetPublicKey,
MessageType.SignTx,
),
}

@ -2,6 +2,7 @@ from micropython import const
from typing import TYPE_CHECKING
HARDENED = const(0x8000_0000)
SLIP25_PURPOSE = const(10025 | HARDENED)
if TYPE_CHECKING:
from typing import (

Loading…
Cancel
Save