1
0
mirror of https://github.com/trezor/trezor-firmware.git synced 2024-11-13 19:18:56 +00:00

refactor(core): limit global imports in key places

so that by importing `apps.base`, we don't pull in the whole circus
This commit is contained in:
matejcik 2021-03-22 16:14:24 +01:00 committed by matejcik
parent ae0da5e245
commit e859c13d70
12 changed files with 110 additions and 65 deletions

View File

@ -1,22 +1,15 @@
import storage
import storage.cache
import storage.device
import storage.recovery
import storage.sd_salt
from storage import cache
from trezor import config, sdcard, ui, utils, wire, workflow
from trezor.messages import Capability, MessageType
from trezor.messages.Features import Features
from trezor.messages.PreauthorizedRequest import PreauthorizedRequest
from trezor import config, utils, wire, workflow
from trezor.messages import MessageType
from trezor.messages.Success import Success
from apps.common import mnemonic, safety_checks
from apps.common.request_pin import verify_user_pin
from . import workflow_handlers
if False:
import protobuf
from typing import Iterable, NoReturn, Protocol
from trezor.messages.Features import Features
from trezor.messages.Initialize import Initialize
from trezor.messages.EndSession import EndSession
from trezor.messages.GetFeatures import GetFeatures
@ -37,6 +30,15 @@ if False:
def get_features() -> Features:
import storage.recovery
import storage.sd_salt
from trezor import sdcard
from trezor.messages import Capability
from trezor.messages.Features import Features
from apps.common import mnemonic, safety_checks
f = Features(
vendor="trezor.io",
language="en-US",
@ -107,7 +109,7 @@ async def handle_Initialize(ctx: wire.Context, msg: Initialize) -> Features:
features = get_features()
if msg.session_id:
msg.session_id = bytes(msg.session_id)
features.session_id = cache.start_session(msg.session_id)
features.session_id = storage.cache.start_session(msg.session_id)
return features
@ -125,7 +127,7 @@ async def handle_LockDevice(ctx: wire.Context, msg: LockDevice) -> Success:
async def handle_EndSession(ctx: wire.Context, msg: EndSession) -> Success:
cache.end_current_session()
storage.cache.end_current_session()
return Success()
@ -141,6 +143,8 @@ async def handle_Ping(ctx: wire.Context, msg: Ping) -> Success:
async def handle_DoPreauthorized(
ctx: wire.Context, msg: DoPreauthorized
) -> protobuf.MessageType:
from trezor.messages.PreauthorizedRequest import PreauthorizedRequest
authorization: Authorization = storage.cache.get(
storage.cache.APP_BASE_AUTHORIZATION
)
@ -194,6 +198,8 @@ ALLOW_WHILE_LOCKED = (
def set_homescreen() -> None:
import storage.recovery
if not config.is_unlocked():
from apps.homescreen.lockscreen import lockscreen
@ -229,6 +235,8 @@ async def unlock_device(ctx: wire.GenericContext = wire.DUMMY_CONTEXT) -> None:
If the storage is locked, attempt to unlock it. Reset the homescreen and the wire
handler.
"""
from apps.common.request_pin import verify_user_pin
if not config.is_unlocked():
# verify_user_pin will raise if the PIN was invalid
await verify_user_pin(ctx)
@ -264,6 +272,8 @@ def get_pinlocked_handler(
# this function is also called when handling ApplySettings
def reload_settings_from_storage() -> None:
from trezor import ui
workflow.idle_timer.set(
storage.device.get_autolock_delay_ms(), lock_device_if_unlocked
)

View File

@ -1,8 +1,6 @@
import storage.device
from trezor import ui, utils, workflow
from trezor.crypto import bip39, slip39
from trezor.messages import BackupType
from trezor.ui.components.tt.text import Text
if False:
from trezor.messages.ResetDevice import EnumTypeBackupType
@ -39,9 +37,13 @@ def get_seed(passphrase: str = "", progress_bar: bool = True) -> bytes:
render_func = _render_progress
if is_bip39():
from trezor.crypto import bip39
seed = bip39.seed(mnemonic_secret.decode(), passphrase, render_func)
else: # SLIP-39
from trezor.crypto import slip39
identifier = storage.device.get_slip39_identifier()
iteration_exponent = storage.device.get_slip39_iteration_exponent()
if identifier is None or iteration_exponent is None:
@ -55,6 +57,8 @@ def get_seed(passphrase: str = "", progress_bar: bool = True) -> bytes:
def _start_progress() -> None:
from trezor.ui.components.tt.text import Text
# Because we are drawing to the screen manually, without a layout, we
# should make sure that no other layout is running.
workflow.close_others()

View File

@ -3,10 +3,7 @@ from micropython import const
import storage.device
from trezor import wire, workflow
from trezor.messages import ButtonRequestType
from trezor.messages.PassphraseAck import PassphraseAck
from trezor.messages.PassphraseRequest import PassphraseRequest
from trezor.ui import ICON_CONFIG, draw_simple
from trezor.ui.components.tt.passphrase import CANCELLED, PassphraseKeyboard
from trezor.ui.components.tt.text import Text
from . import button_request
@ -41,6 +38,9 @@ async def _request_from_user(ctx: wire.Context) -> str:
async def _request_on_host(ctx: wire.Context) -> str:
from trezor.messages.PassphraseAck import PassphraseAck
from trezor.messages.PassphraseRequest import PassphraseRequest
_entry_dialog()
request = PassphraseRequest()
@ -74,6 +74,8 @@ async def _request_on_host(ctx: wire.Context) -> str:
async def _request_on_device(ctx: wire.Context) -> str:
from trezor.ui.components.tt.passphrase import CANCELLED, PassphraseKeyboard
await button_request(ctx, code=ButtonRequestType.PassphraseEntry)
keyboard = PassphraseKeyboard("Enter passphrase", _MAX_PASSPHRASE_LEN)

View File

@ -4,17 +4,19 @@ if not __debug__:
halt("debug mode inactive")
if __debug__:
from trezor import io, ui, wire
from trezor.messages import MessageType, DebugSwipeDirection
from trezor import config, log, loop, utils, wire
from trezor.ui import display
from trezor.messages import MessageType
from trezor.messages.DebugLinkLayout import DebugLinkLayout
from trezor import config, crypto, log, loop, utils
from trezor.messages.Success import Success
from apps import workflow_handlers
if False:
from trezor.ui import Layout
from trezor.messages.DebugLinkDecision import DebugLinkDecision
from trezor.messages.DebugLinkGetState import DebugLinkGetState
from trezor.messages.DebugLinkLayout import DebugLinkLayout
from trezor.messages.DebugLinkRecordScreen import DebugLinkRecordScreen
from trezor.messages.DebugLinkReseedRandom import DebugLinkReseedRandom
from trezor.messages.DebugLinkState import DebugLinkState
@ -43,36 +45,41 @@ if __debug__:
def screenshot() -> bool:
if save_screen:
ui.display.save(save_screen_directory + "/refresh-")
display.save(save_screen_directory + "/refresh-")
return True
return False
def notify_layout_change(layout: ui.Layout) -> None:
def notify_layout_change(layout: Layout) -> None:
global current_content
current_content = layout.read_content()
if watch_layout_changes:
layout_change_chan.publish(current_content)
async def debuglink_decision_dispatcher() -> None:
async def dispatch_debuglink_decision(msg: DebugLinkDecision) -> None:
from trezor.messages import DebugSwipeDirection
from trezor.ui import Result
from trezor.ui.components.tt import confirm, swipe
if msg.yes_no is not None:
await confirm_chan.put(
Result(confirm.CONFIRMED if msg.yes_no else confirm.CANCELLED)
)
if msg.swipe is not None:
if msg.swipe == DebugSwipeDirection.UP:
await swipe_chan.put(swipe.SWIPE_UP)
elif msg.swipe == DebugSwipeDirection.DOWN:
await swipe_chan.put(swipe.SWIPE_DOWN)
elif msg.swipe == DebugSwipeDirection.LEFT:
await swipe_chan.put(swipe.SWIPE_LEFT)
elif msg.swipe == DebugSwipeDirection.RIGHT:
await swipe_chan.put(swipe.SWIPE_RIGHT)
if msg.input is not None:
await input_chan.put(Result(msg.input))
async def debuglink_decision_dispatcher() -> None:
while True:
msg = await debuglink_decision_chan.take()
if msg.yes_no is not None:
await confirm_chan.put(
ui.Result(confirm.CONFIRMED if msg.yes_no else confirm.CANCELLED)
)
if msg.swipe is not None:
if msg.swipe == DebugSwipeDirection.UP:
await swipe_chan.put(swipe.SWIPE_UP)
elif msg.swipe == DebugSwipeDirection.DOWN:
await swipe_chan.put(swipe.SWIPE_DOWN)
elif msg.swipe == DebugSwipeDirection.LEFT:
await swipe_chan.put(swipe.SWIPE_LEFT)
elif msg.swipe == DebugSwipeDirection.RIGHT:
await swipe_chan.put(swipe.SWIPE_RIGHT)
if msg.input is not None:
await input_chan.put(ui.Result(msg.input))
await dispatch_debuglink_decision(msg)
loop.schedule(debuglink_decision_dispatcher())
@ -81,6 +88,8 @@ if __debug__:
await ctx.write(DebugLinkLayout(lines=content))
async def touch_hold(x: int, y: int, duration_ms: int) -> None:
from trezor import io
await loop.sleep(duration_ms)
loop.synthetic_events.append((io.TOUCH, (io.TOUCH_END, x, y)))
@ -96,6 +105,8 @@ if __debug__:
async def dispatch_DebugLinkDecision(
ctx: wire.Context, msg: DebugLinkDecision
) -> None:
from trezor import io
if debuglink_decision_chan.putters:
log.warning(__name__, "DebugLinkDecision queue is not empty")
@ -149,7 +160,7 @@ if __debug__:
save_screen = True
else:
save_screen = False
ui.display.clear_save() # clear C buffers
display.clear_save() # clear C buffers
return Success()
@ -157,12 +168,16 @@ if __debug__:
ctx: wire.Context, msg: DebugLinkReseedRandom
) -> Success:
if msg.value is not None:
crypto.random.reseed(msg.value)
from trezor.crypto import random
random.reseed(msg.value)
return Success()
async def dispatch_DebugLinkEraseSdCard(
ctx: wire.Context, msg: DebugLinkEraseSdCard
) -> Success:
from trezor import io
try:
io.sdcard.power_on()
if msg.format:

View File

@ -1,7 +1,7 @@
import gc
from trezorcrypto import random # avoid pulling in trezor.crypto
from trezor import utils
from trezor.crypto import random
if False:
from typing import Sequence

View File

@ -2,8 +2,6 @@ from micropython import const
from ubinascii import hexlify
from storage import common
from trezor.crypto import random
from trezor.messages import BackupType
if False:
from trezor.messages.ResetDevice import EnumTypeBackupType
@ -37,8 +35,6 @@ INITIALIZED = const(0x13) # bool (0x01 or empty)
_SAFETY_CHECK_LEVEL = const(0x14) # int
_EXPERIMENTAL_FEATURES = const(0x15) # bool (0x01 or empty)
_DEFAULT_BACKUP_TYPE = BackupType.Bip39
SAFETY_CHECK_LEVEL_STRICT : Literal[0] = const(0)
SAFETY_CHECK_LEVEL_PROMPT : Literal[1] = const(1)
_DEFAULT_SAFETY_CHECK_LEVEL = SAFETY_CHECK_LEVEL_STRICT
@ -79,6 +75,8 @@ def is_initialized() -> bool:
def _new_device_id() -> str:
from trezorcrypto import random # avoid pulling in trezor.crypto
return hexlify(random.bytes(12)).decode().upper()
@ -121,9 +119,11 @@ def get_mnemonic_secret() -> bytes | None:
def get_backup_type() -> EnumTypeBackupType:
from trezor.messages import BackupType
backup_type = common.get_uint8(_NAMESPACE, _BACKUP_TYPE)
if backup_type is None:
backup_type = _DEFAULT_BACKUP_TYPE
backup_type = BackupType.Bip39
if backup_type not in (
BackupType.Bip39,

View File

@ -1,7 +1,6 @@
from micropython import const
from storage import common, recovery_shares
from trezor.crypto import slip39
from storage import common
# Namespace:
_NAMESPACE = common.APP_RECOVERY
@ -86,22 +85,26 @@ def set_slip39_remaining_shares(shares_remaining: int, group_index: int) -> None
0x10 (16) was chosen as the default value because it's the max
share count for a group.
"""
from trezor.crypto.slip39 import MAX_SHARE_COUNT
_require_progress()
remaining = common.get(_NAMESPACE, _REMAINING)
group_count = get_slip39_group_count()
if not group_count:
raise RuntimeError
if remaining is None:
remaining = bytearray([slip39.MAX_SHARE_COUNT] * group_count)
remaining = bytearray([MAX_SHARE_COUNT] * group_count)
remaining = bytearray(remaining)
remaining[group_index] = shares_remaining
common.set(_NAMESPACE, _REMAINING, remaining)
def get_slip39_remaining_shares(group_index: int) -> int | None:
from trezor.crypto.slip39 import MAX_SHARE_COUNT
_require_progress()
remaining = common.get(_NAMESPACE, _REMAINING)
if remaining is None or remaining[group_index] == slip39.MAX_SHARE_COUNT:
if remaining is None or remaining[group_index] == MAX_SHARE_COUNT:
return None
else:
return remaining[group_index]
@ -120,6 +123,8 @@ def fetch_slip39_remaining_shares() -> list[int] | None:
def end_progress() -> None:
from . import recovery_shares
_require_progress()
common.delete(_NAMESPACE, _IN_PROGRESS)
common.delete(_NAMESPACE, _DRY_RUN)

View File

@ -1,4 +1,4 @@
from trezor import ui, utils
from trezor import utils
if False:
from typing import Any
@ -10,6 +10,8 @@ keepalive_callback: Any = None
def show_pin_timeout(seconds: int, progress: int, message: str) -> bool:
from trezor import ui
global _previous_progress
global _previous_seconds

View File

@ -5,9 +5,6 @@ from trezorui import Display
from trezor import io, loop, res, utils, workflow
if __debug__:
from apps.debug import notify_layout_change
if False:
from typing import Any, Awaitable, Generator, TypeVar
@ -37,9 +34,10 @@ _alert_in_progress = False
# in debug mode, display an indicator in top right corner
if __debug__:
from apps.debug import screenshot
def refresh() -> None:
from apps.debug import screenshot
if not screenshot():
display.bar(Display.WIDTH - 8, 0, 8, 8, 0xF800)
display.refresh()
@ -376,6 +374,8 @@ class Layout(Component):
self.dispatch(RENDER, 0, 0)
if __debug__ and self.should_notify_layout_change:
from apps.debug import notify_layout_change
# notify about change and do not notify again until next await.
# (handle_rendering might be called multiple times in a single await,
# because of the endless loop in __iter__)

View File

@ -1,8 +1,5 @@
from trezor import loop, ui, wire
if __debug__:
from apps.debug import confirm_signal
if False:
from typing import Any, Awaitable
@ -52,4 +49,6 @@ class ConfirmBase(ui.Layout):
return self.content.read_content()
def create_tasks(self) -> tuple[loop.Task, ...]:
from apps.debug import confirm_signal
return super().create_tasks() + (confirm_signal(),)

View File

@ -6,9 +6,6 @@ from trezor.ui.loader import Loader, LoaderDefault
from ..common.confirm import CANCELLED, CONFIRMED, INFO, ConfirmBase
from .button import Button, ButtonAbort, ButtonCancel, ButtonConfirm, ButtonDefault
if __debug__:
from apps.debug import swipe_signal, confirm_signal
if False:
from typing import Any
from .button import ButtonContent, ButtonStyleType
@ -96,6 +93,8 @@ class ConfirmPageable(Confirm):
directions = SWIPE_HORIZONTAL
if __debug__:
from apps.debug import swipe_signal
swipe = await loop.race(Swipe(directions), swipe_signal())
else:
swipe = await Swipe(directions)
@ -196,6 +195,8 @@ class InfoConfirm(ui.Layout):
return self.content.read_content()
def create_tasks(self) -> tuple[loop.Task, ...]:
from apps.debug import confirm_signal
return super().create_tasks() + (confirm_signal(),)
@ -273,4 +274,6 @@ class HoldToConfirm(ui.Layout):
return self.content.read_content()
def create_tasks(self) -> tuple[loop.Task, ...]:
from apps.debug import confirm_signal
return super().create_tasks() + (confirm_signal(),)

View File

@ -7,9 +7,6 @@ from .confirm import CANCELLED, CONFIRMED, Confirm
from .swipe import SWIPE_DOWN, SWIPE_UP, SWIPE_VERTICAL, Swipe
from .text import TEXT_MAX_LINES, Span, Text
if __debug__:
from apps.debug import confirm_signal, swipe_signal, notify_layout_change
_PAGINATED_LINE_WIDTH = const(204)
@ -81,6 +78,8 @@ class Paginated(ui.Layout):
directions = SWIPE_VERTICAL
if __debug__:
from apps.debug import swipe_signal
swipe = await loop.race(Swipe(directions), swipe_signal())
else:
swipe = await Swipe(directions)
@ -94,6 +93,8 @@ class Paginated(ui.Layout):
self.repaint = True
if __debug__:
from apps.debug import notify_layout_change
notify_layout_change(self)
self.on_change()
@ -110,6 +111,8 @@ class Paginated(ui.Layout):
# shut down by a DebugLink confirm, even if used outside of a confirm() call
# But we don't have any such usages in the codebase, and it doesn't actually
# make much sense to use a Paginated without a way to confirm it.
from apps.debug import confirm_signal
return tasks + (confirm_signal(),)
else:
return tasks
@ -230,6 +233,8 @@ class PaginatedWithButtons(ui.Layout):
return self.pages[self.page].read_content()
def create_tasks(self) -> tuple[loop.Task, ...]:
from apps.debug import confirm_signal
return super().create_tasks() + (confirm_signal(),)