1
0
mirror of https://github.com/trezor/trezor-firmware.git synced 2025-03-12 22:26:08 +00:00
trezor-firmware/python/src/trezorlib/device.py

646 lines
21 KiB
Python

# This file is part of the Trezor project.
#
# Copyright (C) 2012-2022 SatoshiLabs and contributors
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License version 3
# as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the License along with this library.
# If not, see <https://www.gnu.org/licenses/lgpl-3.0.html>.
from __future__ import annotations
import hashlib
import hmac
import random
import secrets
import time
import warnings
from typing import TYPE_CHECKING, Callable, Iterable, Optional, Tuple
from slip10 import SLIP10
from . import messages
from .exceptions import Cancelled, TrezorException
from .tools import Address, _deprecation_retval_helper, _return_success, parse_path
if TYPE_CHECKING:
from .transport.session import Session
RECOVERY_BACK = "\x08" # backspace character, sent literally
SLIP39_EXTENDABLE_MIN_VERSION = (2, 7, 1)
ENTROPY_CHECK_MIN_VERSION = (2, 8, 7)
def apply_settings(
session: "Session",
label: Optional[str] = None,
language: Optional[str] = None,
use_passphrase: Optional[bool] = None,
homescreen: Optional[bytes] = None,
passphrase_always_on_device: Optional[bool] = None,
auto_lock_delay_ms: Optional[int] = None,
display_rotation: Optional[messages.DisplayRotation] = None,
safety_checks: Optional[messages.SafetyCheckLevel] = None,
experimental_features: Optional[bool] = None,
hide_passphrase_from_host: Optional[bool] = None,
haptic_feedback: Optional[bool] = None,
) -> str | None:
if language is not None:
warnings.warn(
"language ignored. Use change_language() to set device language.",
DeprecationWarning,
)
settings = messages.ApplySettings(
label=label,
use_passphrase=use_passphrase,
homescreen=homescreen,
passphrase_always_on_device=passphrase_always_on_device,
auto_lock_delay_ms=auto_lock_delay_ms,
display_rotation=display_rotation,
safety_checks=safety_checks,
experimental_features=experimental_features,
hide_passphrase_from_host=hide_passphrase_from_host,
haptic_feedback=haptic_feedback,
)
out = session.call(settings, expect=messages.Success)
session.refresh_features()
return _return_success(out)
def _send_language_data(
session: "Session",
request: "messages.TranslationDataRequest",
language_data: bytes,
) -> None:
response = request
while not isinstance(response, messages.Success):
response = messages.TranslationDataRequest.ensure_isinstance(response)
data_length = response.data_length
data_offset = response.data_offset
chunk = language_data[data_offset : data_offset + data_length]
response = session.call(messages.TranslationDataAck(data_chunk=chunk))
def change_language(
session: "Session",
language_data: bytes,
show_display: bool | None = None,
) -> str | None:
data_length = len(language_data)
msg = messages.ChangeLanguage(data_length=data_length, show_display=show_display)
response = session.call(msg)
if data_length > 0:
response = messages.TranslationDataRequest.ensure_isinstance(response)
_send_language_data(session, response, language_data)
else:
messages.Success.ensure_isinstance(response)
session.refresh_features() # changing the language in features
return _return_success(messages.Success(message="Language changed."))
def apply_flags(session: "Session", flags: int) -> str | None:
out = session.call(messages.ApplyFlags(flags=flags), expect=messages.Success)
session.refresh_features()
return _return_success(out)
def change_pin(session: "Session", remove: bool = False) -> str | None:
ret = session.call(messages.ChangePin(remove=remove), expect=messages.Success)
session.refresh_features()
return _return_success(ret)
def change_wipe_code(session: "Session", remove: bool = False) -> str | None:
ret = session.call(messages.ChangeWipeCode(remove=remove), expect=messages.Success)
session.refresh_features()
return _return_success(ret)
def sd_protect(
session: "Session", operation: messages.SdProtectOperationType
) -> str | None:
ret = session.call(messages.SdProtect(operation=operation), expect=messages.Success)
session.refresh_features()
return _return_success(ret)
def wipe(session: "Session") -> str | None:
ret = session.call(messages.WipeDevice(), expect=messages.Success)
session.invalidate()
# if not session.features.bootloader_mode:
# session.refresh_features()
return _return_success(ret)
def recover(
session: "Session",
word_count: int = 24,
passphrase_protection: bool = False,
pin_protection: bool = True,
label: Optional[str] = None,
language: Optional[str] = None,
input_callback: Optional[Callable] = None,
input_method: messages.RecoveryDeviceInputMethod = messages.RecoveryDeviceInputMethod.ScrambledWords,
dry_run: Optional[bool] = None,
u2f_counter: Optional[int] = None,
*,
type: Optional[messages.RecoveryType] = None,
) -> messages.Success | None:
if language is not None:
warnings.warn(
"language ignored. Use change_language() to set device language.",
DeprecationWarning,
)
if dry_run is not None:
warnings.warn(
"Use type=RecoveryType.DryRun instead!",
DeprecationWarning,
stacklevel=3,
)
if type is not None:
raise ValueError("Cannot use both dry_run and type simultaneously.")
elif dry_run:
type = messages.RecoveryType.DryRun
else:
type = messages.RecoveryType.NormalRecovery
if type is None:
type = messages.RecoveryType.NormalRecovery
if session.features.model == "1" and input_callback is None:
raise RuntimeError("Input callback required for Trezor One")
if word_count not in (12, 18, 24):
raise ValueError("Invalid word count. Use 12/18/24")
if session.features.initialized and type == messages.RecoveryType.NormalRecovery:
raise RuntimeError(
"Device already initialized. Call device.wipe() and try again."
)
if u2f_counter is None:
u2f_counter = int(time.time())
msg = messages.RecoveryDevice(
word_count=word_count,
enforce_wordlist=True,
input_method=input_method,
type=type,
)
if type == messages.RecoveryType.NormalRecovery:
# set additional parameters
msg.passphrase_protection = passphrase_protection
msg.pin_protection = pin_protection
msg.label = label
msg.u2f_counter = u2f_counter
res = session.call(msg)
while isinstance(res, messages.WordRequest):
try:
assert input_callback is not None
inp = input_callback(res.type)
res = session.call(messages.WordAck(word=inp))
except Cancelled:
res = session.call(messages.Cancel())
# check that the result is a Success
res = messages.Success.ensure_isinstance(res)
# reinitialize the device
session.refresh_features()
return _deprecation_retval_helper(res)
def is_slip39_backup_type(backup_type: messages.BackupType):
return backup_type in (
messages.BackupType.Slip39_Basic,
messages.BackupType.Slip39_Advanced,
messages.BackupType.Slip39_Single_Extendable,
messages.BackupType.Slip39_Basic_Extendable,
messages.BackupType.Slip39_Advanced_Extendable,
)
def _seed_from_entropy(
internal_entropy: bytes,
external_entropy: bytes,
strength: int,
backup_type: messages.BackupType,
) -> bytes:
entropy = hashlib.sha256(internal_entropy + external_entropy).digest()
secret = entropy[: strength // 8]
if len(secret) * 8 != strength:
raise ValueError("Entropy length mismatch")
if backup_type == messages.BackupType.Bip39:
import mnemonic
bip39 = mnemonic.Mnemonic("english")
words = bip39.to_mnemonic(secret)
seed = bip39.to_seed(words, passphrase="")
elif is_slip39_backup_type(backup_type):
import shamir_mnemonic
seed = shamir_mnemonic.cipher.decrypt(
secret, b"", iteration_exponent=1, identifier=0, extendable=True
)
else:
raise ValueError("Unknown backup type.")
return seed
def reset(
session: "Session",
display_random: bool = False,
strength: Optional[int] = None,
passphrase_protection: bool = False,
pin_protection: bool = True,
label: Optional[str] = None,
language: Optional[str] = None,
u2f_counter: int = 0,
skip_backup: bool = False,
no_backup: bool = False,
backup_type: messages.BackupType = messages.BackupType.Bip39,
) -> str | None:
warnings.warn(
"reset() is deprecated. Use setup() instead.",
DeprecationWarning,
stacklevel=2,
)
if display_random:
warnings.warn(
"display_random ignored. The feature is deprecated.",
DeprecationWarning,
stacklevel=2,
)
if language is not None:
warnings.warn(
"language ignored. Use change_language() to set device language.",
DeprecationWarning,
stacklevel=2,
)
setup(
session,
strength=strength,
passphrase_protection=passphrase_protection,
pin_protection=pin_protection,
label=label,
u2f_counter=u2f_counter,
skip_backup=skip_backup,
no_backup=no_backup,
backup_type=backup_type,
)
return _return_success(messages.Success(message="Initialized"))
def _get_external_entropy() -> bytes:
return secrets.token_bytes(32)
def setup(
session: "Session",
*,
strength: Optional[int] = None,
passphrase_protection: bool = True,
pin_protection: bool = False,
label: Optional[str] = None,
u2f_counter: int = 0,
skip_backup: bool = False,
no_backup: bool = False,
backup_type: Optional[messages.BackupType] = None,
entropy_check_count: Optional[int] = None,
paths: Iterable[Address] = [],
_get_entropy: Callable[[], bytes] = _get_external_entropy,
) -> Iterable[Tuple[Address, str]]:
"""Create a new wallet on device.
On supporting devices, automatically performs the entropy check: for N rounds, ask
the device to generate a new seed and provide XPUBs derived from that seed. In the
next round, the previous round's seed is revealed and verified that it was generated
with the appropriate entropy and that it matches the provided XPUBs.
On round N+1, instead of revealing, the final seed is stored on device.
This function returns the XPUBs from the last round. Caller SHOULD store these XPUBs
and periodically check that the device still generates the same ones, to ensure that
the device has not maliciously switched to a pre-generated seed.
The caller can provide a list of interesting derivation paths to be used in the
entropy check. If an empty list is provided, the function will use the first BTC
SegWit v0 account and the first ETH account.
Returned XPUBs are in the form of tuples (derivation path, xpub).
Specifying an entropy check count other than 0 on devices that don't support it,
such as Trezor Model One, will result in an error. If not specified, a random value
between 2 and 8 is chosen on supporting devices.
Args:
* client: TrezorClient instance.
* strength: Entropy strength in bits. Default is 128 for the core family, and 256
for Trezor Model One.
* passphrase_protection: Enable passphrase feature. Defaults to True.
* pin_protection: Enable and set up device PIN as part of the setup flow. Defaults
to False.
* label: Device label.
* u2f_counter: U2F counter value.
* skip_backup: Skip the backup step. Defaults to False.
* no_backup: Do not create backup (seedless mode). Defaults to False.
* entropy_check_count: Number of rounds for the entropy check.
Returns:
Sequence of tuples (derivation path, xpub) from the last round of the entropy
check.
"""
if session.features.initialized:
raise RuntimeError(
"Device is initialized already. Call wipe_device() and try again."
)
if strength is None:
if session.features.model == "1":
strength = 256
else:
strength = 128
if backup_type is None:
if session.version < SLIP39_EXTENDABLE_MIN_VERSION:
# includes Trezor One 1.x.x
backup_type = messages.BackupType.Bip39
else:
backup_type = messages.BackupType.Slip39_Single_Extendable
if not paths:
# Get XPUBs for the first BTC SegWit v0 account and first ETH account.
paths = [parse_path("m/84h/0h/0h"), parse_path("m/44h/60h/0h")]
if entropy_check_count is None:
if session.version < ENTROPY_CHECK_MIN_VERSION:
# includes Trezor One 1.x.x
entropy_check_count = 0
else:
entropy_check_count = random.randint(2, 8)
# Begin with device reset workflow
msg = messages.ResetDevice(
strength=strength,
passphrase_protection=bool(passphrase_protection),
pin_protection=bool(pin_protection),
label=label,
u2f_counter=u2f_counter,
skip_backup=bool(skip_backup),
no_backup=bool(no_backup),
backup_type=backup_type,
entropy_check=entropy_check_count > 0,
)
if entropy_check_count > 0:
xpubs = _reset_with_entropycheck(
session, msg, entropy_check_count, paths, _get_entropy
)
else:
_reset_no_entropycheck(session, msg, _get_entropy)
xpubs = []
session.refresh_features()
return xpubs
def _reset_no_entropycheck(
session: "Session",
msg: messages.ResetDevice,
get_entropy: Callable[[], bytes],
) -> None:
"""Simple reset workflow without entropy checks:
>> ResetDevice
<< EntropyRequest
>> EntropyAck(entropy=...)
<< Success
"""
assert msg.entropy_check is False
session.call(msg, expect=messages.EntropyRequest)
session.call(messages.EntropyAck(entropy=get_entropy()), expect=messages.Success)
def _reset_with_entropycheck(
session: "Session",
reset_msg: messages.ResetDevice,
entropy_check_count: int,
paths: Iterable[Address],
get_entropy: Callable[[], bytes],
) -> list[tuple[Address, str]]:
"""Reset workflow with entropy checks:
>> ResetDevice
repeat n times:
<< EntropyRequest(entropy_commitment=..., prev_entropy=...)
>> EntropyAck(entropy=...)
<< EntropyCheckReady
>> GetPublicKey(...)
<< PublicKey(...)
>> EntropyCheckContinue(finish=False)
last round:
>> EntropyCheckContinue(finish=True)
<< Success
After each round, the device reveals its internal entropy via the prev_entropy
field. This function verifies that the entropy matches the respective commitment,
then recalculate the seed for the previous round, and verifies that the public keys
generated by the device match that seed.
Returns the list of XPUBs from the last round. Caller is responsible for storing
those XPUBs and later verifying that these are still valid.
"""
assert reset_msg.strength is not None
assert reset_msg.backup_type is not None
strength = reset_msg.strength
backup_type = reset_msg.backup_type
def get_xpubs() -> list[tuple[Address, str]]:
xpubs = []
for path in paths:
resp = session.call(
messages.GetPublicKey(address_n=path), expect=messages.PublicKey
)
xpubs.append((path, resp.xpub))
return xpubs
def verify_entropy_commitment(
internal_entropy: bytes | None,
external_entropy: bytes,
entropy_commitment: bytes | None,
xpubs: list[tuple[Address, str]],
) -> None:
if internal_entropy is None or entropy_commitment is None:
raise TrezorException("Invalid entropy check response.")
calculated_commitment = hmac.HMAC(
key=internal_entropy, msg=b"", digestmod=hashlib.sha256
).digest()
if calculated_commitment != entropy_commitment:
raise TrezorException("Invalid entropy commitment.")
seed = _seed_from_entropy(
internal_entropy, external_entropy, strength, backup_type
)
slip10 = SLIP10.from_seed(seed)
for path, xpub in xpubs:
if slip10.get_xpub_from_path(path) != xpub:
raise TrezorException("Invalid XPUB in entropy check")
xpubs = []
resp = session.call(reset_msg, expect=messages.EntropyRequest)
entropy_commitment = resp.entropy_commitment
while True:
# provide external entropy for this round
external_entropy = get_entropy()
session.call(
messages.EntropyAck(entropy=external_entropy),
expect=messages.EntropyCheckReady,
)
# fetch xpubs for the current round
xpubs = get_xpubs()
if entropy_check_count <= 0:
# last round, wait for a Success and exit the loop
session.call(
messages.EntropyCheckContinue(finish=True),
expect=messages.Success,
)
break
entropy_check_count -= 1
# Next round starts.
resp = session.call(
messages.EntropyCheckContinue(finish=False),
expect=messages.EntropyRequest,
)
# Check the entropy commitment from the previous round.
verify_entropy_commitment(
resp.prev_entropy, external_entropy, entropy_commitment, xpubs
)
# Update the entropy commitment for the next round.
entropy_commitment = resp.entropy_commitment
# TODO when we grow an API for auto-opening an empty passphrase session,
# we should run the following piece:
# xpubs_verify = get_xpubs()
# if xpubs != xpubs_verify:
# raise TrezorException("Invalid XPUBs after entropy check phase")
return xpubs
def backup(
session: "Session",
group_threshold: Optional[int] = None,
groups: Iterable[tuple[int, int]] = (),
) -> str | None:
ret = session.call(
messages.BackupDevice(
group_threshold=group_threshold,
groups=[
messages.Slip39Group(member_threshold=t, member_count=c)
for t, c in groups
],
),
expect=messages.Success,
)
session.refresh_features()
return _return_success(ret)
def cancel_authorization(session: "Session") -> str | None:
ret = session.call(messages.CancelAuthorization(), expect=messages.Success)
return _return_success(ret)
def unlock_path(session: "Session", n: "Address") -> bytes:
resp = session.call(
messages.UnlockPath(address_n=n), expect=messages.UnlockedPathRequest
)
# Cancel the UnlockPath workflow now that we have the authentication code.
try:
session.call(messages.Cancel())
except Cancelled:
return resp.mac
else:
raise TrezorException("Unexpected response in UnlockPath flow")
def reboot_to_bootloader(
session: "Session",
boot_command: messages.BootCommand = messages.BootCommand.STOP_AND_WAIT,
firmware_header: Optional[bytes] = None,
language_data: bytes = b"",
) -> str | None:
response = session.call(
messages.RebootToBootloader(
boot_command=boot_command,
firmware_header=firmware_header,
language_data_length=len(language_data),
)
)
if isinstance(response, messages.TranslationDataRequest):
response = _send_language_data(session, response, language_data)
return _return_success(messages.Success(message=""))
def show_device_tutorial(session: "Session") -> str | None:
ret = session.call(messages.ShowDeviceTutorial(), expect=messages.Success)
return _return_success(ret)
def unlock_bootloader(session: "Session") -> str | None:
ret = session.call(messages.UnlockBootloader(), expect=messages.Success)
return _return_success(ret)
def set_busy(session: "Session", expiry_ms: Optional[int]) -> str | None:
"""Sets or clears the busy state of the device.
In the busy state the device shows a "Do not disconnect" message instead of the homescreen.
Setting `expiry_ms=None` clears the busy state.
"""
ret = session.call(messages.SetBusy(expiry_ms=expiry_ms), expect=messages.Success)
session.refresh_features()
return _return_success(ret)
def authenticate(session: "Session", challenge: bytes) -> messages.AuthenticityProof:
return session.call(
messages.AuthenticateDevice(challenge=challenge),
expect=messages.AuthenticityProof,
)
def set_brightness(session: "Session", value: Optional[int] = None) -> str | None:
ret = session.call(messages.SetBrightness(value=value), expect=messages.Success)
return _return_success(ret)