mirror of
https://github.com/trezor/trezor-firmware.git
synced 2025-01-29 16:51:30 +00:00
feat(python): introduce device.setup()
for entropy check
this deprecates `device.reset()`, and moves the new arguments to `device.setup()`. Also it changes default backup type on core devices to SLIP39-single, same as Suite, and randomizes the number of entropy check rounds, if not provided from outside.
This commit is contained in:
parent
4610655d77
commit
53bdef5bb4
@ -1 +1 @@
|
||||
Added support for entropy check workflow in device.reset().
|
||||
Added support for entropy check workflow in `device.reset()`.
|
||||
|
1
python/.changelog.d/4464.added
Normal file
1
python/.changelog.d/4464.added
Normal file
@ -0,0 +1 @@
|
||||
Introduced `device.setup()` as a cleaner upgrade to `device.reset()`.
|
1
python/.changelog.d/4464.deprecated
Normal file
1
python/.changelog.d/4464.deprecated
Normal file
@ -0,0 +1 @@
|
||||
`device.reset()` is deprecated, migrate to `device.setup()`
|
@ -19,9 +19,11 @@ from __future__ import annotations
|
||||
import hashlib
|
||||
import hmac
|
||||
import os
|
||||
import random
|
||||
import secrets
|
||||
import time
|
||||
import warnings
|
||||
from typing import TYPE_CHECKING, Any, Callable, Iterable, List, Optional, Tuple
|
||||
from typing import TYPE_CHECKING, Callable, Iterable, Optional, Tuple
|
||||
|
||||
from slip10 import SLIP10
|
||||
|
||||
@ -36,6 +38,9 @@ if TYPE_CHECKING:
|
||||
|
||||
RECOVERY_BACK = "\x08" # backspace character, sent literally
|
||||
|
||||
SLIP39_EXTENDABLE_MIN_VERSION = (2, 7, 1)
|
||||
ENTROPY_CHECK_MIN_VERSION = (2, 8, 7)
|
||||
|
||||
|
||||
@expect(messages.Success, field="message", ret_type=str)
|
||||
@session
|
||||
@ -292,21 +297,103 @@ def reset_entropy_check(
|
||||
skip_backup: bool = False,
|
||||
no_backup: bool = False,
|
||||
backup_type: messages.BackupType = messages.BackupType.Bip39,
|
||||
entropy_check_count: Optional[int] = None,
|
||||
paths: List[Address] = [],
|
||||
) -> Tuple["MessageType", Iterable[Tuple[Address, str]]]:
|
||||
) -> str | None:
|
||||
warnings.warn(
|
||||
"reset() is deprecated. Use setup() instead.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
if display_random:
|
||||
warnings.warn(
|
||||
"display_random ignored. The feature is deprecated.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
if language is not None:
|
||||
warnings.warn(
|
||||
"language ignored. Use change_language() to set device language.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
setup(
|
||||
client,
|
||||
strength=strength,
|
||||
passphrase_protection=passphrase_protection,
|
||||
pin_protection=pin_protection,
|
||||
label=label,
|
||||
u2f_counter=u2f_counter,
|
||||
skip_backup=skip_backup,
|
||||
no_backup=no_backup,
|
||||
backup_type=backup_type,
|
||||
)
|
||||
|
||||
return _return_success(messages.Success(message="Initialized"))
|
||||
|
||||
|
||||
def _get_external_entropy() -> bytes:
|
||||
return secrets.token_bytes(32)
|
||||
|
||||
|
||||
@session
|
||||
def setup(
|
||||
client: "TrezorClient",
|
||||
*,
|
||||
strength: Optional[int] = None,
|
||||
passphrase_protection: bool = True,
|
||||
pin_protection: bool = False,
|
||||
label: Optional[str] = None,
|
||||
u2f_counter: int = 0,
|
||||
skip_backup: bool = False,
|
||||
no_backup: bool = False,
|
||||
backup_type: Optional[messages.BackupType] = None,
|
||||
entropy_check_count: Optional[int] = None,
|
||||
paths: Iterable[Address] = [],
|
||||
_get_entropy: Callable[[], bytes] = _get_external_entropy,
|
||||
) -> Iterable[Tuple[Address, str]]:
|
||||
"""Create a new wallet on device.
|
||||
|
||||
On supporting devices, automatically performs the entropy check: for N rounds, ask
|
||||
the device to generate a new seed and provide XPUBs derived from that seed. In the
|
||||
next round, the previous round's seed is revealed and verified that it was generated
|
||||
with the appropriate entropy and that it matches the provided XPUBs.
|
||||
|
||||
On round N+1, instead of revealing, the final seed is stored on device.
|
||||
|
||||
This function returns the XPUBs from the last round. Caller SHOULD store these XPUBs
|
||||
and periodically check that the device still generates the same ones, to ensure that
|
||||
the device has not maliciously switched to a pre-generated seed.
|
||||
|
||||
The caller can provide a list of interesting derivation paths to be used in the
|
||||
entropy check. If an empty list is provided, the function will use the first BTC
|
||||
SegWit v0 account and the first ETH account.
|
||||
|
||||
Returned XPUBs are in the form of tuples (derivation path, xpub).
|
||||
|
||||
Specifying an entropy check count other than 0 on devices that don't support it,
|
||||
such as Trezor Model One, will result in an error. If not specified, a random value
|
||||
between 2 and 8 is chosen on supporting devices.
|
||||
|
||||
Args:
|
||||
* client: TrezorClient instance.
|
||||
* strength: Entropy strength in bits. Default is 128 for the core family, and 256
|
||||
for Trezor Model One.
|
||||
* passphrase_protection: Enable passphrase feature. Defaults to True.
|
||||
* pin_protection: Enable and set up device PIN as part of the setup flow. Defaults
|
||||
to False.
|
||||
* label: Device label.
|
||||
* u2f_counter: U2F counter value.
|
||||
* skip_backup: Skip the backup step. Defaults to False.
|
||||
* no_backup: Do not create backup (seedless mode). Defaults to False.
|
||||
* entropy_check_count: Number of rounds for the entropy check.
|
||||
|
||||
Returns:
|
||||
Sequence of tuples (derivation path, xpub) from the last round of the entropy
|
||||
check.
|
||||
"""
|
||||
|
||||
if client.features.initialized:
|
||||
raise RuntimeError(
|
||||
"Device is initialized already. Call wipe_device() and try again."
|
||||
@ -318,10 +405,24 @@ def reset_entropy_check(
|
||||
else:
|
||||
strength = 128
|
||||
|
||||
if backup_type is None:
|
||||
if client.version < SLIP39_EXTENDABLE_MIN_VERSION:
|
||||
# includes Trezor One 1.x.x
|
||||
backup_type = messages.BackupType.Bip39
|
||||
else:
|
||||
backup_type = messages.BackupType.Slip39_Single_Extendable
|
||||
|
||||
if not paths:
|
||||
# Get XPUBs for the first BTC SegWit v0 account and first ETH account.
|
||||
paths = [parse_path("m/84h/0h/0h"), parse_path("m/44h/60h/0h")]
|
||||
|
||||
if entropy_check_count is None:
|
||||
if client.version < ENTROPY_CHECK_MIN_VERSION:
|
||||
# includes Trezor One 1.x.x
|
||||
entropy_check_count = 0
|
||||
else:
|
||||
entropy_check_count = random.randint(2, 8)
|
||||
|
||||
# Begin with device reset workflow
|
||||
msg = messages.ResetDevice(
|
||||
strength=strength,
|
||||
@ -332,61 +433,147 @@ def reset_entropy_check(
|
||||
skip_backup=bool(skip_backup),
|
||||
no_backup=bool(no_backup),
|
||||
backup_type=backup_type,
|
||||
entropy_check=entropy_check_count is not None,
|
||||
entropy_check=entropy_check_count > 0,
|
||||
)
|
||||
|
||||
resp = client.call(msg)
|
||||
if not isinstance(resp, messages.EntropyRequest):
|
||||
raise RuntimeError("Invalid response, expected EntropyRequest")
|
||||
|
||||
while True:
|
||||
if entropy_check_count > 0:
|
||||
xpubs = _reset_with_entropycheck(
|
||||
client, msg, entropy_check_count, paths, _get_entropy
|
||||
)
|
||||
else:
|
||||
_reset_no_entropycheck(client, msg, _get_entropy)
|
||||
xpubs = []
|
||||
|
||||
external_entropy = os.urandom(32)
|
||||
entropy_commitment = resp.entropy_commitment
|
||||
resp = client.call(messages.EntropyAck(entropy=external_entropy))
|
||||
client.init_device()
|
||||
return xpubs
|
||||
|
||||
if entropy_check_count is None:
|
||||
break
|
||||
|
||||
if not isinstance(resp, messages.EntropyCheckReady):
|
||||
return resp, []
|
||||
def _reset_no_entropycheck(
|
||||
client: "TrezorClient",
|
||||
msg: messages.ResetDevice,
|
||||
get_entropy: Callable[[], bytes],
|
||||
) -> None:
|
||||
"""Simple reset workflow without entropy checks:
|
||||
|
||||
>> ResetDevice
|
||||
<< EntropyRequest
|
||||
>> EntropyAck(entropy=...)
|
||||
<< Success
|
||||
"""
|
||||
assert msg.entropy_check is False
|
||||
client.call(msg, expect=messages.EntropyRequest)
|
||||
client.call(messages.EntropyAck(entropy=get_entropy()), expect=messages.Success)
|
||||
|
||||
|
||||
def _reset_with_entropycheck(
|
||||
client: "TrezorClient",
|
||||
reset_msg: messages.ResetDevice,
|
||||
entropy_check_count: int,
|
||||
paths: Iterable[Address],
|
||||
get_entropy: Callable[[], bytes],
|
||||
) -> list[tuple[Address, str]]:
|
||||
"""Reset workflow with entropy checks:
|
||||
|
||||
>> ResetDevice
|
||||
repeat n times:
|
||||
<< EntropyRequest(entropy_commitment=..., prev_entropy=...)
|
||||
>> EntropyAck(entropy=...)
|
||||
<< EntropyCheckReady
|
||||
>> GetPublicKey(...)
|
||||
<< PublicKey(...)
|
||||
>> EntropyCheckContinue(finish=False)
|
||||
last round:
|
||||
>> EntropyCheckContinue(finish=True)
|
||||
<< Success
|
||||
|
||||
After each round, the device reveals its internal entropy via the prev_entropy
|
||||
field. This function verifies that the entropy matches the respective commitment,
|
||||
then recalculate the seed for the previous round, and verifies that the public keys
|
||||
generated by the device match that seed.
|
||||
|
||||
Returns the list of XPUBs from the last round. Caller is responsible for storing
|
||||
those XPUBs and later verifying that these are still valid.
|
||||
"""
|
||||
assert reset_msg.strength is not None
|
||||
assert reset_msg.backup_type is not None
|
||||
strength = reset_msg.strength
|
||||
backup_type = reset_msg.backup_type
|
||||
|
||||
def get_xpubs() -> list[tuple[Address, str]]:
|
||||
xpubs = []
|
||||
for path in paths:
|
||||
resp = client.call(messages.GetPublicKey(address_n=path))
|
||||
if not isinstance(resp, messages.PublicKey):
|
||||
return resp, []
|
||||
xpubs.append(resp.xpub)
|
||||
resp = client.call(
|
||||
messages.GetPublicKey(address_n=path), expect=messages.PublicKey
|
||||
)
|
||||
xpubs.append((path, resp.xpub))
|
||||
return xpubs
|
||||
|
||||
def verify_entropy_commitment(
|
||||
internal_entropy: bytes | None,
|
||||
external_entropy: bytes,
|
||||
entropy_commitment: bytes | None,
|
||||
xpubs: list[tuple[Address, str]],
|
||||
) -> None:
|
||||
if internal_entropy is None or entropy_commitment is None:
|
||||
raise TrezorException("Invalid entropy check response.")
|
||||
calculated_commitment = hmac.HMAC(
|
||||
key=internal_entropy, msg=b"", digestmod=hashlib.sha256
|
||||
).digest()
|
||||
if calculated_commitment != entropy_commitment:
|
||||
raise TrezorException("Invalid entropy commitment.")
|
||||
|
||||
seed = _seed_from_entropy(
|
||||
internal_entropy, external_entropy, strength, backup_type
|
||||
)
|
||||
slip10 = SLIP10.from_seed(seed)
|
||||
for path, xpub in xpubs:
|
||||
if slip10.get_xpub_from_path(path) != xpub:
|
||||
raise TrezorException("Invalid XPUB in entropy check")
|
||||
|
||||
xpubs = []
|
||||
resp = client.call(reset_msg, expect=messages.EntropyRequest)
|
||||
entropy_commitment = resp.entropy_commitment
|
||||
|
||||
while True:
|
||||
# provide external entropy for this round
|
||||
external_entropy = get_entropy()
|
||||
client.call(
|
||||
messages.EntropyAck(entropy=external_entropy),
|
||||
expect=messages.EntropyCheckReady,
|
||||
)
|
||||
|
||||
# fetch xpubs for the current round
|
||||
xpubs = get_xpubs()
|
||||
|
||||
if entropy_check_count <= 0:
|
||||
resp = client.call(messages.EntropyCheckContinue(finish=True))
|
||||
# last round, wait for a Success and exit the loop
|
||||
client.call(
|
||||
messages.EntropyCheckContinue(finish=True),
|
||||
expect=messages.Success,
|
||||
)
|
||||
break
|
||||
|
||||
entropy_check_count -= 1
|
||||
|
||||
resp = client.call(messages.EntropyCheckContinue(finish=False))
|
||||
if not isinstance(resp, messages.EntropyRequest):
|
||||
raise RuntimeError("Invalid response, expected EntropyRequest")
|
||||
# Next round starts.
|
||||
resp = client.call(
|
||||
messages.EntropyCheckContinue(finish=False),
|
||||
expect=messages.EntropyRequest,
|
||||
)
|
||||
|
||||
# Check the entropy commitment from the previous round.
|
||||
assert resp.prev_entropy
|
||||
if (
|
||||
hmac.HMAC(key=resp.prev_entropy, msg=b"", digestmod=hashlib.sha256).digest()
|
||||
!= entropy_commitment
|
||||
):
|
||||
raise RuntimeError("Invalid entropy commitment.")
|
||||
|
||||
# Derive the seed and check that XPUBs match.
|
||||
seed = _seed_from_entropy(
|
||||
resp.prev_entropy, external_entropy, strength, backup_type
|
||||
verify_entropy_commitment(
|
||||
resp.prev_entropy, external_entropy, entropy_commitment, xpubs
|
||||
)
|
||||
slip10 = SLIP10.from_seed(seed)
|
||||
for path, xpub in zip(paths, xpubs):
|
||||
if slip10.get_xpub_from_path(path) != xpub:
|
||||
raise RuntimeError("Invalid XPUB in entropy check")
|
||||
# Update the entropy commitment for the next round.
|
||||
entropy_commitment = resp.entropy_commitment
|
||||
|
||||
client.init_device()
|
||||
return resp, zip(paths, xpubs)
|
||||
# TODO when we grow an API for auto-opening an empty passphrase session,
|
||||
# we should run the following piece:
|
||||
# xpubs_verify = get_xpubs()
|
||||
# if xpubs != xpubs_verify:
|
||||
# raise TrezorException("Invalid XPUBs after entropy check phase")
|
||||
|
||||
return xpubs
|
||||
|
||||
|
||||
@expect(messages.Success, field="message", ret_type=str)
|
||||
|
Loading…
Reference in New Issue
Block a user