rework high-level storage

We don't use Storage protobuf message anymore, and instead all keys are as granular as possible.

trezor.storage provides high-level function interface.
pull/25/head
Jan Pochyla 8 years ago
parent 53f6347838
commit a44e16a9ca

@ -12,11 +12,6 @@ _cached_root_node = None
async def get_node(session_id: int, path: list):
from trezor import ui
ui.display.clear()
ui.display.text_center(120, 120, 'Deriving key...',
ui.NORMAL, ui.GREY, ui.BLACK)
ui.display.refresh()
node = await get_root_node(session_id)
node.derive_path(path)
return node
@ -48,17 +43,15 @@ async def compute_seed(session_id):
from .request_pin import request_pin
from . import storage
try:
st = storage.get(session_id)
except KeyError:
if not storage.is_initialized():
raise wire.FailureError(Other, 'Device is not initialized')
st_pin = getattr(st, 'pin', '')
if st_pin and st_pin != await request_pin(session_id):
raise wire.FailureError(PinInvalid, 'PIN is incorrect')
if storage.is_protected_by_pin():
pin = await request_pin(session_id)
if not storage.check_pin(pin):
raise wire.FailureError(PinInvalid, 'PIN is incorrect')
st_passphrase_protection = getattr(st, 'passphrase_protection', False)
if st_passphrase_protection:
if storage.is_protected_by_passphrase():
from trezor.messages.PassphraseRequest import PassphraseRequest
from trezor.messages.wire_types import PassphraseAck
ack = await wire.reply_message(session_id, PassphraseRequest(), PassphraseAck)
@ -66,4 +59,4 @@ async def compute_seed(session_id):
else:
passphrase = ''
return bip39.seed(st.mnemonic, passphrase)
return bip39.seed(storage.get_mnemonic(), passphrase)

@ -1,27 +1,111 @@
import protobuf as p
from micropython import const
from trezor import config
from trezor.messages.Storage import Storage
_APP_COMMON = const(1)
APP_COMMON = const(1)
CFG_STORAGE = const(1)
_CFG_ID = const(0)
_CFG_VERSION = const(1)
_CFG_MNEMONIC = const(2)
_CFG_LANGUAGE = const(3)
_CFG_LABEL = const(4)
_CFG_PIN = const(5)
_CFG_PIN_ATTEMPTS = const(6)
_CFG_PASSPHRASE_PROTECTION = const(7)
_types = {
_CFG_ID: p.UnicodeType,
_CFG_VERSION: p.UVarintType,
_CFG_MNEMONIC: p.UnicodeType,
_CFG_LANGUAGE: p.UnicodeType,
_CFG_LABEL: p.UnicodeType,
_CFG_PIN: p.UnicodeType,
_CFG_PIN_ATTEMPTS: p.UVarintType,
_CFG_PASSPHRASE_PROTECTION: p.BoolType,
}
def has(session_id):
buf = config.get(session_id, APP_COMMON, CFG_STORAGE)
return bool(buf)
def get_device_id() -> str:
devid = _get(_CFG_ID)
if devid is None:
devid = _new_device_id()
_set(_CFG_ID, devid)
return devid
def get(session_id):
buf = config.get(session_id, APP_COMMON, CFG_STORAGE)
if not buf:
raise KeyError('Storage is not initialized')
return Storage.loads(buf)
def is_initialized() -> bool:
return _get(_CFG_VERSION) is not None
def set(session_id, st):
config.set(session_id, APP_COMMON, CFG_STORAGE, st.dumps())
def is_protected_by_pin() -> bool:
return _get(_CFG_PIN) is not None
def clear(session_id):
config.set(session_id, APP_COMMON, CFG_STORAGE, b'')
def is_protected_by_passphrase() -> bool:
return _get(_CFG_PASSPHRASE_PROTECTION) is True
def check_pin(pin: str) -> bool:
return _get(_CFG_PIN) == pin
def get_label() -> str:
return _get(_CFG_LABEL)
def get_mnemonic() -> str:
return _get(_CFG_MNEMONIC)
def load_mnemonic(mnemonic: str):
if is_initialized():
raise Exception('Device is already initialized')
_set(_CFG_VERSION, 1)
_set(_CFG_MNEMONIC, mnemonic)
def load_settings(language: str,
label: str,
pin: str,
passphrase_protection: bool):
if not is_initialized():
raise Exception('Device is not initialized')
_set(_CFG_LANGUAGE, language or None)
_set(_CFG_LABEL, label or None)
_set(_CFG_PIN, pin or None)
_set(_CFG_PIN_ATTEMPTS, None)
_set(_CFG_PASSPHRASE_PROTECTION, passphrase_protection)
def wipe():
_set(_CFG_ID, _new_device_id())
_set(_CFG_VERSION, None)
_set(_CFG_MNEMONIC, None)
_set(_CFG_LANGUAGE, None)
_set(_CFG_LABEL, None)
_set(_CFG_PIN, None)
_set(_CFG_PIN_ATTEMPTS, None)
_set(_CFG_PASSPHRASE_PROTECTION, None)
def _get(key: int):
buf = config.get(_APP_COMMON, key)
if buf:
val = _types[key].loads(buf)
else:
val = None
return val
def _set(key: int, val):
if val is not None:
buf = _types[key].dumps(val)
else:
buf = b''
config.set(_APP_COMMON, key, buf)
def _new_device_id() -> str:
from ubinascii import hexlify
from trezor.crypto import random
return str(hexlify(random.bytes(16)))

