1
0
mirror of https://github.com/trezor/trezor-firmware.git synced 2025-01-22 21:30:56 +00:00

feat(python): Implement entropy check workflow in device.reset().

This commit is contained in:
Andrew Kozlik 2024-09-04 17:03:32 +02:00 committed by Andrew Kozlik
parent 5d2fa78b3f
commit f573db6953
4 changed files with 137 additions and 9 deletions

View File

@ -0,0 +1 @@
Added support for entropy check workflow in device.reset().

View File

@ -24,6 +24,7 @@ import click
import requests
from .. import debuglink, device, exceptions, messages, ui
from ..tools import format_path
from . import ChoiceType, with_client
if t.TYPE_CHECKING:
@ -222,6 +223,7 @@ def recover(
@click.option("-s", "--skip-backup", is_flag=True)
@click.option("-n", "--no-backup", is_flag=True)
@click.option("-b", "--backup-type", type=ChoiceType(BACKUP_TYPE))
@click.option("-e", "--entropy-check-count", type=click.IntRange(0))
@with_client
def setup(
client: "TrezorClient",
@ -233,6 +235,7 @@ def setup(
skip_backup: bool,
no_backup: bool,
backup_type: messages.BackupType | None,
entropy_check_count: int | None,
) -> str:
"""Perform device setup and generate new seed."""
if strength:
@ -261,7 +264,7 @@ def setup(
"backup type. Traditional BIP39 backup may be generated instead."
)
return device.reset(
resp, path_xpubs = device.reset_entropy_check(
client,
strength=strength,
passphrase_protection=passphrase_protection,
@ -271,8 +274,17 @@ def setup(
skip_backup=skip_backup,
no_backup=no_backup,
backup_type=backup_type,
entropy_check_count=entropy_check_count,
)
if isinstance(resp, messages.Success):
click.echo("XPUBs for the generated seed")
for path, xpub in path_xpubs:
click.echo(f"{format_path(path)}: {xpub}")
return resp.message or ""
else:
raise RuntimeError(f"Received {resp.__class__}")
@cli.command()
@click.option("-t", "--group-threshold", type=int)

View File

@ -16,14 +16,18 @@
from __future__ import annotations
import hashlib
import hmac
import os
import time
import warnings
from typing import TYPE_CHECKING, Callable, Iterable, Optional
from typing import TYPE_CHECKING, Any, Callable, Iterable, List, Optional, Tuple
from slip10 import SLIP10
from . import messages
from .exceptions import Cancelled, TrezorException
from .tools import Address, expect, session
from .tools import Address, expect, parse_path, session
if TYPE_CHECKING:
from .client import TrezorClient
@ -230,9 +234,53 @@ def recover(
return res
def is_slip39_backup_type(backup_type: messages.BackupType):
return backup_type in (
messages.BackupType.Slip39_Basic,
messages.BackupType.Slip39_Advanced,
messages.BackupType.Slip39_Single_Extendable,
messages.BackupType.Slip39_Basic_Extendable,
messages.BackupType.Slip39_Advanced_Extendable,
)
def _seed_from_entropy(
internal_entropy: bytes,
external_entropy: bytes,
strength: int,
backup_type: messages.BackupType,
) -> bytes:
entropy = hashlib.sha256(internal_entropy + external_entropy).digest()
secret = entropy[: strength // 8]
if len(secret) * 8 != strength:
raise ValueError("Entropy length mismatch")
if backup_type == messages.BackupType.Bip39:
import mnemonic
bip39 = mnemonic.Mnemonic("english")
words = bip39.to_mnemonic(secret)
seed = bip39.to_seed(words, passphrase="")
elif is_slip39_backup_type(backup_type):
import shamir_mnemonic
seed = shamir_mnemonic.cipher.decrypt(
secret, b"", iteration_exponent=1, identifier=0, extendable=True
)
else:
raise ValueError("Unknown backup type.")
return seed
@expect(messages.Success, field="message", ret_type=str)
def reset(*args: Any, **kwargs: Any) -> "MessageType":
return reset_entropy_check(*args, **kwargs)[0]
@session
def reset(
def reset_entropy_check(
client: "TrezorClient",
display_random: bool = False,
strength: Optional[int] = None,
@ -244,7 +292,9 @@ def reset(
skip_backup: bool = False,
no_backup: bool = False,
backup_type: messages.BackupType = messages.BackupType.Bip39,
) -> "MessageType":
entropy_check_count: Optional[int] = None,
paths: List[Address] = [],
) -> Tuple["MessageType", Iterable[Tuple[Address, str]]]:
if display_random:
warnings.warn(
"display_random ignored. The feature is deprecated.",
@ -268,6 +318,10 @@ def reset(
else:
strength = 128
if not paths:
# Get XPUBs for the first BTC SegWit v0 account and first ETH account.
paths = [parse_path("m/84h/0h/0h"), parse_path("m/44h/60h/0h")]
# Begin with device reset workflow
msg = messages.ResetDevice(
strength=strength,
@ -278,17 +332,61 @@ def reset(
skip_backup=bool(skip_backup),
no_backup=bool(no_backup),
backup_type=backup_type,
entropy_check=entropy_check_count is not None,
)
resp = client.call(msg)
if not isinstance(resp, messages.EntropyRequest):
raise RuntimeError("Invalid response, expected EntropyRequest")
while True:
xpubs = []
external_entropy = os.urandom(32)
# LOG.debug("Computer generated entropy: " + external_entropy.hex())
ret = client.call(messages.EntropyAck(entropy=external_entropy))
entropy_commitment = resp.entropy_commitment
resp = client.call(messages.EntropyAck(entropy=external_entropy))
if entropy_check_count is None:
break
if not isinstance(resp, messages.EntropyCheckReady):
return resp, []
for path in paths:
resp = client.call(messages.GetPublicKey(address_n=path))
if not isinstance(resp, messages.PublicKey):
return resp, []
xpubs.append(resp.xpub)
if entropy_check_count <= 0:
resp = client.call(messages.EntropyCheckContinue(finish=True))
break
entropy_check_count -= 1
resp = client.call(messages.EntropyCheckContinue(finish=False))
if not isinstance(resp, messages.EntropyRequest):
raise RuntimeError("Invalid response, expected EntropyRequest")
# Check the entropy commitment from the previous round.
assert resp.prev_entropy
if (
hmac.HMAC(key=resp.prev_entropy, msg=b"", digestmod=hashlib.sha256).digest()
!= entropy_commitment
):
raise RuntimeError("Invalid entropy commitment.")
# Derive the seed and check that XPUBs match.
seed = _seed_from_entropy(
resp.prev_entropy, external_entropy, strength, backup_type
)
slip10 = SLIP10.from_seed(seed)
for path, xpub in zip(paths, xpubs):
if slip10.get_xpub_from_path(path) != xpub:
raise RuntimeError("Invalid XPUB in entropy check")
client.init_device()
return ret
return resp, zip(paths, xpubs)
@expect(messages.Success, field="message", ret_type=str)

View File

@ -222,6 +222,23 @@ def parse_path(nstr: str) -> Address:
raise ValueError("Invalid BIP32 path", nstr) from e
def format_path(path: Address, flag: str = "h") -> str:
"""
Convert BIP32 path list of uint32 integers with hardened flags to string.
Several conventions are supported to denote the hardened flag: 1', 1h
e.g.: [0, 0x80000001, 1] -> "m/0/1h/1"
:param path: list of integers
:return: path string
"""
nstr = "m"
for i in path:
nstr += f"/{unharden(i)}{flag if is_hardened(i) else ''}"
return nstr
def prepare_message_bytes(txt: AnyStr) -> bytes:
"""
Make message suitable for protobuf.