diff --git a/core/src/apps/webauthn/__init__.py b/core/src/apps/webauthn/__init__.py index 646cab820..c4c49ebf9 100644 --- a/core/src/apps/webauthn/__init__.py +++ b/core/src/apps/webauthn/__init__.py @@ -3,11 +3,22 @@ import ustruct import utime from micropython import const -from trezor import io, log, loop, ui, utils, workflow -from trezor.crypto import der, hashlib, hmac, random +from trezor import config, io, log, loop, ui, utils, workflow +from trezor.crypto import aes, der, hashlib, hmac, random from trezor.crypto.curve import nist256p1 +from trezor.ui.confirm import CONFIRMED, Confirm, ConfirmPageable, Pageable +from trezor.ui.text import Text, text_center_trim_left, text_center_trim_right -from apps.common import HARDENED, storage +from apps.common import cbor, storage +from apps.common.storage.webauthn import ( + erase_resident_credentials, + get_resident_credentials, + store_resident_credential, +) +from apps.webauthn.credential import Credential, Fido2Credential, U2fCredential + +if False: + from typing import Coroutine, List, Optional _HID_RPT_SIZE = const(64) _CID_BROADCAST = const(0xFFFFFFFF) # broadcast channel id @@ -23,6 +34,9 @@ _CMD_MSG = const(0x83) # send U2F message frame _CMD_LOCK = const(0x84) # send lock channel command _CMD_INIT = const(0x86) # channel initialization _CMD_WINK = const(0x88) # send device identification wink +_CMD_CBOR = const(0x90) # send encapsulated CTAP CBOR encoded message +_CMD_CANCEL = const(0x91) # cancel any outstanding requests on this CID +_CMD_KEEPALIVE = const(0xBB) # processing a message _CMD_ERROR = const(0xBF) # error response # types for the msg cmd @@ -30,6 +44,78 @@ _MSG_REGISTER = const(0x01) # registration command _MSG_AUTHENTICATE = const(0x02) # authenticate/sign command _MSG_VERSION = const(0x03) # read version string command +# types for the cbor cmd +_CBOR_MAKE_CREDENTIAL = const(0x01) # generate new credential command +_CBOR_GET_ASSERTION = const(0x02) # authenticate command +_CBOR_GET_INFO = const(0x04) # report AAGUID and device capabilities +_CBOR_CLIENT_PIN = const(0x06) # PIN and pinToken management +_CBOR_RESET = const(0x07) # factory reset, invalidating all generated credentials +_CBOR_GET_NEXT_ASSERTION = const(0x08) # obtain the next per-credential signature + +# CBOR MakeCredential command parameter keys +_MAKECRED_CMD_CLIENT_DATA_HASH = const(0x01) # bytes, required +_MAKECRED_CMD_RP = const(0x02) # map, required +_MAKECRED_CMD_USER = const(0x03) # map, required +_MAKECRED_CMD_PUB_KEY_CRED_PARAMS = const(0x04) # array of maps, required +_MAKECRED_CMD_EXCLUDE_LIST = const(0x05) # array of maps, optional +_MAKECRED_CMD_EXTENSIONS = const(0x06) # map, optional +_MAKECRED_CMD_OPTIONS = const(0x07) # map, optional +_MAKECRED_CMD_PIN_AUTH = const(0x08) # bytes, optional + +# CBOR MakeCredential response member keys +_MAKECRED_RESP_FMT = const(0x01) # str, required +_MAKECRED_RESP_AUTH_DATA = const(0x02) # bytes, required +_MAKECRED_RESP_ATT_STMT = const(0x03) # map, required + +# CBOR GetAssertion command parameter keys +_GETASSERT_CMD_RP_ID = const(0x01) # str, required +_GETASSERT_CMD_CLIENT_DATA_HASH = const(0x02) # bytes, required +_GETASSERT_CMD_ALLOW_LIST = const(0x03) # array of maps, optional +_GETASSERT_CMD_EXTENSIONS = const(0x04) # map, optional +_GETASSERT_CMD_OPTIONS = const(0x05) # map, optional +_GETASSERT_CMD_PIN_AUTH = const(0x06) # bytes, optional + +# CBOR GetAssertion response member keys +_GETASSERT_RESP_CREDENTIAL = const(0x01) # map, optional +_GETASSERT_RESP_AUTH_DATA = const(0x02) # bytes, required +_GETASSERT_RESP_SIGNATURE = const(0x03) # bytes, required +_GETASSERT_RESP_USER = const(0x04) # map, optional +_GETASSERT_RESP_PUB_KEY_CREDENTIAL_USER_ENTITY = const(0x04) # map, optional +_GETASSERT_RESP_NUM_OF_CREDENTIALS = const(0x05) # int, optional + +# CBOR GetInfo response member keys +_GETINFO_RESP_VERSIONS = const(0x01) # array of str, required +_GETINFO_RESP_EXTENSIONS = const(0x02) # array of str, optional +_GETINFO_RESP_AAGUID = const(0x03) # bytes(16), required +_GETINFO_RESP_OPTIONS = const(0x04) # map, optional + +# CBOR ClientPin command parameter keys +_CLIENTPIN_CMD_PIN_PROTOCOL = const(0x01) # unsigned int, required +_CLIENTPIN_CMD_SUBCOMMAND = const(0x02) # unsigned int, required +_CLIENTPIN_SUBCMD_GET_KEY_AGREEMENT = const(0x02) + +# CBOR ClientPin response member keys +_CLIENTPIN_RESP_KEY_AGREEMENT = const(0x01) # COSE_Key, optional + +# status codes for the keepalive cmd +_KEEPALIVE_STATUS_PROCESSING = const(0x01) # still processing the current request +_KEEPALIVE_STATUS_UP_NEEDED = const(0x02) # waiting for user presence + +# time intervals and timeouts +_KEEPALIVE_INTERVAL_MS = const(80) # interval between keepalive commands +_U2F_CONFIRM_TIMEOUT_MS = const(10 * 1000) +_FIDO2_CONFIRM_TIMEOUT_MS = const(60 * 1000) + +# CBOR object signing and encryption algorithms and keys +_COSE_ALG_KEY = const(3) +_COSE_ALG_ES256 = const(-7) # ECDSA P-256 with SHA-256 +_COSE_KEY_TYPE_KEY = const(1) +_COSE_KEY_TYPE_EC2 = const(2) # elliptic curve keys with x- and y-coordinate pair +_COSE_CURVE_KEY = const(-1) # elliptic curve identifier +_COSE_CURVE_P256 = const(1) # P-256 curve +_COSE_X_COORD_KEY = const(-2) # x coordinate of the public key +_COSE_Y_COORD_KEY = const(-3) # y coordinate of the public key + # hid error codes _ERR_NONE = const(0x00) # no error _ERR_INVALID_CMD = const(0x01) # invalid command @@ -40,6 +126,17 @@ _ERR_MSG_TIMEOUT = const(0x05) # message has timed out _ERR_CHANNEL_BUSY = const(0x06) # channel busy _ERR_LOCK_REQUIRED = const(0x0A) # command requires channel lock _ERR_INVALID_CID = const(0x0B) # command not allowed on this cid +_ERR_INVALID_CBOR = const(0x12) # error when parsing CBOR +_ERR_CREDENTIAL_EXCLUDED = const(0x19) # valid credential found in the exclude list +_ERR_UNSUPPORTED_ALGORITHM = const(0x26) # requested COSE algorithm not supported +_ERR_OPERATION_DENIED = const(0x27) # user declined or timed out +_ERR_KEY_STORE_FULL = const(0x28) # internal key storage is full +_ERR_UNSUPPORTED_OPTION = const(0x2B) # unsupported option +_ERR_INVALID_OPTION = const(0x2C) # not a valid option for current operation +_ERR_KEEPALIVE_CANCEL = const(0x2D) # pending keep alive was cancelled +_ERR_NO_CREDENTIALS = const(0x2E) # no valid credentials provided +_ERR_NOT_ALLOWED = const(0x30) # continuation command not allowed +_ERR_PIN_AUTH_INVALID = const(0x33) # pinAuth verification failed _ERR_OTHER = const(0x7F) # other unspecified error # command status responses @@ -53,19 +150,26 @@ _SW_CLA_NOT_SUPPORTED = const(0x6E00) # init response _CAPFLAG_WINK = const(0x01) # device supports _CMD_WINK +_CAPFLAG_CBOR = const(0x04) # device supports _CMD_CBOR _U2FHID_IF_VERSION = const(2) # interface version # register response -_U2F_KEY_PATH = const(0x80553246) _U2F_REGISTER_ID = const(0x05) # version 2 registration identifier _U2F_ATT_PRIV_KEY = b"q&\xac+\xf6D\xdca\x86\xad\x83\xef\x1f\xcd\xf1*W\xb5\xcf\xa2\x00\x0b\x8a\xd0'\xe9V\xe8T\xc5\n\x8b" _U2F_ATT_CERT = b"0\x82\x01\x180\x81\xc0\x02\t\x00\xb1\xd9\x8fBdr\xd3,0\n\x06\x08*\x86H\xce=\x04\x03\x020\x151\x130\x11\x06\x03U\x04\x03\x0c\nTrezor U2F0\x1e\x17\r160429133153Z\x17\r260427133153Z0\x151\x130\x11\x06\x03U\x04\x03\x0c\nTrezor U2F0Y0\x13\x06\x07*\x86H\xce=\x02\x01\x06\x08*\x86H\xce=\x03\x01\x07\x03B\x00\x04\xd9\x18\xbd\xfa\x8aT\xac\x92\xe9\r\xa9\x1f\xcaz\xa2dT\xc0\xd1s61M\xde\x83\xa5K\x86\xb5\xdfN\xf0Re\x9a\x1do\xfc\xb7F\x7f\x1a\xcd\xdb\x8a3\x08\x0b^\xed\x91\x89\x13\xf4C\xa5&\x1b\xc7{h`o\xc10\n\x06\x08*\x86H\xce=\x04\x03\x02\x03G\x000D\x02 $\x1e\x81\xff\xd2\xe5\xe6\x156\x94\xc3U.\x8f\xeb\xd7\x1e\x895\x92\x1c\xb4\x83ACq\x1cv\xea\xee\xf3\x95\x02 _\x80\xeb\x10\xf2\\\xcc9\x8b<\xa8\xa9\xad\xa4\x02\x7f\x93\x13 w\xb7\xab\xcewFZ'\xf5=3\xa1\x1d" _BOGUS_APPID = b"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" +_AAGUID = ( + b"\xd6\xd0\xbd\xc3b\xee\xc4\xdb\xde\x8dzenJD\x87" +) # First 16 bytes of SHA-256("TREZOR 2") +_BOGUS_PRIV_KEY = b"\xAA" * 32 # authentication control byte _AUTH_ENFORCE = const(0x03) # enforce user presence and sign _AUTH_CHECK_ONLY = const(0x07) # check only -_AUTH_FLAG_TUP = const(0x01) # test of user presence set +_AUTH_FLAG_UP = const(1 << 0) # user present +_AUTH_FLAG_UV = const(1 << 2) # user verified +_AUTH_FLAG_AT = const(1 << 6) # attested credential data included +_AUTH_FLAG_ED = const(1 << 7) # extension data included # common raw message format (ISO7816-4:2005 mapping) _APDU_CLA = const(0) # uint8_t cla; // Class - reserved @@ -77,9 +181,23 @@ _APDU_LC2 = const(5) # uint8_t lc2; // Length field, MSB _APDU_LC3 = const(6) # uint8_t lc3; // Length field, LSB _APDU_DATA = const(7) # uint8_t data[1]; // Data field +# Dialog results +_RESULT_NONE = const(0) +_RESULT_CONFIRM = const(1) # User confirmed. +_RESULT_DECLINE = const(2) # User declined. +_RESULT_CANCEL = const(3) # Request was cancelled by _CMD_CANCEL. +_RESULT_TIMEOUT = const(4) # Request exceeded _FIDO2_CONFIRM_TIMEOUT_MS. + _FRAME_INIT_SIZE = 57 _FRAME_CONT_SIZE = 59 +# Generate the authenticatorKeyAgreementKey used for ECDH in authenticatorClientPIN getKeyAgreement. +_KEY_AGREEMENT_PRIVKEY = nist256p1.generate_secret() +_KEY_AGREEMENT_PUBKEY = nist256p1.publickey(_KEY_AGREEMENT_PRIVKEY, False) + +_ALLOW_RESIDENT_CREDENTIALS = True +_ALLOW_FIDO2 = True + def frame_init() -> dict: # uint32_t cid; // Channel identifier @@ -224,7 +342,7 @@ class Cmd: return Msg(self.cid, cla, ins, p1, p2, lc, data) -async def read_cmd(iface: io.HID) -> Cmd: +async def read_cmd(iface: io.HID) -> Optional[Cmd]: desc_init = frame_init() desc_cont = frame_cont() read = loop.wait(iface.iface_num() | io.POLL_READ) @@ -314,145 +432,537 @@ async def send_cmd(cmd: Cmd, iface: io.HID) -> None: seq += 1 -def boot(iface: io.HID): +def send_cmd_sync(cmd: Cmd, iface: io.HID) -> None: + init_desc = frame_init() + cont_desc = frame_cont() + offset = 0 + seq = 0 + datalen = len(cmd.data) + + buf, frm = make_struct(init_desc) + frm.cid = cmd.cid + frm.cmd = cmd.cmd + frm.bcnt = datalen + + offset += utils.memcpy(frm.data, 0, cmd.data, offset, datalen) + iface.write(buf) + + if offset < datalen: + frm = overlay_struct(buf, cont_desc) + + while offset < datalen: + frm.seq = seq + copied = utils.memcpy(frm.data, 0, cmd.data, offset, datalen) + offset += copied + if copied < _FRAME_CONT_SIZE: + frm.data[copied:] = bytearray(_FRAME_CONT_SIZE - copied) + iface.write_blocking(buf, 1000) + seq += 1 + + +def boot(iface: io.HID) -> None: loop.schedule(handle_reports(iface)) -async def handle_reports(iface: io.HID): - state = ConfirmState() +async def handle_reports(iface: io.HID) -> None: + dialog_mgr = DialogManager(iface) while True: try: req = await read_cmd(iface) if req is None: continue - resp = dispatch_cmd(req, state) - await send_cmd(resp, iface) + if dialog_mgr.is_busy() and req.cid not in ( + dialog_mgr.get_cid(), + _CID_BROADCAST, + ): + resp = cmd_error(req.cid, _ERR_CHANNEL_BUSY) # type: Optional[Cmd] + else: + resp = dispatch_cmd(req, dialog_mgr) + if resp is not None: + await send_cmd(resp, iface) except Exception as e: log.exception(__name__, e) -_CONFIRM_REGISTER = const(0) -_CONFIRM_AUTHENTICATE = const(1) -_CONFIRM_TIMEOUT_MS = const(10 * 1000) +class KeepaliveCallback: + def __init__(self, cid: int, iface: io.HID) -> None: + self.cid = cid + self.iface = iface + def __call__(self) -> None: + send_cmd_sync(cmd_keepalive(self.cid, _KEEPALIVE_STATUS_PROCESSING), self.iface) -class ConfirmState: - def __init__(self) -> None: - self.reset() - def reset(self): - self.action = None - self.checksum = None - self.app_id = None +async def check_pin(keepalive_callback: KeepaliveCallback) -> bool: + from apps.common.request_pin import PinCancelled, request_pin + import trezor.pin - self.confirmed = None - self.deadline = None - self.workflow = None + try: + trezor.pin.keepalive_callback = keepalive_callback + if config.has_pin(): + pin = await request_pin("Enter your PIN", config.get_pin_rem()) + while config.unlock(trezor.pin.pin_to_int(pin)) is not True: + pin = await request_pin("Wrong PIN, enter again", config.get_pin_rem()) + ret = True + else: + ret = config.unlock(trezor.pin.pin_to_int("")) + except PinCancelled: + ret = False + finally: + trezor.pin.keepalive_callback = None - def compare(self, action: int, checksum: bytes) -> bool: - if self.action != action or self.checksum != checksum: - return False - if utime.ticks_ms() >= self.deadline: - if self.workflow is not None: - # We crossed the deadline, kill the running confirmation - # workflow. `self.workflow` is reset in the finally - # handler in `confirm_workflow`. - loop.close(self.workflow) - return False - return True + return ret - def setup(self, action: int, checksum: bytes, app_id: bytes) -> bool: - if workflow.tasks or self.workflow: - # If any other workflow is running, we bail out. - return False - self.action = action - self.checksum = checksum - self.app_id = app_id +class ConfirmInfo: + def __init__(self) -> None: + self.app_icon = None # type: Optional[bytes] - self.confirmed = None - self.workflow = self.confirm_workflow() - loop.schedule(self.workflow) - return True + def get_header(self) -> Optional[str]: + return None + + def app_name(self) -> str: + raise NotImplementedError - def keepalive(self): - self.deadline = utime.ticks_ms() + _CONFIRM_TIMEOUT_MS + def account_name(self) -> Optional[str]: + return None + + def load_icon(self, rp_id_hash: bytes) -> None: + from trezor import res + from apps.webauthn import knownapps - async def confirm_workflow(self) -> None: try: - workflow.on_start(self.workflow) - await self.confirm_layout() - finally: - workflow.on_close(self.workflow) - self.workflow = None + namepart = knownapps.knownapps[rp_id_hash].lower().replace(" ", "_") + icon = res.load("apps/webauthn/res/icon_%s.toif" % namepart) + except Exception as e: + icon = res.load("apps/webauthn/res/icon_webauthn.toif") + if __debug__: + log.exception(__name__, e) + self.app_icon = icon - async def confirm_layout(self) -> None: - from trezor.ui.confirm import Confirm, CONFIRMED - from trezor.ui.text import Text - app_id = bytes(self.app_id) # could be bytearray, which doesn't have __hash__ +class ConfirmContent(ui.Component): + def __init__(self, info: ConfirmInfo) -> None: + self.info = info + self.repaint = True - if app_id == _BOGUS_APPID and self.action == _CONFIRM_REGISTER: + def on_render(self) -> None: + if self.repaint: + header = self.info.get_header() + + if header is None or self.info.app_icon is None: + return + ui.header(header, ui.ICON_DEFAULT, ui.GREEN, ui.BG, ui.GREEN) + ui.display.image((ui.WIDTH - 64) // 2, 48, self.info.app_icon) + + app_name = self.info.app_name() + account_name = self.info.account_name() + + # Dummy requests usually have some text as both app_name and account_name, + # in that case show the text only once. + if account_name is not None: + if app_name != account_name: + text_center_trim_left(ui.WIDTH // 2, 140, app_name) + text_center_trim_right(ui.WIDTH // 2, 172, account_name) + else: + text_center_trim_right(ui.WIDTH // 2, 156, account_name) + else: + text_center_trim_left(ui.WIDTH // 2, 156, app_name) + + self.repaint = False + + +class State: + def __init__(self, cid: int, iface: io.HID) -> None: + self.cid = cid + self.iface = iface + + def keepalive_status(self) -> Optional[int]: + return None + + def timeout_ms(self) -> int: + raise NotImplementedError + + async def confirm_dialog(self) -> bool: + pass + + async def on_confirm(self) -> None: + pass + + async def on_decline(self) -> None: + pass + + async def on_timeout(self) -> None: + pass + + async def on_cancel(self) -> None: + pass + + +class U2fState(State, ConfirmInfo): + def __init__( + self, cid: int, iface: io.HID, req_data: bytes, cred: Credential + ) -> None: + State.__init__(self, cid, iface) + ConfirmInfo.__init__(self) + self._cred = cred + self._req_data = req_data + self.load_icon(self._cred.rp_id_hash) + + def timeout_ms(self) -> int: + return _U2F_CONFIRM_TIMEOUT_MS + + def app_name(self) -> str: + return self._cred.app_name() + + def account_name(self) -> Optional[str]: + return self._cred.account_name() + + +class U2fConfirmRegister(U2fState): + def __init__( + self, cid: int, iface: io.HID, req_data: bytes, cred: U2fCredential + ) -> None: + super().__init__(cid, iface, req_data, cred) + + async def confirm_dialog(self) -> bool: + if self._cred.rp_id_hash == _BOGUS_APPID: text = Text("U2F", ui.ICON_WRONG, ui.RED) text.normal( "Another U2F device", "was used to register", "in this application." ) - dialog = Confirm(text) + return await Confirm(text, confirm=None) is CONFIRMED else: - content = ConfirmContent(self.action, app_id) - dialog = Confirm(content) + content = ConfirmContent(self) + return await Confirm(content) is CONFIRMED - self.confirmed = await dialog is CONFIRMED + def get_header(self) -> str: + return "U2F Register" + def __eq__(self, other: object) -> bool: + return ( + isinstance(other, U2fConfirmRegister) + and self.cid == other.cid + and self._req_data == other._req_data + ) -class ConfirmContent(ui.Component): - def __init__(self, action: int, app_id: bytes) -> None: - self.action = action - self.app_id = app_id - self.app_name = None - self.app_icon = None - self.repaint = True - self.boot() - def boot(self) -> None: - from ubinascii import hexlify - from trezor import res - from apps.webauthn import knownapps +class U2fConfirmAuthenticate(U2fState): + def __init__( + self, cid: int, iface: io.HID, req_data: bytes, cred: Credential + ) -> None: + super().__init__(cid, iface, req_data, cred) - if self.app_id in knownapps.knownapps: - name = knownapps.knownapps[self.app_id] - try: - namepart = name.lower().replace(" ", "_") - icon = res.load("apps/webauthn/res/icon_%s.toif" % namepart) - except Exception as e: - icon = res.load("apps/webauthn/res/icon_webauthn.toif") - if __debug__: - log.exception(__name__, e) - else: - name = "%s...%s" % ( - hexlify(self.app_id[:4]).decode(), - hexlify(self.app_id[-4:]).decode(), + def get_header(self) -> str: + return "U2F Authenticate" + + async def confirm_dialog(self) -> bool: + content = ConfirmContent(self) + return await Confirm(content) is CONFIRMED + + def __eq__(self, other: object) -> bool: + return ( + isinstance(other, U2fConfirmAuthenticate) + and self.cid == other.cid + and self._req_data == other._req_data + ) + + +class Fido2State(State): + def __init__(self, cid: int, iface: io.HID) -> None: + super().__init__(cid, iface) + + def keepalive_status(self) -> int: + return _KEEPALIVE_STATUS_UP_NEEDED + + def timeout_ms(self) -> int: + return _FIDO2_CONFIRM_TIMEOUT_MS + + async def on_confirm(self) -> None: + cmd = cbor_error(self.cid, _ERR_OPERATION_DENIED) + await send_cmd(cmd, self.iface) + + async def on_decline(self) -> None: + cmd = cbor_error(self.cid, _ERR_OPERATION_DENIED) + await send_cmd(cmd, self.iface) + + async def on_timeout(self) -> None: + await self.on_decline() + + async def on_cancel(self) -> None: + cmd = cbor_error(self.cid, _ERR_KEEPALIVE_CANCEL) + await send_cmd(cmd, self.iface) + + +class Fido2ConfirmMakeCredential(Fido2State, ConfirmInfo): + def __init__( + self, + cid: int, + iface: io.HID, + client_data_hash: bytes, + cred: Fido2Credential, + resident: bool, + user_verification: bool, + ) -> None: + Fido2State.__init__(self, cid, iface) + ConfirmInfo.__init__(self) + self._client_data_hash = client_data_hash + self._cred = cred + self._resident = resident + self._user_verification = user_verification + self.load_icon(cred.rp_id_hash) + + def get_header(self) -> str: + return "FIDO2 Register" + + def app_name(self) -> str: + return self._cred.app_name() + + def account_name(self) -> Optional[str]: + return self._cred.account_name() + + async def confirm_dialog(self) -> bool: + content = ConfirmContent(self) + if await Confirm(content) is not CONFIRMED: + return False + if self._user_verification: + return await check_pin(KeepaliveCallback(self.cid, self.iface)) + return True + + async def on_confirm(self) -> None: + self._cred.generate_id() + send_cmd_sync(cmd_keepalive(self.cid, _KEEPALIVE_STATUS_PROCESSING), self.iface) + response_data = cbor_make_credential_sign( + self._client_data_hash, self._cred, self._user_verification + ) + + cmd = Cmd(self.cid, _CMD_CBOR, bytes([_ERR_NONE]) + response_data) + if self._resident: + send_cmd_sync( + cmd_keepalive(self.cid, _KEEPALIVE_STATUS_PROCESSING), self.iface ) - icon = res.load("apps/webauthn/res/icon_webauthn.toif") - self.app_name = name - self.app_icon = icon + if not store_resident_credential(self._cred): + cmd = cbor_error(self.cid, _ERR_KEY_STORE_FULL) + await send_cmd(cmd, self.iface) - def on_render(self) -> None: - if self.repaint: - if self.action == _CONFIRM_REGISTER: - header = "U2F Register" - else: - header = "U2F Authenticate" - ui.header(header, ui.ICON_DEFAULT, ui.GREEN, ui.BG, ui.GREEN) - ui.display.image((ui.WIDTH - 64) // 2, 64, self.app_icon) - ui.display.text_center( - ui.WIDTH // 2, 168, self.app_name, ui.MONO, ui.FG, ui.BG + +class Fido2ConfirmExcluded(Fido2ConfirmMakeCredential): + def __init__(self, cid: int, iface: io.HID, cred: Fido2Credential) -> None: + super().__init__(cid, iface, b"", cred, resident=False, user_verification=False) + + async def on_confirm(self) -> None: + cmd = cbor_error(self.cid, _ERR_CREDENTIAL_EXCLUDED) + await send_cmd(cmd, self.iface) + + text = Text("FIDO2 Register", ui.ICON_WRONG, ui.RED) + text.normal("This device is already", "registered with", self._cred.rp_id + ".") + await Confirm(text, confirm=None) + + +class Fido2ConfirmGetAssertion(Fido2State, ConfirmInfo, Pageable): + def __init__( + self, + cid: int, + iface: io.HID, + client_data_hash: bytes, + creds: List[Credential], + hmac_secret: Optional[dict], + user_verification: bool, + ) -> None: + Fido2State.__init__(self, cid, iface) + ConfirmInfo.__init__(self) + Pageable.__init__(self) + self._client_data_hash = client_data_hash + self._creds = creds + self._hmac_secret = hmac_secret + self._user_verification = user_verification + self.load_icon(self._creds[0].rp_id_hash) + + def get_header(self) -> str: + return "FIDO2 Authenticate" + + def app_name(self) -> str: + return self._creds[self.page()].app_name() + + def account_name(self) -> Optional[str]: + return self._creds[self.page()].account_name() + + def page_count(self) -> int: + return len(self._creds) + + async def confirm_dialog(self) -> bool: + content = ConfirmContent(self) + if await ConfirmPageable(self, content) is not CONFIRMED: + return False + if self._user_verification: + return await check_pin(KeepaliveCallback(self.cid, self.iface)) + return True + + async def on_confirm(self) -> None: + cred = self._creds[self.page()] + try: + send_cmd_sync( + cmd_keepalive(self.cid, _KEEPALIVE_STATUS_PROCESSING), self.iface ) - self.repaint = False + response_data = cbor_get_assertion_sign( + self._client_data_hash, + cred.rp_id_hash, + cred, + self._hmac_secret, + True, + self._user_verification, + ) + cmd = Cmd(self.cid, _CMD_CBOR, bytes([_ERR_NONE]) + response_data) + except Exception: + cmd = cbor_error(self.cid, _ERR_OPERATION_DENIED) + + await send_cmd(cmd, self.iface) + +class Fido2ConfirmNoPin(State): + def __init__(self, cid: int, iface: io.HID) -> None: + super().__init__(cid, iface) -def dispatch_cmd(req: Cmd, state: ConfirmState) -> Cmd: + def timeout_ms(self) -> int: + return _FIDO2_CONFIRM_TIMEOUT_MS + + async def confirm_dialog(self) -> bool: + text = Text("FIDO2 Verify User", ui.ICON_WRONG, ui.RED) + text.normal("Unable to verify user.", "Please enable PIN", "protection.") + return await Confirm(text, confirm=None) is CONFIRMED + + +class Fido2ConfirmNoCredentials(Fido2ConfirmGetAssertion): + def __init__(self, cid: int, iface: io.HID, rp_id: str) -> None: + cred = Fido2Credential() + cred.rp_id = rp_id + super().__init__(cid, iface, b"", [cred], {}, user_verification=False) + + async def on_confirm(self) -> None: + cmd = cbor_error(self.cid, _ERR_NO_CREDENTIALS) + await send_cmd(cmd, self.iface) + + text = Text("FIDO2 Authenticate", ui.ICON_WRONG, ui.RED) + text.normal( + "This device is not", "registered with", self._creds[0].app_name() + "." + ) + await Confirm(text, confirm=None) + + +class Fido2ConfirmReset(Fido2State): + def __init__(self, cid: int, iface: io.HID) -> None: + super().__init__(cid, iface) + + async def confirm_dialog(self) -> bool: + text = Text("FIDO2 Reset", ui.ICON_CONFIG) + text.normal("Do you really want to") + text.bold("erase all credentials?") + return await Confirm(text) is CONFIRMED + + async def on_confirm(self) -> None: + erase_resident_credentials() + cmd = Cmd(self.cid, _CMD_CBOR, bytes([_ERR_NONE])) + await send_cmd(cmd, self.iface) + + +class DialogManager: + def __init__(self, iface: io.HID) -> None: + self.iface = iface + self._clear() + + def _clear(self) -> None: + self.state = None # type: Optional[State] + self.deadline = 0 + self.result = _RESULT_NONE + self.workflow = None # type: Optional[Coroutine] + self.keepalive = None # type: Optional[Coroutine] + + def reset_timeout(self) -> None: + if self.state is not None: + self.deadline = utime.ticks_ms() + self.state.timeout_ms() + + def reset(self) -> None: + if self.workflow is not None: + loop.close(self.workflow) + if self.keepalive is not None: + loop.close(self.keepalive) + self._clear() + + def get_cid(self) -> int: + if self.state is None: + return 0 + return self.state.cid + + def is_busy(self) -> bool: + if utime.ticks_ms() >= self.deadline: + self.reset() + return bool(workflow.tasks or self.workflow) + + def compare(self, state: State) -> bool: + if self.state != state: + return False + if utime.ticks_ms() >= self.deadline: + self.reset() + return False + return True + + def set_state(self, state: State) -> bool: + if self.is_busy(): + return False + + self.state = state + self.reset_timeout() + self.result = _RESULT_NONE + if state.keepalive_status() is not None: + self.keepalive = self.keepalive_loop() + loop.schedule(self.keepalive) + else: + self.keepalive = None + self.workflow = self.dialog_workflow() + loop.schedule(self.workflow) + return True + + async def keepalive_loop(self) -> None: + if not isinstance(self.state, Fido2State): + return + while utime.ticks_ms() < self.deadline: + cmd = cmd_keepalive(self.state.cid, self.state.keepalive_status()) + await send_cmd(cmd, self.iface) + await loop.sleep(_KEEPALIVE_INTERVAL_MS * 1000) + + self.result = _RESULT_TIMEOUT + self.reset() + + async def dialog_workflow(self) -> None: + if self.workflow is None or self.state is None: + return + + try: + workflow.on_start(self.workflow) + if await self.state.confirm_dialog(): + self.result = _RESULT_CONFIRM + else: + self.result = _RESULT_DECLINE + finally: + if self.keepalive is not None: + loop.close(self.keepalive) + self.keepalive = None + + if self.result == _RESULT_CONFIRM: + await self.state.on_confirm() + elif self.result == _RESULT_CANCEL: + await self.state.on_cancel() + elif self.result == _RESULT_TIMEOUT: + await self.state.on_timeout() + else: + await self.state.on_decline() + + workflow.on_close(self.workflow) + self.workflow = None + + +def dispatch_cmd(req: Cmd, dialog_mgr: DialogManager) -> Optional[Cmd]: if req.cmd == _CMD_MSG: m = req.to_msg() @@ -469,11 +979,11 @@ def dispatch_cmd(req: Cmd, state: ConfirmState) -> Cmd: if m.ins == _MSG_REGISTER: if __debug__: log.debug(__name__, "_MSG_REGISTER") - return msg_register(m, state) + return msg_register(m, dialog_mgr) elif m.ins == _MSG_AUTHENTICATE: if __debug__: log.debug(__name__, "_MSG_AUTHENTICATE") - return msg_authenticate(m, state) + return msg_authenticate(m, dialog_mgr) elif m.ins == _MSG_VERSION: if __debug__: log.debug(__name__, "_MSG_VERSION") @@ -494,7 +1004,44 @@ def dispatch_cmd(req: Cmd, state: ConfirmState) -> Cmd: elif req.cmd == _CMD_WINK: if __debug__: log.debug(__name__, "_CMD_WINK") + loop.schedule(ui.alert()) return req + elif req.cmd == _CMD_CBOR and _ALLOW_FIDO2: + if req.data[0] == _CBOR_MAKE_CREDENTIAL: + if __debug__: + log.debug(__name__, "_CBOR_MAKE_CREDENTIAL") + return cbor_make_credential(req, dialog_mgr) + elif req.data[0] == _CBOR_GET_ASSERTION: + if __debug__: + log.debug(__name__, "_CBOR_GET_ASSERTION") + return cbor_get_assertion(req, dialog_mgr) + elif req.data[0] == _CBOR_GET_INFO: + if __debug__: + log.debug(__name__, "_CBOR_GET_INFO") + return cbor_get_info(req) + elif req.data[0] == _CBOR_CLIENT_PIN: + if __debug__: + log.debug(__name__, "_CBOR_CLIENT_PIN") + return cbor_client_pin(req) + elif req.data[0] == _CBOR_RESET: + if __debug__: + log.debug(__name__, "_CBOR_RESET") + return cbor_reset(req, dialog_mgr) + elif req.data[0] == _CBOR_GET_NEXT_ASSERTION: + if __debug__: + log.debug(__name__, "_CBOR_GET_NEXT_ASSERTION") + return cbor_error(req.cid, _ERR_NOT_ALLOWED) + else: + if __debug__: + log.warning(__name__, "_ERR_INVALID_CMD _CMD_CBOR %d", req.data[0]) + return cbor_error(req.cid, _ERR_INVALID_CMD) + + elif req.cmd == _CMD_CANCEL: + if __debug__: + log.debug(__name__, "_CMD_CANCEL") + dialog_mgr.result = _RESULT_CANCEL + dialog_mgr.reset() + return None else: if __debug__: log.warning(__name__, "_ERR_INVALID_CMD: %d", req.cmd) @@ -517,14 +1064,12 @@ def cmd_init(req: Cmd) -> Cmd: resp.versionMajor = 2 resp.versionMinor = 0 resp.versionBuild = 0 - resp.capFlags = _CAPFLAG_WINK + resp.capFlags = _CAPFLAG_WINK | _CAPFLAG_CBOR return Cmd(req.cid, req.cmd, buf) -def msg_register(req: Msg, state: ConfirmState) -> Cmd: - from apps.common import storage - +def msg_register(req: Msg, dialog_mgr: DialogManager) -> Cmd: if not storage.is_initialized(): if __debug__: log.warning(__name__, "not initialized") @@ -536,18 +1081,21 @@ def msg_register(req: Msg, state: ConfirmState) -> Cmd: log.warning(__name__, "_SW_WRONG_LENGTH req.data") return msg_error(req.cid, _SW_WRONG_LENGTH) - # parse challenge and app_id + # parse challenge and rp_id_hash chal = req.data[:32] - app_id = req.data[32:] + cred = U2fCredential() + cred.rp_id_hash = bytes(req.data[32:]) + cred.generate_key_handle() # check equality with last request - if not state.compare(_CONFIRM_REGISTER, req.data): - if not state.setup(_CONFIRM_REGISTER, req.data, app_id): + new_state = U2fConfirmRegister(req.cid, dialog_mgr.iface, req.data, cred) + if not dialog_mgr.compare(new_state): + if not dialog_mgr.set_state(new_state): return msg_error(req.cid, _SW_CONDITIONS_NOT_SATISFIED) - state.keepalive() + dialog_mgr.reset_timeout() # wait for a button or continue - if not state.confirmed: + if dialog_mgr.result != _RESULT_CONFIRM: if __debug__: log.info(__name__, "waiting for button") return msg_error(req.cid, _SW_CONDITIONS_NOT_SATISFIED) @@ -555,56 +1103,36 @@ def msg_register(req: Msg, state: ConfirmState) -> Cmd: # sign the registration challenge and return if __debug__: log.info(__name__, "signing register") - buf = msg_register_sign(chal, app_id) + buf = msg_register_sign(chal, cred) - state.reset() + dialog_mgr.reset() return Cmd(req.cid, _CMD_MSG, buf) -def msg_register_sign(challenge: bytes, app_id: bytes) -> bytes: - - from apps.common import seed - - # derivation path is m/U2F'/r'/r'/r'/r'/r'/r'/r'/r' - keypath = [HARDENED | random.uniform(0xF0000000) for _ in range(0, 8)] - nodepath = [_U2F_KEY_PATH] + keypath - - # prepare signing key from random path, compute decompressed public key - node = seed.derive_node_without_passphrase(nodepath, "nist256p1") - pubkey = nist256p1.publickey(node.private_key(), False) - - # first half of keyhandle is keypath - keybuf = ustruct.pack("<8L", *keypath) - - # second half of keyhandle is a hmac of app_id and keypath - keybase = hmac.Hmac(node.private_key(), app_id, hashlib.sha256) - keybase.update(keybuf) - keybase = keybase.digest() +def msg_register_sign(challenge: bytes, cred: U2fCredential) -> bytes: + pubkey = nist256p1.publickey(cred.private_key(), False) # hash the request data together with keyhandle and pubkey dig = hashlib.sha256() dig.update(b"\x00") # uint8_t reserved; - dig.update(app_id) # uint8_t appId[32]; + dig.update(cred.rp_id_hash) # uint8_t appId[32]; dig.update(challenge) # uint8_t chal[32]; - dig.update(keybuf) # uint8_t keyHandle[64]; - dig.update(keybase) + dig.update(cred.id) # uint8_t keyHandle[64]; dig.update(pubkey) # uint8_t pubKey[65]; - dig = dig.digest() # sign the digest and convert to der - sig = nist256p1.sign(_U2F_ATT_PRIV_KEY, dig, False) + sig = nist256p1.sign(_U2F_ATT_PRIV_KEY, dig.digest(), False) sig = der.encode_seq((sig[1:33], sig[33:])) # pack to a response buf, resp = make_struct( - resp_cmd_register(len(keybuf) + len(keybase), len(_U2F_ATT_CERT), len(sig)) + resp_cmd_register(len(cred.id), len(_U2F_ATT_CERT), len(sig)) ) resp.registerId = _U2F_REGISTER_ID utils.memcpy(resp.pubKey, 0, pubkey, 0, len(pubkey)) - resp.keyHandleLen = len(keybuf) + len(keybase) - utils.memcpy(resp.keyHandle, 0, keybuf, 0, len(keybuf)) - utils.memcpy(resp.keyHandle, len(keybuf), keybase, 0, len(keybase)) + resp.keyHandleLen = len(cred.id) + utils.memcpy(resp.keyHandle, 0, cred.id, 0, len(cred.id)) utils.memcpy(resp.cert, 0, _U2F_ATT_CERT, 0, len(_U2F_ATT_CERT)) utils.memcpy(resp.sig, 0, sig, 0, len(sig)) resp.status = _SW_NO_ERROR @@ -612,9 +1140,7 @@ def msg_register_sign(challenge: bytes, app_id: bytes) -> bytes: return buf -def msg_authenticate(req: Msg, state: ConfirmState) -> Cmd: - from apps.common import storage - +def msg_authenticate(req: Msg, dialog_mgr: DialogManager) -> Cmd: if not storage.is_initialized(): if __debug__: log.warning(__name__, "not initialized") @@ -628,21 +1154,10 @@ def msg_authenticate(req: Msg, state: ConfirmState) -> Cmd: # check keyHandleLen khlen = req.data[_REQ_CMD_AUTHENTICATE_KHLEN] - if khlen != 64: - if __debug__: - log.warning(__name__, "_SW_WRONG_LENGTH khlen") - return msg_error(req.cid, _SW_WRONG_LENGTH) - auth = overlay_struct(req.data, req_cmd_authenticate(khlen)) - # check the keyHandle and generate the signing key - node = msg_authenticate_genkey(auth.appId, auth.keyHandle, "<8L") - if node is None: - # prior to firmware version 2.0.8, keypath was serialized in a - # big-endian manner, instead of little endian, like in trezor-mcu. - # try to parse it as big-endian now and check the HMAC. - node = msg_authenticate_genkey(auth.appId, auth.keyHandle, ">8L") - if node is None: + cred = Credential.from_bytes(auth.keyHandle, bytes(auth.appId)) + if cred is None: # specific error logged in msg_authenticate_genkey return msg_error(req.cid, _SW_WRONG_DATA) @@ -659,13 +1174,14 @@ def msg_authenticate(req: Msg, state: ConfirmState) -> Cmd: return msg_error(req.cid, _SW_WRONG_DATA) # check equality with last request - if not state.compare(_CONFIRM_AUTHENTICATE, req.data): - if not state.setup(_CONFIRM_AUTHENTICATE, req.data, auth.appId): + new_state = U2fConfirmAuthenticate(req.cid, dialog_mgr.iface, req.data, cred) + if not dialog_mgr.compare(new_state): + if not dialog_mgr.set_state(new_state): return msg_error(req.cid, _SW_CONDITIONS_NOT_SATISFIED) - state.keepalive() + dialog_mgr.reset_timeout() # wait for a button or continue - if not state.confirmed: + if dialog_mgr.result != _RESULT_CONFIRM: if __debug__: log.info(__name__, "waiting for button") return msg_error(req.cid, _SW_CONDITIONS_NOT_SATISFIED) @@ -673,47 +1189,15 @@ def msg_authenticate(req: Msg, state: ConfirmState) -> Cmd: # sign the authentication challenge and return if __debug__: log.info(__name__, "signing authentication") - buf = msg_authenticate_sign(auth.chal, auth.appId, node.private_key()) + buf = msg_authenticate_sign(auth.chal, auth.appId, cred.private_key()) - state.reset() + dialog_mgr.reset() return Cmd(req.cid, _CMD_MSG, buf) -def msg_authenticate_genkey(app_id: bytes, keyhandle: bytes, pathformat: str): - from apps.common import seed - - # unpack the keypath from the first half of keyhandle - keybuf = keyhandle[:32] - keypath = ustruct.unpack(pathformat, keybuf) - - # check high bit for hardened keys - for i in keypath: - if not i & HARDENED: - if __debug__: - log.warning(__name__, "invalid key path") - return None - - # derive the signing key - nodepath = [_U2F_KEY_PATH] + list(keypath) - node = seed.derive_node_without_passphrase(nodepath, "nist256p1") - - # second half of keyhandle is a hmac of app_id and keypath - keybase = hmac.Hmac(node.private_key(), app_id, hashlib.sha256) - keybase.update(keybuf) - keybase = keybase.digest() - - # verify the hmac - if keybase != keyhandle[32:]: - if __debug__: - log.warning(__name__, "invalid key handle") - return None - - return node - - -def msg_authenticate_sign(challenge: bytes, app_id: bytes, privkey: bytes) -> bytes: - flags = bytes([_AUTH_FLAG_TUP]) +def msg_authenticate_sign(challenge: bytes, rp_id_hash: bytes, privkey: bytes) -> bytes: + flags = bytes([_AUTH_FLAG_UP]) # get next counter ctr = storage.device.next_u2f_counter() @@ -721,14 +1205,13 @@ def msg_authenticate_sign(challenge: bytes, app_id: bytes, privkey: bytes) -> by # hash input data together with counter dig = hashlib.sha256() - dig.update(app_id) # uint8_t appId[32]; + dig.update(rp_id_hash) # uint8_t appId[32]; dig.update(flags) # uint8_t flags; dig.update(ctrbuf) # uint8_t ctr[4]; dig.update(challenge) # uint8_t chal[32]; - dig = dig.digest() # sign the digest and convert to der - sig = nist256p1.sign(privkey, dig, False) + sig = nist256p1.sign(privkey, dig.digest(), False) sig = der.encode_seq((sig[1:33], sig[33:])) # pack to a response @@ -753,3 +1236,431 @@ def msg_error(cid: int, code: int) -> Cmd: def cmd_error(cid: int, code: int) -> Cmd: return Cmd(cid, _CMD_ERROR, ustruct.pack(">B", code)) + + +def cbor_error(cid: int, code: int) -> Cmd: + return Cmd(cid, _CMD_CBOR, ustruct.pack(">B", code)) + + +def cbor_make_credential(req: Cmd, dialog_mgr: DialogManager) -> Optional[Cmd]: + if not storage.is_initialized(): + if __debug__: + log.warning(__name__, "not initialized") + return cbor_error(req.cid, _ERR_OPERATION_DENIED) + + try: + param = cbor.decode(req.data[1:]) + rp_id = param[_MAKECRED_CMD_RP]["id"] + rp_id_hash = hashlib.sha256(rp_id).digest() + + # Prepare the new credential. + user = param[_MAKECRED_CMD_USER] + cred = Fido2Credential() + cred.rp_id = rp_id + cred.rp_id_hash = rp_id_hash + cred.rp_name = param[_MAKECRED_CMD_RP].get("name", None) + cred.user_id = user["id"] + cred.user_name = user.get("name", None) + cred.user_display_name = user.get("displayName", None) + + # Check if any of the credential descriptors in the exclude list belong to this authenticator. + exclude_list = param.get(_MAKECRED_CMD_EXCLUDE_LIST, []) + for credential_descriptor in exclude_list: + excl_cred = Credential.from_bytes(credential_descriptor["id"], rp_id_hash) + if credential_descriptor["type"] == "public-key" and excl_cred is not None: + # This authenticator is already registered. + if not dialog_mgr.set_state( + Fido2ConfirmExcluded(req.cid, dialog_mgr.iface, cred) + ): + return cmd_error(req.cid, _ERR_CHANNEL_BUSY) + return None + + # Check that the relying party supports ECDSA P-256 with SHA-256. We don't support any other algorithms. + pub_key_cred_params = param[_MAKECRED_CMD_PUB_KEY_CRED_PARAMS] + if ("public-key", _COSE_ALG_ES256) not in ( + (pkcp.get("type", None), pkcp.get("alg", None)) + for pkcp in pub_key_cred_params + ): + return cbor_error(req.cid, _ERR_UNSUPPORTED_ALGORITHM) + + # Get options. + options = param.get(_MAKECRED_CMD_OPTIONS, {}) + resident_key = options.get("rk", False) + user_verification = options.get("uv", False) + + # Get supported extensions. + cred.hmac_secret = param.get(_MAKECRED_CMD_EXTENSIONS, {}).get( + "hmac-secret", False + ) + + client_data_hash = param[_MAKECRED_CMD_CLIENT_DATA_HASH] + except Exception: + return cbor_error(req.cid, _ERR_INVALID_CBOR) + + # Check data types. + if ( + not cred.check_data_types() + or not isinstance(client_data_hash, (bytes, bytearray)) + or not isinstance(resident_key, bool) + or not isinstance(user_verification, bool) + ): + return cbor_error(req.cid, _ERR_INVALID_CBOR) + + # Check options. + if resident_key and not _ALLOW_RESIDENT_CREDENTIALS: + return cbor_error(req.cid, _ERR_UNSUPPORTED_OPTION) + + if user_verification and not config.has_pin(): + # User verification requested, but PIN is not enabled. + state_set = dialog_mgr.set_state(Fido2ConfirmNoPin(req.cid, dialog_mgr.iface)) + if state_set: + return cbor_error(req.cid, _ERR_UNSUPPORTED_OPTION) + else: + return cmd_error(req.cid, _ERR_CHANNEL_BUSY) + + # Check that the pinAuth parameter is absent. Client PIN is not supported. + if _MAKECRED_CMD_PIN_AUTH in param: + return cbor_error(req.cid, _ERR_PIN_AUTH_INVALID) + + # Ask user to confirm registration. + state_set = dialog_mgr.set_state( + Fido2ConfirmMakeCredential( + req.cid, + dialog_mgr.iface, + client_data_hash, + cred, + resident_key, + user_verification, + ) + ) + + if not state_set: + return cmd_error(req.cid, _ERR_CHANNEL_BUSY) + + return None + + +def cbor_make_credential_sign( + client_data_hash: bytes, cred: Fido2Credential, user_verification: bool +) -> bytes: + privkey = cred.private_key() + pubkey = nist256p1.publickey(privkey, False) + + flags = _AUTH_FLAG_UP | _AUTH_FLAG_AT + if user_verification: + flags |= _AUTH_FLAG_UV + + # Encode the authenticator data (Credential ID, its public key and extensions). + credential_pub_key = cbor.encode( + { + _COSE_ALG_KEY: _COSE_ALG_ES256, + _COSE_KEY_TYPE_KEY: _COSE_KEY_TYPE_EC2, + _COSE_CURVE_KEY: _COSE_CURVE_P256, + _COSE_X_COORD_KEY: pubkey[1:33], + _COSE_Y_COORD_KEY: pubkey[33:], + } + ) + + att_cred_data = ( + _AAGUID + len(cred.id).to_bytes(2, "big") + cred.id + credential_pub_key + ) + + extensions = b"" + if cred.hmac_secret: + extensions = cbor.encode({"hmac-secret": True}) + flags |= _AUTH_FLAG_ED + + authenticator_data = ( + cred.rp_id_hash + + bytes([flags]) + + b"\x00\x00\x00\x00" + + att_cred_data + + extensions + ) + + # Compute self-attestation signature of the authenticator data. + dig = hashlib.sha256() + dig.update(authenticator_data) + dig.update(client_data_hash) + sig = nist256p1.sign(privkey, dig.digest(), False) + sig = der.encode_seq((sig[1:33], sig[33:])) + + # Encode the authenticatorMakeCredential response data. + return cbor.encode( + { + _MAKECRED_RESP_FMT: "packed", + _MAKECRED_RESP_AUTH_DATA: authenticator_data, + _MAKECRED_RESP_ATT_STMT: {"alg": _COSE_ALG_ES256, "sig": sig}, + } + ) + + +def cbor_get_assertion(req: Cmd, dialog_mgr: DialogManager) -> Optional[Cmd]: + if not storage.is_initialized(): + if __debug__: + log.warning(__name__, "not initialized") + return cbor_error(req.cid, _ERR_OPERATION_DENIED) + + try: + param = cbor.decode(req.data[1:]) + rp_id = param[_GETASSERT_CMD_RP_ID] + rp_id_hash = hashlib.sha256(rp_id).digest() + + cred_list = [] + allow_list = param.get(_GETASSERT_CMD_ALLOW_LIST, []) + if allow_list: + # Get all credentials from the allow list that belong to this authenticator. + for credential_descriptor in allow_list: + if credential_descriptor["type"] != "public-key": + continue + cred = Credential.from_bytes(credential_descriptor["id"], rp_id_hash) + if cred is not None: + if cred.rp_id is None: + cred.rp_id = rp_id + cred_list.append(cred) + else: + # Allow list is empty. Get resident credentials. + if _ALLOW_RESIDENT_CREDENTIALS: + cred_list = get_resident_credentials(rp_id_hash) + + # Sort credentials by time of creation. + cred_list.sort() + + # Check that the pinAuth parameter is absent. Client PIN is not supported. + if _GETASSERT_CMD_PIN_AUTH in param: + return cbor_error(req.cid, _ERR_PIN_AUTH_INVALID) + + # Get options. + options = param.get(_GETASSERT_CMD_OPTIONS, {}) + user_presence = options.get("up", True) + user_verification = options.get("uv", False) + + # Get supported extensions. + hmac_secret = param.get(_GETASSERT_CMD_EXTENSIONS, {}).get("hmac-secret", None) + + client_data_hash = param[_GETASSERT_CMD_CLIENT_DATA_HASH] + except Exception: + return cbor_error(req.cid, _ERR_INVALID_CBOR) + + # Check data types. + if ( + not isinstance(hmac_secret, (dict, type(None))) + or not isinstance(client_data_hash, (bytes, bytearray)) + or not isinstance(user_presence, bool) + or not isinstance(user_verification, bool) + ): + return cbor_error(req.cid, _ERR_INVALID_CBOR) + + # Check options. + if "rk" in options: + return cbor_error(req.cid, _ERR_INVALID_OPTION) + + if user_verification and not config.has_pin(): + # User verification requested, but PIN is not enabled. + state_set = dialog_mgr.set_state(Fido2ConfirmNoPin(req.cid, dialog_mgr.iface)) + if state_set: + return cbor_error(req.cid, _ERR_UNSUPPORTED_OPTION) + else: + return cmd_error(req.cid, _ERR_CHANNEL_BUSY) + + if not cred_list: + # No credentials. This authenticator is not registered. + if user_presence: + state_set = dialog_mgr.set_state( + Fido2ConfirmNoCredentials(req.cid, dialog_mgr.iface, rp_id) + ) + else: + return cbor_error(req.cid, _ERR_NO_CREDENTIALS) + elif not user_presence and not user_verification: + # Silent authentication. + try: + response_data = cbor_get_assertion_sign( + client_data_hash, + rp_id_hash, + cred_list[0], + hmac_secret, + user_presence, + user_verification, + ) + return Cmd(req.cid, _CMD_CBOR, bytes([_ERR_NONE]) + response_data) + except Exception: + return cbor_error(req.cid, _ERR_OPERATION_DENIED) + else: + # Ask user to confirm one of the credentials. + state_set = dialog_mgr.set_state( + Fido2ConfirmGetAssertion( + req.cid, + dialog_mgr.iface, + client_data_hash, + cred_list, + hmac_secret, + user_verification, + ) + ) + + if not state_set: + return cmd_error(req.cid, _ERR_CHANNEL_BUSY) + + return None + + +def cbor_get_assertion_hmac_secret( + cred: Credential, hmac_secret: dict +) -> Optional[bytes]: + key_agreement = hmac_secret[1] # The public key of platform key agreement key. + salt_enc = hmac_secret[2] # The encrypted salt. + salt_auth = hmac_secret[3] # The HMAC of the encrypted salt. + + x = key_agreement[_COSE_X_COORD_KEY] + y = key_agreement[_COSE_Y_COORD_KEY] + if ( + key_agreement[_COSE_ALG_KEY] != _COSE_ALG_ES256 + or key_agreement[_COSE_KEY_TYPE_KEY] != _COSE_KEY_TYPE_EC2 + or key_agreement[_COSE_CURVE_KEY] != _COSE_CURVE_P256 + or len(x) != 32 + or len(y) != 32 + or len(salt_auth) != 16 + or len(salt_enc) not in (32, 64) + ): + return None + + # Compute the ECDH shared secret. + ecdh_result = nist256p1.multiply(_KEY_AGREEMENT_PRIVKEY, b"\04" + x + y) + shared_secret = hashlib.sha256(ecdh_result[1:33]).digest() + + # Check the authentication tag and decrypt the salt. + tag = hmac.Hmac(shared_secret, salt_enc, hashlib.sha256).digest()[:16] + if not utils.consteq(tag, salt_auth): + return None + salt = aes(aes.CBC, shared_secret).decrypt(salt_enc) + + # Get cred_random - a constant symmetric key associated with the credential. + cred_random = cred.hmac_secret_key() + if cred_random is None: + # The credential does not have the hmac-secret extension enabled. + return None + + # Compute the hmac-secret output. + output = hmac.Hmac(cred_random, salt[:32], hashlib.sha256).digest() + if len(salt) == 64: + output += hmac.Hmac(cred_random, salt[32:], hashlib.sha256).digest() + + # Encrypt the hmac-secret output. + return aes(aes.CBC, shared_secret).encrypt(output) + + +def cbor_get_assertion_sign( + client_data_hash: bytes, + rp_id_hash: bytes, + cred: Credential, + hmac_secret: Optional[dict], + user_presence: bool, + user_verification: bool, +) -> bytes: + # Process extensions + extensions = {} + + # Spec deviation: Do not reveal hmac-secret during silent authentication. + if hmac_secret and user_presence: + encrypted_output = cbor_get_assertion_hmac_secret(cred, hmac_secret) + if encrypted_output is not None: + extensions["hmac-secret"] = encrypted_output + + # Encode the authenticator data. + flags = 0 + if user_presence: + flags |= _AUTH_FLAG_UP + if user_verification: + flags |= _AUTH_FLAG_UV + + encoded_extensions = b"" + if extensions: + flags |= _AUTH_FLAG_ED + encoded_extensions = cbor.encode(extensions) + + ctr = storage.device.next_u2f_counter() or 0 + authenticator_data = ( + rp_id_hash + bytes([flags]) + ctr.to_bytes(4, "big") + encoded_extensions + ) + + # Sign the authenticator data and the client data hash. + dig = hashlib.sha256() + dig.update(authenticator_data) + dig.update(client_data_hash) + if user_presence: + privkey = cred.private_key() + else: + # Spec deviation: Use a bogus key during silent authentication. + privkey = _BOGUS_PRIV_KEY + sig = nist256p1.sign(privkey, dig.digest(), False) + sig = der.encode_seq((sig[1:33], sig[33:])) + + # Encode the authenticatorGetAssertion response data. + response = { + _GETASSERT_RESP_CREDENTIAL: {"type": "public-key", "id": cred.id}, + _GETASSERT_RESP_AUTH_DATA: authenticator_data, + _GETASSERT_RESP_SIGNATURE: sig, + } + + if user_presence and cred.user_id is not None: + response[_GETASSERT_RESP_USER] = {"id": cred.user_id} + + return cbor.encode(response) + + +def cbor_get_info(req: Cmd) -> Cmd: + response_data = { + _GETINFO_RESP_VERSIONS: ["FIDO_2_0"], + _GETINFO_RESP_EXTENSIONS: ["hmac-secret"], + _GETINFO_RESP_AAGUID: _AAGUID, + _GETINFO_RESP_OPTIONS: { + "rk": _ALLOW_RESIDENT_CREDENTIALS, + "up": True, + "uv": config.has_pin(), + }, + } + return Cmd(req.cid, _CMD_CBOR, bytes([_ERR_NONE]) + cbor.encode(response_data)) + + +def cbor_client_pin(req: Cmd) -> Cmd: + try: + param = cbor.decode(req.data[1:]) + pin_protocol = param[_CLIENTPIN_CMD_PIN_PROTOCOL] + subcommand = param[_CLIENTPIN_CMD_SUBCOMMAND] + except Exception: + return cbor_error(req.cid, _ERR_INVALID_CBOR) + + if pin_protocol != 1: + return cbor_error(req.cid, _ERR_PIN_AUTH_INVALID) + + # We only support the get key agreement command which is required for the hmac-secret extension. + if subcommand != _CLIENTPIN_SUBCMD_GET_KEY_AGREEMENT: + return cbor_error(req.cid, _ERR_UNSUPPORTED_OPTION) + + # Encode the public key of the authenticator key agreement key. + response_data = { + _CLIENTPIN_RESP_KEY_AGREEMENT: { + _COSE_ALG_KEY: _COSE_ALG_ES256, + _COSE_KEY_TYPE_KEY: _COSE_KEY_TYPE_EC2, + _COSE_CURVE_KEY: _COSE_CURVE_P256, + _COSE_X_COORD_KEY: _KEY_AGREEMENT_PUBKEY[1:33], + _COSE_Y_COORD_KEY: _KEY_AGREEMENT_PUBKEY[33:], + } + } + + return Cmd(req.cid, _CMD_CBOR, bytes([_ERR_NONE]) + cbor.encode(response_data)) + + +def cbor_reset(req: Cmd, dialog_mgr: DialogManager) -> Optional[Cmd]: + if not storage.is_initialized(): + if __debug__: + log.warning(__name__, "not initialized") + return cbor_error(req.cid, _ERR_OPERATION_DENIED) + + if not dialog_mgr.set_state(Fido2ConfirmReset(req.cid, dialog_mgr.iface)): + return cmd_error(req.cid, _ERR_CHANNEL_BUSY) + return None + + +def cmd_keepalive(cid: int, status: int) -> Cmd: + return Cmd(cid, _CMD_KEEPALIVE, bytes([status])) diff --git a/core/src/apps/webauthn/credential.py b/core/src/apps/webauthn/credential.py index 5e74c6c0c..6dbea2a5e 100644 --- a/core/src/apps/webauthn/credential.py +++ b/core/src/apps/webauthn/credential.py @@ -13,6 +13,7 @@ if False: # Credential ID values _CRED_ID_VERSION = b"\xf1\xd0\x02\x00" _CRED_ID_MIN_LENGTH = const(33) +_KEY_HANDLE_LENGTH = const(64) # Credential ID keys _CRED_ID_RP_ID = const(0x01) @@ -23,6 +24,9 @@ _CRED_ID_USER_DISPLAY_NAME = const(0x05) _CRED_ID_CREATION_TIME = const(0x06) _CRED_ID_HMAC_SECRET = const(0x07) +# Key paths +_U2F_KEY_PATH = const(0x80553246) + class Credential: def __init__(self) -> None: @@ -194,3 +198,102 @@ class Fido2Credential(Credential): ) return node.key() + + +class U2fCredential(Credential): + def __init__(self) -> None: + super().__init__() + self.node = None # type: Optional[bip32.HDNode] + + def __lt__(self, other: "Credential") -> bool: + # Sort U2F credentials after FIDO2 credentials. + if isinstance(other, Fido2Credential): + return False + + # Sort U2F credentials lexicographically amongst each other. + return self.id < other.id + + def private_key(self) -> bytes: + if self.node is None: + return b"" + return self.node.private_key() + + def generate_key_handle(self) -> None: + # derivation path is m/U2F'/r'/r'/r'/r'/r'/r'/r'/r' + path = [HARDENED | random.uniform(0xF0000000) for _ in range(0, 8)] + nodepath = [_U2F_KEY_PATH] + path + + # prepare signing key from random path, compute decompressed public key + self.node = seed.derive_node_without_passphrase(nodepath, "nist256p1") + + # first half of keyhandle is keypath + keypath = ustruct.pack("<8L", *path) + + # second half of keyhandle is a hmac of rp_id_hash and keypath + mac = hmac.Hmac(self.node.private_key(), self.rp_id_hash, hashlib.sha256) + mac.update(keypath) + + self.id = keypath + mac.digest() + + def app_name(self) -> str: + from apps.webauthn import knownapps + + app_name = knownapps.knownapps.get(self.rp_id_hash, None) + if app_name is None: + app_name = "%s...%s" % ( + hexlify(self.rp_id_hash[:4]).decode(), + hexlify(self.rp_id_hash[-4:]).decode(), + ) + return app_name + + @staticmethod + def from_key_handle( + key_handle: bytes, rp_id_hash: bytes + ) -> Optional["U2fCredential"]: + # check the keyHandle and generate the signing key + node = U2fCredential._node_from_key_handle(rp_id_hash, key_handle, "<8L") + if node is None: + # prior to firmware version 2.0.8, keypath was serialized in a + # big-endian manner, instead of little endian, like in trezor-mcu. + # try to parse it as big-endian now and check the HMAC. + node = U2fCredential._node_from_key_handle(rp_id_hash, key_handle, ">8L") + if node is None: + # specific error logged in msg_authenticate_genkey + return None + + cred = U2fCredential() + cred.id = key_handle + cred.rp_id_hash = rp_id_hash + cred.node = node + return cred + + @staticmethod + def _node_from_key_handle( + rp_id_hash: bytes, keyhandle: bytes, pathformat: str + ) -> Optional[bip32.HDNode]: + # unpack the keypath from the first half of keyhandle + keypath = keyhandle[:32] + path = ustruct.unpack(pathformat, keypath) + + # check high bit for hardened keys + for i in path: + if not i & HARDENED: + if __debug__: + log.warning(__name__, "invalid key path") + return None + + # derive the signing key + nodepath = [_U2F_KEY_PATH] + list(path) + node = seed.derive_node_without_passphrase(nodepath, "nist256p1") + + # second half of keyhandle is a hmac of rp_id_hash and keypath + mac = hmac.Hmac(node.private_key(), rp_id_hash, hashlib.sha256) + mac.update(keypath) + + # verify the hmac + if not utils.consteq(mac.digest(), keyhandle[32:]): + if __debug__: + log.warning(__name__, "invalid key handle") + return None + + return node diff --git a/core/src/trezor/pin.py b/core/src/trezor/pin.py index 10d78cf65..459640708 100644 --- a/core/src/trezor/pin.py +++ b/core/src/trezor/pin.py @@ -1,5 +1,8 @@ from trezor import ui +if False: + from typing import Any + def pin_to_int(pin: str) -> int: return int("1" + pin) @@ -7,10 +10,15 @@ def pin_to_int(pin: str) -> int: _previous_progress = None +keepalive_callback = None # type: Any + def show_pin_timeout(seconds: int, progress: int, message: str) -> bool: global _previous_progress + if callable(keepalive_callback): + keepalive_callback() + if progress == 0: if progress != _previous_progress: # avoid overdraw in case of repeated progress calls