@ -5,37 +5,27 @@ from trezor.utils import unimport
@unimport
async def layout_load_device(message, session_id):
from trezor.crypto import bip39
from trezor.messages.Storage import Storage
from trezor.messages.Success import Success
from trezor.messages.FailureType import UnexpectedMessage, Other
from trezor.ui.text import Text
from ..common.confirm import require_confirm
from ..common import storage
if storage.has(session_id):
if storage.is_initialized():
raise wire.FailureError(UnexpectedMessage, 'Already initialized')
st = Storage()
st.imported = True
st.version = 1
st.pin = message.pin
st.passphrase_protection = message.passphrase_protection,
st.language = message.language
st.label = message.label
if hasattr(message, 'node'):
st.node = message.node
elif hasattr(message, 'mnemonic'):
st.mnemonic = message.mnemonic
if not message.skip_checksum and not bip39.check(message.mnemonic):
raise wire.FailureError(Other, 'Mnemonic is not valid')
if not message.skip_checksum and not bip39.check(message.mnemonic):
raise wire.FailureError(Other, 'Mnemonic is not valid')
await require_confirm(session_id, Text(
'Loading seed',
ui.BOLD, 'Loading private seed', 'is not recommended.',
ui.NORMAL, 'Continue only if you', 'know what you are doing!'))
storage.set(session_id, st)
storage.load_mnemonic(message.mnemonic)
storage.load_settings(pin=message.pin,
passphrase_protection=message.passphrase_protection,
language=message.language,
label=message.label)
return Success(message='Device loaded')

@ -6,27 +6,28 @@ from trezor.utils import unimport, chunks
@unimport
async def layout_reset_device(message, session_id):
from trezor.messages.Success import Success
from trezor.messages.Storage import Storage
from trezor.messages.FailureType import UnexpectedMessage
from ..common.request_pin import request_pin_twice
from ..common import storage
if storage.has(session_id):
if storage.is_initialized():
raise wire.FailureError(UnexpectedMessage, 'Already initialized')
mnemonic = await generate_mnemonic(
message.strength, message.display_random, session_id)
await show_mnemonic(mnemonic)
if message.pin_protection:
pin = await request_pin_twice(session_id)
else:
pin = ''
pin = None
storage.set(session_id, Storage(
version=1, pin=pin, mnemonic=mnemonic,
passphrase_protection=message.passphrase_protection,
language=message.language, label=message.label))
storage.load_mnemonic(mnemonic)
storage.load_settings(pin=pin,
passphrase_protection=message.passphrase_protection,
language=message.language,
label=message.label)
return Success(message='Initialized')

@ -3,20 +3,18 @@ from trezor.utils import unimport
@unimport
async def layout_wipe_device(message, session_id):
async def layout_wipe_device(_, session_id):
from trezor.messages.Success import Success
from trezor.ui.text import Text
from ..common.confirm import hold_to_confirm
from ..common import storage
ui.display.clear()
content = Text(
await hold_to_confirm(session_id, Text(
'Wiping device',
ui.ICON_WIPE,
ui.BOLD, 'Do you really want to', 'wipe the device?',
ui.NORMAL, '', 'All data will be lost.')
await hold_to_confirm(session_id, content)
ui.NORMAL, '', 'All data will be lost.'))
storage.clear(session_id)
storage.wipe()
return Success(message='Device wiped')

@ -34,17 +34,11 @@ def _save():
_load()
def get(session_id, app_id, key, default=None):
# TODO: session_id
def get(app_id, key, default=None):
return _mock.get((app_id << 8) | key, default)
def set(session_id, app_id, key, value):
# TODO: session_id
def set(app_id, key, value):
_mock[(app_id << 8) | key] = value
_save()
return True
def commit(session_id):
pass

Loading…
Cancel
Save