mirror of
https://github.com/trezor/trezor-firmware.git
synced 2024-12-22 14:28:07 +00:00
feat(python): Implement entropy check workflow in device.reset().
This commit is contained in:
parent
f1cdbabb06
commit
19209a238a
1
python/.changelog.d/4155.added
Normal file
1
python/.changelog.d/4155.added
Normal file
@ -0,0 +1 @@
|
||||
Added support for entropy check workflow in device.reset().
|
@ -24,6 +24,7 @@ import click
|
||||
import requests
|
||||
|
||||
from .. import debuglink, device, exceptions, messages, ui
|
||||
from ..tools import format_path
|
||||
from . import ChoiceType, with_client
|
||||
|
||||
if t.TYPE_CHECKING:
|
||||
@ -222,6 +223,7 @@ def recover(
|
||||
@click.option("-s", "--skip-backup", is_flag=True)
|
||||
@click.option("-n", "--no-backup", is_flag=True)
|
||||
@click.option("-b", "--backup-type", type=ChoiceType(BACKUP_TYPE))
|
||||
@click.option("-e", "--entropy-check-count", type=click.IntRange(0))
|
||||
@with_client
|
||||
def setup(
|
||||
client: "TrezorClient",
|
||||
@ -233,6 +235,7 @@ def setup(
|
||||
skip_backup: bool,
|
||||
no_backup: bool,
|
||||
backup_type: messages.BackupType | None,
|
||||
entropy_check_count: int | None,
|
||||
) -> str:
|
||||
"""Perform device setup and generate new seed."""
|
||||
if strength:
|
||||
@ -261,7 +264,7 @@ def setup(
|
||||
"backup type. Traditional BIP39 backup may be generated instead."
|
||||
)
|
||||
|
||||
return device.reset(
|
||||
resp, path_xpubs = device.reset_entropy_check(
|
||||
client,
|
||||
strength=strength,
|
||||
passphrase_protection=passphrase_protection,
|
||||
@ -271,8 +274,17 @@ def setup(
|
||||
skip_backup=skip_backup,
|
||||
no_backup=no_backup,
|
||||
backup_type=backup_type,
|
||||
entropy_check_count=entropy_check_count,
|
||||
)
|
||||
|
||||
if isinstance(resp, messages.Success):
|
||||
click.echo("XPUBs for the generated seed")
|
||||
for path, xpub in path_xpubs:
|
||||
click.echo(f"{format_path(path)}: {xpub}")
|
||||
return resp.message or ""
|
||||
else:
|
||||
raise RuntimeError(f"Received {resp.__class__}")
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.option("-t", "--group-threshold", type=int)
|
||||
|
@ -16,14 +16,18 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import hmac
|
||||
import os
|
||||
import time
|
||||
import warnings
|
||||
from typing import TYPE_CHECKING, Callable, Iterable, Optional
|
||||
from typing import TYPE_CHECKING, Any, Callable, Iterable, List, Optional, Tuple
|
||||
|
||||
from slip10 import SLIP10
|
||||
|
||||
from . import messages
|
||||
from .exceptions import Cancelled, TrezorException
|
||||
from .tools import Address, expect, session
|
||||
from .tools import Address, expect, parse_path, session
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .client import TrezorClient
|
||||
@ -230,9 +234,53 @@ def recover(
|
||||
return res
|
||||
|
||||
|
||||
def is_slip39_backup_type(backup_type: messages.BackupType):
|
||||
return backup_type in (
|
||||
messages.BackupType.Slip39_Basic,
|
||||
messages.BackupType.Slip39_Advanced,
|
||||
messages.BackupType.Slip39_Single_Extendable,
|
||||
messages.BackupType.Slip39_Basic_Extendable,
|
||||
messages.BackupType.Slip39_Advanced_Extendable,
|
||||
)
|
||||
|
||||
|
||||
def _seed_from_entropy(
|
||||
internal_entropy: bytes,
|
||||
external_entropy: bytes,
|
||||
strength: int,
|
||||
backup_type: messages.BackupType,
|
||||
) -> bytes:
|
||||
entropy = hashlib.sha256(internal_entropy + external_entropy).digest()
|
||||
secret = entropy[: strength // 8]
|
||||
|
||||
if len(secret) * 8 != strength:
|
||||
raise ValueError("Entropy length mismatch")
|
||||
|
||||
if backup_type == messages.BackupType.Bip39:
|
||||
import mnemonic
|
||||
|
||||
bip39 = mnemonic.Mnemonic("english")
|
||||
words = bip39.to_mnemonic(secret)
|
||||
seed = bip39.to_seed(words, passphrase="")
|
||||
elif is_slip39_backup_type(backup_type):
|
||||
import shamir_mnemonic
|
||||
|
||||
seed = shamir_mnemonic.cipher.decrypt(
|
||||
secret, b"", iteration_exponent=1, identifier=0, extendable=True
|
||||
)
|
||||
else:
|
||||
raise ValueError("Unknown backup type.")
|
||||
|
||||
return seed
|
||||
|
||||
|
||||
@expect(messages.Success, field="message", ret_type=str)
|
||||
def reset(*args: Any, **kwargs: Any) -> "MessageType":
|
||||
return reset_entropy_check(*args, **kwargs)[0]
|
||||
|
||||
|
||||
@session
|
||||
def reset(
|
||||
def reset_entropy_check(
|
||||
client: "TrezorClient",
|
||||
display_random: bool = False,
|
||||
strength: Optional[int] = None,
|
||||
@ -244,7 +292,9 @@ def reset(
|
||||
skip_backup: bool = False,
|
||||
no_backup: bool = False,
|
||||
backup_type: messages.BackupType = messages.BackupType.Bip39,
|
||||
) -> "MessageType":
|
||||
entropy_check_count: Optional[int] = None,
|
||||
paths: List[Address] = [],
|
||||
) -> Tuple["MessageType", Iterable[Tuple[Address, str]]]:
|
||||
if display_random:
|
||||
warnings.warn(
|
||||
"display_random ignored. The feature is deprecated.",
|
||||
@ -268,6 +318,10 @@ def reset(
|
||||
else:
|
||||
strength = 128
|
||||
|
||||
if not paths:
|
||||
# Get XPUBs for the first BTC SegWit v0 account and first ETH account.
|
||||
paths = [parse_path("m/84h/0h/0h"), parse_path("m/44h/60h/0h")]
|
||||
|
||||
# Begin with device reset workflow
|
||||
msg = messages.ResetDevice(
|
||||
strength=strength,
|
||||
@ -278,17 +332,61 @@ def reset(
|
||||
skip_backup=bool(skip_backup),
|
||||
no_backup=bool(no_backup),
|
||||
backup_type=backup_type,
|
||||
entropy_check=entropy_check_count is not None,
|
||||
)
|
||||
|
||||
resp = client.call(msg)
|
||||
if not isinstance(resp, messages.EntropyRequest):
|
||||
raise RuntimeError("Invalid response, expected EntropyRequest")
|
||||
|
||||
external_entropy = os.urandom(32)
|
||||
# LOG.debug("Computer generated entropy: " + external_entropy.hex())
|
||||
ret = client.call(messages.EntropyAck(entropy=external_entropy))
|
||||
while True:
|
||||
xpubs = []
|
||||
|
||||
external_entropy = os.urandom(32)
|
||||
entropy_commitment = resp.entropy_commitment
|
||||
resp = client.call(messages.EntropyAck(entropy=external_entropy))
|
||||
|
||||
if entropy_check_count is None:
|
||||
break
|
||||
|
||||
if not isinstance(resp, messages.Success):
|
||||
return resp, []
|
||||
|
||||
for path in paths:
|
||||
resp = client.call(messages.GetPublicKey(address_n=path))
|
||||
if not isinstance(resp, messages.PublicKey):
|
||||
return resp, []
|
||||
xpubs.append(resp.xpub)
|
||||
|
||||
if entropy_check_count <= 0:
|
||||
resp = client.call(messages.ResetDeviceFinish())
|
||||
break
|
||||
|
||||
entropy_check_count -= 1
|
||||
|
||||
resp = client.call(messages.ResetDeviceContinue())
|
||||
if not isinstance(resp, messages.EntropyRequest):
|
||||
raise RuntimeError("Invalid response, expected EntropyRequest")
|
||||
|
||||
# Check the entropy commitment from the previous round.
|
||||
assert resp.prev_entropy
|
||||
if (
|
||||
hmac.HMAC(key=resp.prev_entropy, msg=b"", digestmod=hashlib.sha256).digest()
|
||||
!= entropy_commitment
|
||||
):
|
||||
raise RuntimeError("Invalid entropy commitment.")
|
||||
|
||||
# Derive the seed and check that XPUBs match.
|
||||
seed = _seed_from_entropy(
|
||||
resp.prev_entropy, external_entropy, strength, backup_type
|
||||
)
|
||||
slip10 = SLIP10.from_seed(seed)
|
||||
for path, xpub in zip(paths, xpubs):
|
||||
if slip10.get_xpub_from_path(path) != xpub:
|
||||
raise RuntimeError("Invalid XPUB in entropy check")
|
||||
|
||||
client.init_device()
|
||||
return ret
|
||||
return resp, zip(paths, xpubs)
|
||||
|
||||
|
||||
@expect(messages.Success, field="message", ret_type=str)
|
||||
|
@ -222,6 +222,23 @@ def parse_path(nstr: str) -> Address:
|
||||
raise ValueError("Invalid BIP32 path", nstr) from e
|
||||
|
||||
|
||||
def format_path(path: Address, flag: str = "h") -> str:
|
||||
"""
|
||||
Convert BIP32 path list of uint32 integers with hardened flags to string.
|
||||
Several conventions are supported to denote the hardened flag: 1', 1h
|
||||
|
||||
e.g.: [0, 0x80000001, 1] -> "m/0/1h/1"
|
||||
|
||||
:param path: list of integers
|
||||
:return: path string
|
||||
"""
|
||||
nstr = "m"
|
||||
for i in path:
|
||||
nstr += f"/{unharden(i)}{flag if is_hardened(i) else ''}"
|
||||
|
||||
return nstr
|
||||
|
||||
|
||||
def prepare_message_bytes(txt: AnyStr) -> bytes:
|
||||
"""
|
||||
Make message suitable for protobuf.
|
||||
|
Loading…
Reference in New Issue
Block a user