import uctypes import ustruct import utime from micropython import const from typing import TYPE_CHECKING import storage.device as storage_device from trezor import config, io, log, loop, utils, wire, workflow from trezor.crypto import hashlib from trezor.crypto.curve import nist256p1 from trezor.ui.layouts import show_popup from apps.base import set_homescreen from apps.common import cbor from . import common from .credential import Credential, Fido2Credential if TYPE_CHECKING: from typing import Any, Awaitable, Callable, Coroutine, Iterable, Iterator from .credential import U2fCredential HID = io.HID _CID_BROADCAST = const(0xFFFF_FFFF) # broadcast channel id # types of frame _TYPE_MASK = const(0x80) # frame type mask _TYPE_INIT = const(0x80) # initial frame identifier _TYPE_CONT = const(0x00) # continuation frame identifier # U2F HID sizes _HID_RPT_SIZE = const(64) _FRAME_INIT_SIZE = const(57) _FRAME_CONT_SIZE = const(59) _MAX_U2FHID_MSG_PAYLOAD_LEN = const(_FRAME_INIT_SIZE + 128 * _FRAME_CONT_SIZE) _CMD_INIT_NONCE_SIZE = const(8) # types of cmd _CMD_PING = const(0x81) # echo data through local processor only _CMD_MSG = const(0x83) # send U2F message frame _CMD_INIT = const(0x86) # channel initialization _CMD_WINK = const(0x88) # send device identification wink _CMD_CBOR = const(0x90) # send encapsulated CTAP CBOR encoded message _CMD_CANCEL = const(0x91) # cancel any outstanding requests on this CID _CMD_KEEPALIVE = const(0xBB) # processing a message _CMD_ERROR = const(0xBF) # error response # types for the msg cmd _MSG_REGISTER = const(0x01) # registration command _MSG_AUTHENTICATE = const(0x02) # authenticate/sign command _MSG_VERSION = const(0x03) # read version string command # types for the cbor cmd _CBOR_MAKE_CREDENTIAL = const(0x01) # generate new credential command _CBOR_GET_ASSERTION = const(0x02) # authenticate command _CBOR_GET_INFO = const(0x04) # report AAGUID and device capabilities _CBOR_CLIENT_PIN = const(0x06) # PIN and pinToken management _CBOR_RESET = const(0x07) # factory reset, invalidating all generated credentials _CBOR_GET_NEXT_ASSERTION = const(0x08) # obtain the next per-credential signature # CBOR MakeCredential command parameter keys _MAKECRED_CMD_CLIENT_DATA_HASH = const(0x01) # bytes, required _MAKECRED_CMD_RP = const(0x02) # map, required _MAKECRED_CMD_USER = const(0x03) # map, required _MAKECRED_CMD_PUB_KEY_CRED_PARAMS = const(0x04) # array of maps, required _MAKECRED_CMD_EXCLUDE_LIST = const(0x05) # array of maps, optional _MAKECRED_CMD_EXTENSIONS = const(0x06) # map, optional _MAKECRED_CMD_OPTIONS = const(0x07) # map, optional _MAKECRED_CMD_PIN_AUTH = const(0x08) # bytes, optional # CBOR MakeCredential response member keys _MAKECRED_RESP_FMT = const(0x01) # str, required _MAKECRED_RESP_AUTH_DATA = const(0x02) # bytes, required _MAKECRED_RESP_ATT_STMT = const(0x03) # map, required # CBOR GetAssertion command parameter keys _GETASSERT_CMD_RP_ID = const(0x01) # str, required _GETASSERT_CMD_CLIENT_DATA_HASH = const(0x02) # bytes, required _GETASSERT_CMD_ALLOW_LIST = const(0x03) # array of maps, optional _GETASSERT_CMD_EXTENSIONS = const(0x04) # map, optional _GETASSERT_CMD_OPTIONS = const(0x05) # map, optional _GETASSERT_CMD_PIN_AUTH = const(0x06) # bytes, optional # CBOR GetAssertion response member keys _GETASSERT_RESP_CREDENTIAL = const(0x01) # map, optional _GETASSERT_RESP_AUTH_DATA = const(0x02) # bytes, required _GETASSERT_RESP_SIGNATURE = const(0x03) # bytes, required _GETASSERT_RESP_USER = const(0x04) # map, optional # CBOR GetInfo response member keys _GETINFO_RESP_VERSIONS = const(0x01) # array of str, required _GETINFO_RESP_EXTENSIONS = const(0x02) # array of str, optional _GETINFO_RESP_AAGUID = const(0x03) # bytes(16), required _GETINFO_RESP_OPTIONS = const(0x04) # map, optional _GETINFO_RESP_PIN_PROTOCOLS = const(0x06) # list of unsigned integers, optional _GETINFO_RESP_MAX_CRED_COUNT_IN_LIST = const(0x07) # int, optional _GETINFO_RESP_MAX_CRED_ID_LEN = const(0x08) # int, optional # CBOR ClientPin command parameter keys _CLIENTPIN_CMD_PIN_PROTOCOL = const(0x01) # unsigned int, required _CLIENTPIN_CMD_SUBCOMMAND = const(0x02) # unsigned int, required _CLIENTPIN_SUBCMD_GET_KEY_AGREEMENT = const(0x02) # CBOR ClientPin response member keys _CLIENTPIN_RESP_KEY_AGREEMENT = const(0x01) # COSE_Key, optional # status codes for the keepalive cmd _KEEPALIVE_STATUS_NONE = const(0x00) _KEEPALIVE_STATUS_PROCESSING = const(0x01) # still processing the current request _KEEPALIVE_STATUS_UP_NEEDED = const(0x02) # waiting for user presence # time intervals and timeouts _KEEPALIVE_INTERVAL_MS = const(80) # interval between keepalive commands _CTAP_HID_TIMEOUT_MS = const( 500 ) # maximum interval between CTAP HID continuation frames _U2F_CONFIRM_TIMEOUT_MS = const( 3 * 1000 ) # maximum U2F pollling interval, Chrome uses 200 ms _FIDO2_CONFIRM_TIMEOUT_MS = const(60 * 1000) _POPUP_TIMEOUT_MS = const(4 * 1000) _UV_CACHE_TIME_MS = const(3 * 60 * 1000) # user verification cache time # hid error codes _ERR_NONE = const(0x00) # no error _ERR_INVALID_CMD = const(0x01) # invalid command _ERR_INVALID_LEN = const(0x03) # invalid message length _ERR_INVALID_SEQ = const(0x04) # invalid message sequencing _ERR_MSG_TIMEOUT = const(0x05) # message has timed out _ERR_CHANNEL_BUSY = const(0x06) # channel busy _ERR_INVALID_CID = const(0x0B) # command not allowed on this cid _ERR_CBOR_UNEXPECTED_TYPE = const(0x11) # invalid/unexpected CBOR _ERR_INVALID_CBOR = const(0x12) # error when parsing CBOR _ERR_MISSING_PARAMETER = const(0x14) # missing non-optional parameter _ERR_CREDENTIAL_EXCLUDED = const(0x19) # valid credential found in the exclude list _ERR_UNSUPPORTED_ALGORITHM = const(0x26) # requested COSE algorithm not supported _ERR_OPERATION_DENIED = const(0x27) # user declined or timed out _ERR_KEY_STORE_FULL = const(0x28) # internal key storage is full _ERR_UNSUPPORTED_OPTION = const(0x2B) # unsupported option _ERR_INVALID_OPTION = const(0x2C) # not a valid option for current operation _ERR_KEEPALIVE_CANCEL = const(0x2D) # pending keep alive was cancelled _ERR_NO_CREDENTIALS = const(0x2E) # no valid credentials provided _ERR_NOT_ALLOWED = const(0x30) # continuation command not allowed _ERR_PIN_AUTH_INVALID = const(0x33) # pinAuth verification failed _ERR_OTHER = const(0x7F) # other unspecified error _ERR_EXTENSION_FIRST = const(0xE0) # extension specific error # command status responses _SW_NO_ERROR = const(0x9000) _SW_WRONG_LENGTH = const(0x6700) _SW_CONDITIONS_NOT_SATISFIED = const(0x6985) _SW_WRONG_DATA = const(0x6A80) _SW_INS_NOT_SUPPORTED = const(0x6D00) _SW_CLA_NOT_SUPPORTED = const(0x6E00) # init response _CAPFLAG_WINK = const(0x01) # device supports _CMD_WINK _CAPFLAG_CBOR = const(0x04) # device supports _CMD_CBOR _U2FHID_IF_VERSION = const(2) # interface version # register response _U2F_REGISTER_ID = const(0x05) # version 2 registration identifier _FIDO_ATT_PRIV_KEY = b"q&\xac+\xf6D\xdca\x86\xad\x83\xef\x1f\xcd\xf1*W\xb5\xcf\xa2\x00\x0b\x8a\xd0'\xe9V\xe8T\xc5\n\x8b" _FIDO_ATT_CERT = b"0\x82\x01\xcd0\x82\x01s\xa0\x03\x02\x01\x02\x02\x04\x03E`\xc40\n\x06\x08*\x86H\xce=\x04\x03\x020.1,0*\x06\x03U\x04\x03\x0c#Trezor FIDO Root CA Serial 841513560 \x17\r200406100417Z\x18\x0f20500406100417Z0x1\x0b0\t\x06\x03U\x04\x06\x13\x02CZ1\x1c0\x1a\x06\x03U\x04\n\x0c\x13SatoshiLabs, s.r.o.1\"0 \x06\x03U\x04\x0b\x0c\x19Authenticator Attestation1'0%\x06\x03U\x04\x03\x0c\x1eTrezor FIDO EE Serial 548784040Y0\x13\x06\x07*\x86H\xce=\x02\x01\x06\x08*\x86H\xce=\x03\x01\x07\x03B\x00\x04\xd9\x18\xbd\xfa\x8aT\xac\x92\xe9\r\xa9\x1f\xcaz\xa2dT\xc0\xd1s61M\xde\x83\xa5K\x86\xb5\xdfN\xf0Re\x9a\x1do\xfc\xb7F\x7f\x1a\xcd\xdb\x8a3\x08\x0b^\xed\x91\x89\x13\xf4C\xa5&\x1b\xc7{h`o\xc1\xa33010!\x06\x0b+\x06\x01\x04\x01\x82\xe5\x1c\x01\x01\x04\x04\x12\x04\x10\xd6\xd0\xbd\xc3b\xee\xc4\xdb\xde\x8dzenJD\x870\x0c\x06\x03U\x1d\x13\x01\x01\xff\x04\x020\x000\n\x06\x08*\x86H\xce=\x04\x03\x02\x03H\x000E\x02 \x0b\xce\xc4R\xc3\n\x11'\xe5\xd5\xf5\xfc\xf5\xd6Wy\x11+\xe50\xad\x9d-TXJ\xbeE\x86\xda\x93\xc6\x02!\x00\xaf\xca=\xcf\xd8A\xb0\xadz\x9e$}\x0ff\xf4L,\x83\xf9T\xab\x95O\x896\xc15\x08\x7fX\xf1\x95" _BOGUS_APPID_CHROME = b"A" * 32 _BOGUS_APPID_FIREFOX = b"\0" * 32 _BOGUS_APPIDS = (_BOGUS_APPID_CHROME, _BOGUS_APPID_FIREFOX) _AAGUID = b"\xd6\xd0\xbd\xc3b\xee\xc4\xdb\xde\x8dzenJD\x87" # First 16 bytes of SHA-256("TREZOR 2") # authentication control byte _AUTH_ENFORCE = const(0x03) # enforce user presence and sign _AUTH_CHECK_ONLY = const(0x07) # check only _AUTH_FLAG_UP = const(1 << 0) # user present _AUTH_FLAG_UV = const(1 << 2) # user verified _AUTH_FLAG_AT = const(1 << 6) # attested credential data included _AUTH_FLAG_ED = const(1 << 7) # extension data included # common raw message format (ISO7816-4:2005 mapping) _APDU_CLA = const(0) # uint8_t cla; // Class - reserved _APDU_INS = const(1) # uint8_t ins; // U2F instruction _APDU_P1 = const(2) # uint8_t p1; // U2F parameter 1 _APDU_P2 = const(3) # uint8_t p2; // U2F parameter 2 _APDU_LC1 = const(4) # uint8_t lc1; // Length field, set to zero _APDU_LC2 = const(5) # uint8_t lc2; // Length field, MSB _APDU_LC3 = const(6) # uint8_t lc3; // Length field, LSB _APDU_DATA = const(7) # uint8_t data[1]; // Data field # Dialog results _RESULT_NONE = const(0) _RESULT_CONFIRM = const(1) # User confirmed. _RESULT_DECLINE = const(2) # User declined. _RESULT_CANCEL = const(3) # Request was cancelled by _CMD_CANCEL. _RESULT_TIMEOUT = const(4) # Request exceeded _FIDO2_CONFIRM_TIMEOUT_MS. # FIDO2 configuration. _ALLOW_FIDO2 = const(1) _ALLOW_RESIDENT_CREDENTIALS = const(1) _ALLOW_WINK = const(0) # The default attestation type to use in MakeCredential responses. If false, then basic attestation will be used by default. _DEFAULT_USE_SELF_ATTESTATION = const(1) # The default value of the use_sign_count flag for newly created credentials. _DEFAULT_USE_SIGN_COUNT = const(1) # The maximum number of credential IDs that can be supplied in the GetAssertion allow list. _MAX_CRED_COUNT_IN_LIST = const(10) # The CID of the last WINK command. Used to ensure that we do only one WINK at a time on any given CID. _last_wink_cid = 0 # Indicates whether the last U2F_AUTHENTICATE had a valid key handle. _last_auth_valid = False class CborError(Exception): def __init__(self, code: int): super().__init__() self.code = code def frame_init() -> uctypes.StructDict: # uint32_t cid; // Channel identifier # uint8_t cmd; // Command - b7 set # uint8_t bcnth; // Message byte count - high part # uint8_t bcntl; // Message byte count - low part # uint8_t data[HID_RPT_SIZE - 7]; // Data payload return { "cid": 0 | uctypes.UINT32, "cmd": 4 | uctypes.UINT8, "bcnt": 5 | uctypes.UINT16, "data": (7 | uctypes.ARRAY, (_HID_RPT_SIZE - 7) | uctypes.UINT8), } def frame_cont() -> uctypes.StructDict: # uint32_t cid; // Channel identifier # uint8_t seq; // Sequence number - b7 cleared # uint8_t data[HID_RPT_SIZE - 5]; // Data payload return { "cid": 0 | uctypes.UINT32, "seq": 4 | uctypes.UINT8, "data": (5 | uctypes.ARRAY, (_HID_RPT_SIZE - 5) | uctypes.UINT8), } def _resp_cmd_init() -> uctypes.StructDict: UINT8 = uctypes.UINT8 # local_cache_attribute # uint8_t nonce[8]; // Client application nonce # uint32_t cid; // Channel identifier # uint8_t versionInterface; // Interface version # uint8_t versionMajor; // Major version number # uint8_t versionMinor; // Minor version number # uint8_t versionBuild; // Build version number # uint8_t capFlags; // Capabilities flags return { "nonce": (0 | uctypes.ARRAY, 8 | UINT8), "cid": 8 | uctypes.UINT32, "versionInterface": 12 | UINT8, "versionMajor": 13 | UINT8, "versionMinor": 14 | UINT8, "versionBuild": 15 | UINT8, "capFlags": 16 | UINT8, } def _resp_cmd_register(khlen: int, certlen: int, siglen: int) -> dict: cert_ofs = 67 + khlen sig_ofs = cert_ofs + certlen status_ofs = sig_ofs + siglen UINT8 = uctypes.UINT8 # local_cache_attribute ARRAY = uctypes.ARRAY # local_cache_attribute # uint8_t registerId; // Registration identifier (U2F_REGISTER_ID) # uint8_t pubKey[65]; // Generated public key # uint8_t keyHandleLen; // Length of key handle # uint8_t keyHandle[khlen]; // Key handle # uint8_t cert[certlen]; // Attestation certificate # uint8_t sig[siglen]; // Registration signature # uint16_t status; return { "registerId": 0 | UINT8, "pubKey": (1 | ARRAY, 65 | UINT8), "keyHandleLen": 66 | UINT8, "keyHandle": (67 | ARRAY, khlen | UINT8), "cert": (cert_ofs | ARRAY, certlen | UINT8), "sig": (sig_ofs | ARRAY, siglen | UINT8), "status": status_ofs | uctypes.UINT16, } # index of keyHandleLen in req_cmd_authenticate struct _REQ_CMD_AUTHENTICATE_KHLEN = const(64) def _req_cmd_authenticate(khlen: int) -> uctypes.StructDict: UINT8 = uctypes.UINT8 # local_cache_attribute ARRAY = uctypes.ARRAY # local_cache_attribute return { "chal": (0 | ARRAY, 32 | UINT8), "appId": (32 | ARRAY, 32 | UINT8), "keyHandleLen": 64 | UINT8, "keyHandle": (65 | ARRAY, khlen | UINT8), } def _resp_cmd_authenticate(siglen: int) -> uctypes.StructDict: status_ofs = 5 + siglen # uint8_t flags; // U2F_AUTH_FLAG_ values # uint32_t ctr; // Counter field (big-endian) # uint8_t sig[siglen]; // Signature # uint16_t status; return { "flags": 0 | uctypes.UINT8, "ctr": 1 | uctypes.UINT32, "sig": (5 | uctypes.ARRAY, siglen | uctypes.UINT8), "status": status_ofs | uctypes.UINT16, } def overlay_struct(buf: bytearray, desc: uctypes.StructDict) -> Any: desc_size = uctypes.sizeof(desc, uctypes.BIG_ENDIAN) if desc_size > len(buf): raise ValueError(f"desc is too big ({desc_size} > {len(buf)})") return uctypes.struct(uctypes.addressof(buf), desc, uctypes.BIG_ENDIAN) def make_struct(desc: uctypes.StructDict) -> tuple[bytearray, Any]: desc_size = uctypes.sizeof(desc, uctypes.BIG_ENDIAN) buf = bytearray(desc_size) return buf, uctypes.struct(uctypes.addressof(buf), desc, uctypes.BIG_ENDIAN) class Msg: def __init__( self, cid: int, cla: int, ins: int, p1: int, p2: int, lc: int, data: bytes ) -> None: self.cid = cid self.cla = cla self.ins = ins self.p1 = p1 self.p2 = p2 self.lc = lc self.data = data class Cmd: def __init__(self, cid: int, cmd: int, data: bytes) -> None: self.cid = cid self.cmd = cmd self.data = data def to_msg(self) -> Msg: self_data = self.data # local_cache_attribute cla = self_data[_APDU_CLA] ins = self_data[_APDU_INS] p1 = self_data[_APDU_P1] p2 = self_data[_APDU_P2] lc = ( (self_data[_APDU_LC1] << 16) + (self_data[_APDU_LC2] << 8) + (self_data[_APDU_LC3]) ) data = self_data[_APDU_DATA : _APDU_DATA + lc] return Msg(self.cid, cla, ins, p1, p2, lc, data) async def _read_cmd(iface: HID) -> Cmd | None: desc_init = frame_init() desc_cont = frame_cont() read = loop.wait(iface.iface_num() | io.POLL_READ) buf = await read while True: ifrm = overlay_struct(bytearray(buf), desc_init) bcnt = ifrm.bcnt data = ifrm.data datalen = len(data) seq = 0 ifrm_cid = ifrm.cid # local_cache_attribute warning = log.warning # local_cache_attribute if ifrm.cmd & _TYPE_MASK == _TYPE_CONT: # unexpected cont packet, abort current msg if __debug__: warning(__name__, "_TYPE_CONT") return None if ifrm_cid == 0 or ((ifrm_cid == _CID_BROADCAST) and (ifrm.cmd != _CMD_INIT)): # CID 0 is reserved for future use and _CID_BROADCAST is reserved for channel allocation await send_cmd(cmd_error(ifrm_cid, _ERR_INVALID_CID), iface) return None if bcnt > _MAX_U2FHID_MSG_PAYLOAD_LEN: # invalid payload length, abort current msg if __debug__: warning(__name__, "_MAX_U2FHID_MSG_PAYLOAD_LEN") await send_cmd(cmd_error(ifrm_cid, _ERR_INVALID_LEN), iface) return None if datalen < bcnt: databuf = bytearray(bcnt) utils.memcpy(databuf, 0, data, 0, bcnt) data = databuf else: data = data[:bcnt] while datalen < bcnt: buf = await loop.race(read, loop.sleep(_CTAP_HID_TIMEOUT_MS)) if not isinstance(buf, bytes): if __debug__: warning(__name__, "_ERR_MSG_TIMEOUT") await send_cmd(cmd_error(ifrm_cid, _ERR_MSG_TIMEOUT), iface) return None cfrm = overlay_struct(bytearray(buf), desc_cont) cfrm_cid = cfrm.cid # local_cache_attribute if cfrm.seq == _CMD_INIT: if cfrm_cid == ifrm_cid: # _CMD_INIT command on current channel, abort current transaction. if __debug__: warning( __name__, "U2FHID: received CMD_INIT command during active tran, aborting", ) break else: # _CMD_INIT command on different channel, return synchronization response, but continue on current CID. if __debug__: log.info( __name__, "U2FHID: received CMD_INIT command for different CID", ) cfrm = overlay_struct(bytearray(buf), desc_init) await send_cmd( cmd_init( Cmd(cfrm_cid, cfrm.cmd, bytes(cfrm.data[: cfrm.bcnt])) ), iface, ) continue if cfrm_cid != ifrm_cid: # Frame for a different channel, continue waiting for next frame on the active CID. # For init frames reply with BUSY. Ignore continuation frames. if cfrm.seq & _TYPE_MASK == _TYPE_INIT: if __debug__: warning( __name__, "U2FHID: received init frame for different CID, _ERR_CHANNEL_BUSY", ) await send_cmd(cmd_error(cfrm_cid, _ERR_CHANNEL_BUSY), iface) else: if __debug__: warning( __name__, "U2FHID: received cont frame for different CID" ) continue if cfrm.seq != seq: # cont frame for this channel, but incorrect seq number, abort # current msg if __debug__: warning(__name__, "_ERR_INVALID_SEQ") await send_cmd(cmd_error(cfrm_cid, _ERR_INVALID_SEQ), iface) return None datalen += utils.memcpy(data, datalen, cfrm.data, 0, bcnt - datalen) seq += 1 else: return Cmd(ifrm_cid, ifrm.cmd, bytes(data)) async def send_cmd(cmd: Cmd, iface: HID) -> None: init_desc = frame_init() cont_desc = frame_cont() offset = 0 seq = 0 datalen = len(cmd.data) buf, frm = make_struct(init_desc) frm.cid = cmd.cid frm.cmd = cmd.cmd frm.bcnt = datalen offset += utils.memcpy(frm.data, 0, cmd.data, offset, datalen) iface.write(buf) if offset < datalen: frm = overlay_struct(buf, cont_desc) write = loop.wait(iface.iface_num() | io.POLL_WRITE) while offset < datalen: frm.seq = seq copied = utils.memcpy(frm.data, 0, cmd.data, offset, datalen) offset += copied if copied < _FRAME_CONT_SIZE: frm.data[copied:] = bytearray(_FRAME_CONT_SIZE - copied) while True: ret = await loop.race(write, loop.sleep(_CTAP_HID_TIMEOUT_MS)) if ret is not None: raise TimeoutError if iface.write(buf) > 0: break seq += 1 def send_cmd_sync(cmd: Cmd, iface: HID) -> None: init_desc = frame_init() cont_desc = frame_cont() offset = 0 seq = 0 datalen = len(cmd.data) buf, frm = make_struct(init_desc) frm.cid = cmd.cid frm.cmd = cmd.cmd frm.bcnt = datalen offset += utils.memcpy(frm.data, 0, cmd.data, offset, datalen) iface.write(buf) if offset < datalen: frm = overlay_struct(buf, cont_desc) while offset < datalen: frm.seq = seq copied = utils.memcpy(frm.data, 0, cmd.data, offset, datalen) offset += copied if copied < _FRAME_CONT_SIZE: frm.data[copied:] = bytearray(_FRAME_CONT_SIZE - copied) iface.write_blocking(buf, 1000) seq += 1 async def handle_reports(iface: HID) -> None: dialog_mgr = DialogManager(iface) while True: try: req = await _read_cmd(iface) if req is None: continue if not dialog_mgr.allow_cid(req.cid): resp: Cmd | None = cmd_error(req.cid, _ERR_CHANNEL_BUSY) else: resp = _dispatch_cmd(req, dialog_mgr) if resp is not None: await send_cmd(resp, iface) except Exception as e: log.exception(__name__, e) class KeepaliveCallback: def __init__(self, cid: int, iface: HID) -> None: self.cid = cid self.iface = iface def __call__(self) -> None: send_cmd_sync(cmd_keepalive(self.cid, _KEEPALIVE_STATUS_PROCESSING), self.iface) async def verify_user(keepalive_callback: KeepaliveCallback) -> bool: from trezor.wire import PinCancelled, PinInvalid from apps.common.request_pin import verify_user_pin import trezor.pin try: trezor.pin.keepalive_callback = keepalive_callback await verify_user_pin(cache_time_ms=_UV_CACHE_TIME_MS) ret = True except (PinCancelled, PinInvalid): ret = False finally: trezor.pin.keepalive_callback = None return ret def _confirm_fido_choose(title: str, credentials: list[Credential]) -> Awaitable[int]: from trezor.ui.layouts.fido import confirm_fido from . import knownapps assert len(credentials) > 0 repr_credential = credentials[0] if __debug__: for cred in credentials: assert cred.rp_id_hash == repr_credential.rp_id_hash app_name = repr_credential.app_name() app = knownapps.by_rp_id_hash(repr_credential.rp_id_hash) icon_name = None if app is None else app.icon_name return confirm_fido( None, title, app_name, icon_name, [c.account_name() for c in credentials] ) async def _confirm_fido(title: str, credential: Credential) -> bool: try: await _confirm_fido_choose(title, [credential]) return True except wire.ActionCancelled: return False class State: def __init__(self, cid: int, iface: HID) -> None: self.cid = cid self.iface = iface self.finished = False def allow_cid(self, cid: int) -> bool: # Generally allow any CID, because Safari browser changes CID for every U2F message. return True def keepalive_status(self) -> int: # Run the keepalive loop to check for timeout, but do not send any keepalive messages. return _KEEPALIVE_STATUS_NONE def timeout_ms(self) -> int: raise NotImplementedError async def confirm_dialog(self) -> bool | "State": raise NotImplementedError async def on_confirm(self) -> None: pass async def on_decline(self) -> None: pass async def on_timeout(self) -> None: pass async def on_cancel(self) -> None: pass class U2fState(State): def __init__(self, cid: int, iface: HID, req_data: bytes, cred: Credential) -> None: State.__init__(self, cid, iface) self._cred = cred self._req_data = req_data def timeout_ms(self) -> int: return _U2F_CONFIRM_TIMEOUT_MS def app_name(self) -> str: return self._cred.app_name() def account_name(self) -> str | None: return self._cred.account_name() class U2fConfirmRegister(U2fState): async def confirm_dialog(self) -> bool: if self._cred.rp_id_hash in _BOGUS_APPIDS: if _last_auth_valid: await show_popup( "U2F", "This device is already registered with this application.", "Already registered.", timeout_ms=_POPUP_TIMEOUT_MS, ) else: await show_popup( "U2F", "This device is not registered with this application.", "Not registered.", timeout_ms=_POPUP_TIMEOUT_MS, ) return False else: return await _confirm_fido("U2F Register", self._cred) def __eq__(self, other: object) -> bool: return ( isinstance(other, U2fConfirmRegister) and self._req_data == other._req_data ) class U2fConfirmAuthenticate(U2fState): async def confirm_dialog(self) -> bool: return await _confirm_fido("U2F Authenticate", self._cred) def __eq__(self, other: object) -> bool: return ( isinstance(other, U2fConfirmAuthenticate) and self._req_data == other._req_data ) class U2fUnlock(State): def timeout_ms(self) -> int: return _U2F_CONFIRM_TIMEOUT_MS async def confirm_dialog(self) -> bool: from trezor.wire import PinCancelled, PinInvalid from apps.common.request_pin import verify_user_pin try: await verify_user_pin() set_homescreen() except (PinCancelled, PinInvalid): return False return True def __eq__(self, other: object) -> bool: return isinstance(other, U2fUnlock) class Fido2State(State): def allow_cid(self, cid: int) -> bool: # In FIDO2 lock out other channels until user interaction or timeout. return cid == self.cid def keepalive_status(self) -> int: return _KEEPALIVE_STATUS_UP_NEEDED def timeout_ms(self) -> int: return _FIDO2_CONFIRM_TIMEOUT_MS async def on_confirm(self) -> None: cmd = cbor_error(self.cid, _ERR_OPERATION_DENIED) await send_cmd(cmd, self.iface) self.finished = True async def on_decline(self) -> None: cmd = cbor_error(self.cid, _ERR_OPERATION_DENIED) await send_cmd(cmd, self.iface) self.finished = True async def on_timeout(self) -> None: await self.on_decline() async def on_cancel(self) -> None: cmd = cbor_error(self.cid, _ERR_KEEPALIVE_CANCEL) await send_cmd(cmd, self.iface) self.finished = True class Fido2Unlock(Fido2State): def __init__( self, process_func: Callable[[Cmd, "DialogManager"], State | Cmd], req: Cmd, dialog_mgr: "DialogManager", ) -> None: super().__init__(req.cid, dialog_mgr.iface) self.process_func = process_func self.req = req self.resp: Cmd | None = None self.dialog_mgr = dialog_mgr async def confirm_dialog(self) -> bool | "State": if not await verify_user(KeepaliveCallback(self.cid, self.iface)): return False set_homescreen() resp = self.process_func(self.req, self.dialog_mgr) if isinstance(resp, State): return resp else: self.resp = resp return True async def on_confirm(self) -> None: if self.resp: await send_cmd(self.resp, self.iface) class Fido2ConfirmMakeCredential(Fido2State): def __init__( self, cid: int, iface: HID, client_data_hash: bytes, cred: Fido2Credential, resident: bool, user_verification: bool, ) -> None: Fido2State.__init__(self, cid, iface) self._client_data_hash = client_data_hash self._cred = cred self._resident = resident self._user_verification = user_verification async def confirm_dialog(self) -> bool: if not await _confirm_fido("FIDO2 Register", self._cred): return False if self._user_verification: return await verify_user(KeepaliveCallback(self.cid, self.iface)) return True async def on_confirm(self) -> None: from .resident_credentials import store_resident_credential cid = self.cid # local_cache_attribute self._cred.generate_id() send_cmd_sync(cmd_keepalive(cid, _KEEPALIVE_STATUS_PROCESSING), self.iface) response_data = _cbor_make_credential_sign( self._client_data_hash, self._cred, self._user_verification ) cmd = Cmd(cid, _CMD_CBOR, bytes([_ERR_NONE]) + response_data) if self._resident: send_cmd_sync(cmd_keepalive(cid, _KEEPALIVE_STATUS_PROCESSING), self.iface) if not store_resident_credential(self._cred): cmd = cbor_error(cid, _ERR_KEY_STORE_FULL) await send_cmd(cmd, self.iface) self.finished = True class Fido2ConfirmExcluded(Fido2ConfirmMakeCredential): def __init__(self, cid: int, iface: HID, cred: Fido2Credential) -> None: super().__init__(cid, iface, b"", cred, resident=False, user_verification=False) async def on_confirm(self) -> None: cmd = cbor_error(self.cid, _ERR_CREDENTIAL_EXCLUDED) await send_cmd(cmd, self.iface) self.finished = True await show_popup( "FIDO2 Register", "This device is already registered with {}.", "Already registered.", self._cred.rp_id, # description_param _POPUP_TIMEOUT_MS, ) class Fido2ConfirmGetAssertion(Fido2State): def __init__( self, cid: int, iface: HID, client_data_hash: bytes, creds: list[Credential], hmac_secret: dict | None, resident: bool, user_verification: bool, ) -> None: Fido2State.__init__(self, cid, iface) self._client_data_hash = client_data_hash self._creds = creds self._hmac_secret = hmac_secret self._resident = resident self._user_verification = user_verification self._selected_cred: Credential | None = None async def confirm_dialog(self) -> bool: # There is a choice from more than one credential. try: index = await _confirm_fido_choose("FIDO2 Authenticate", self._creds) except wire.ActionCancelled: return False self._selected_cred = self._creds[index] if self._user_verification: return await verify_user(KeepaliveCallback(self.cid, self.iface)) return True async def on_confirm(self) -> None: cid = self.cid # local_cache_attribute assert self._selected_cred is not None try: send_cmd_sync(cmd_keepalive(cid, _KEEPALIVE_STATUS_PROCESSING), self.iface) response_data = cbor_get_assertion_sign( self._client_data_hash, self._selected_cred.rp_id_hash, self._selected_cred, self._hmac_secret, self._resident, True, self._user_verification, ) cmd = Cmd(cid, _CMD_CBOR, bytes([_ERR_NONE]) + response_data) except CborError as e: cmd = cbor_error(cid, e.code) except KeyError: cmd = cbor_error(cid, _ERR_MISSING_PARAMETER) except Exception as e: # Firmware error. if __debug__: log.exception(__name__, e) cmd = cbor_error(cid, _ERR_OTHER) await send_cmd(cmd, self.iface) self.finished = True class Fido2ConfirmNoPin(State): def allow_cid(self, cid: int) -> bool: # In FIDO2 lock out other channels until user interaction or timeout. return cid == self.cid def timeout_ms(self) -> int: return _FIDO2_CONFIRM_TIMEOUT_MS async def confirm_dialog(self) -> bool: cmd = cbor_error(self.cid, _ERR_UNSUPPORTED_OPTION) await send_cmd(cmd, self.iface) self.finished = True await show_popup( "FIDO2 Verify User", "Please enable PIN protection.", "Unable to verify user.", timeout_ms=_POPUP_TIMEOUT_MS, ) return False class Fido2ConfirmNoCredentials(Fido2ConfirmGetAssertion): def __init__(self, cid: int, iface: HID, rp_id: str) -> None: cred = Fido2Credential() cred.rp_id = rp_id super().__init__( cid, iface, b"", [cred], {}, resident=False, user_verification=False ) async def on_confirm(self) -> None: cmd = cbor_error(self.cid, _ERR_NO_CREDENTIALS) await send_cmd(cmd, self.iface) self.finished = True await show_popup( "FIDO2 Authenticate", "This device is not registered with\n{}.", "Not registered.", self._creds[0].app_name(), # description_param _POPUP_TIMEOUT_MS, ) class Fido2ConfirmReset(Fido2State): async def confirm_dialog(self) -> bool: from trezor.ui.layouts.fido import confirm_fido_reset return await confirm_fido_reset() async def on_confirm(self) -> None: import storage.resident_credentials storage.resident_credentials.delete_all() cmd = Cmd(self.cid, _CMD_CBOR, bytes([_ERR_NONE])) await send_cmd(cmd, self.iface) self.finished = True class DialogManager: def __init__(self, iface: HID) -> None: self.iface = iface self._clear() def _clear(self) -> None: self.state: State | None = None self.deadline = 0 self.result = _RESULT_NONE self.workflow: loop.spawn | None = None self.keepalive: Coroutine | None = None def _workflow_is_running(self) -> bool: return self.workflow is not None and not self.workflow.finished def reset_timeout(self) -> None: if self.state is not None: self.deadline = utime.ticks_ms() + self.state.timeout_ms() def reset(self) -> None: if self.workflow is not None: self.workflow.close() if self.keepalive is not None: loop.close(self.keepalive) self._clear() def allow_cid(self, cid: int) -> bool: return ( not self.is_busy() or cid == _CID_BROADCAST or (self.state is not None and self.state.allow_cid(cid)) ) def is_busy(self) -> bool: if utime.ticks_ms() >= self.deadline: self.reset() if not self._workflow_is_running(): return bool(workflow.tasks) if self.state is None or self.state.finished: self.reset() return False return True def set_state(self, state: State) -> bool: if self.state == state and utime.ticks_ms() < self.deadline: self.reset_timeout() return True if self.is_busy(): return False self.state = state self.reset_timeout() self.result = _RESULT_NONE self.keepalive = self.keepalive_loop() # TODO: use loop.spawn here loop.schedule(self.keepalive) self.workflow = workflow.spawn(self.dialog_workflow()) return True async def keepalive_loop(self) -> None: state = self.state # local_cache_attribute try: if not state: return while utime.ticks_ms() < self.deadline: if state.keepalive_status() != _KEEPALIVE_STATUS_NONE: cmd = cmd_keepalive(state.cid, state.keepalive_status()) await send_cmd(cmd, self.iface) await loop.sleep(_KEEPALIVE_INTERVAL_MS) finally: self.keepalive = None self.result = _RESULT_TIMEOUT self.reset() async def dialog_workflow(self) -> None: if self.state is None: return try: while self.result is _RESULT_NONE: result = await self.state.confirm_dialog() if isinstance(result, State): self.state = result self.reset_timeout() elif result is True: self.result = _RESULT_CONFIRM else: self.result = _RESULT_DECLINE finally: if self.keepalive is not None: loop.close(self.keepalive) result = self.result # local_cache_attribute state = self.state # local_cache_attribute if result == _RESULT_CONFIRM: await state.on_confirm() elif result == _RESULT_CANCEL: await state.on_cancel() elif result == _RESULT_TIMEOUT: await state.on_timeout() else: await state.on_decline() def _dispatch_cmd(req: Cmd, dialog_mgr: DialogManager) -> Cmd | None: debug = log.debug # local_cache_attribute warning = log.warning # local_cache_attribute cid = req.cid # local_cache_attribute cmd = req.cmd # local_cache_attribute if cmd == _CMD_MSG: try: m = req.to_msg() except IndexError: return cmd_error(cid, _ERR_INVALID_LEN) ins = m.ins # local_cache_attribute if m.cla != 0: if __debug__: warning(__name__, "_SW_CLA_NOT_SUPPORTED") return msg_error(cid, _SW_CLA_NOT_SUPPORTED) if m.lc + _APDU_DATA > len(req.data): if __debug__: warning(__name__, "_SW_WRONG_LENGTH") return msg_error(cid, _SW_WRONG_LENGTH) if ins == _MSG_REGISTER: if __debug__: debug(__name__, "_MSG_REGISTER") return _msg_register(m, dialog_mgr) elif ins == _MSG_AUTHENTICATE: if __debug__: debug(__name__, "_MSG_AUTHENTICATE") return _msg_authenticate(m, dialog_mgr) elif ins == _MSG_VERSION: if __debug__: debug(__name__, "_MSG_VERSION") # msg_version if m.data: return msg_error(m.cid, _SW_WRONG_LENGTH) return Cmd(m.cid, _CMD_MSG, b"U2F_V2\x90\x00") # includes _SW_NO_ERROR else: if __debug__: warning(__name__, "_SW_INS_NOT_SUPPORTED: %d", ins) return msg_error(cid, _SW_INS_NOT_SUPPORTED) elif cmd == _CMD_INIT: if __debug__: debug(__name__, "_CMD_INIT") return cmd_init(req) elif cmd == _CMD_PING: if __debug__: debug(__name__, "_CMD_PING") return req elif cmd == _CMD_WINK and _ALLOW_WINK: if __debug__: debug(__name__, "_CMD_WINK") return _cmd_wink(req) elif cmd == _CMD_CBOR and _ALLOW_FIDO2: if not req.data: return cmd_error(cid, _ERR_INVALID_LEN) req_data_first = req.data[0] if req_data_first == _CBOR_MAKE_CREDENTIAL: if __debug__: debug(__name__, "_CBOR_MAKE_CREDENTIAL") return _cbor_make_credential(req, dialog_mgr) elif req_data_first == _CBOR_GET_ASSERTION: if __debug__: debug(__name__, "_CBOR_GET_ASSERTION") return _cbor_get_assertion(req, dialog_mgr) elif req_data_first == _CBOR_GET_INFO: if __debug__: debug(__name__, "_CBOR_GET_INFO") return _cbor_get_info(req) elif req_data_first == _CBOR_CLIENT_PIN: if __debug__: debug(__name__, "_CBOR_CLIENT_PIN") return _cbor_client_pin(req) elif req_data_first == _CBOR_RESET: if __debug__: debug(__name__, "_CBOR_RESET") return _cbor_reset(req, dialog_mgr) elif req_data_first == _CBOR_GET_NEXT_ASSERTION: if __debug__: debug(__name__, "_CBOR_GET_NEXT_ASSERTION") return cbor_error(cid, _ERR_NOT_ALLOWED) else: if __debug__: warning(__name__, "_ERR_INVALID_CMD _CMD_CBOR %d", req_data_first) return cbor_error(cid, _ERR_INVALID_CMD) elif cmd == _CMD_CANCEL: if __debug__: debug(__name__, "_CMD_CANCEL") dialog_mgr.result = _RESULT_CANCEL dialog_mgr.reset() return None else: if __debug__: warning(__name__, "_ERR_INVALID_CMD: %d", cmd) return cmd_error(cid, _ERR_INVALID_CMD) def cmd_init(req: Cmd) -> Cmd: from trezor.crypto import random cid = req.cid # local_cache_attribute if cid == _CID_BROADCAST: # uint32_t except 0 and 0xffff_ffff resp_cid = random.uniform(0xFFFF_FFFE) + 1 else: resp_cid = cid if len(req.data) != _CMD_INIT_NONCE_SIZE: return cmd_error(cid, _ERR_INVALID_LEN) buf, resp = make_struct(_resp_cmd_init()) utils.memcpy(resp.nonce, 0, req.data, 0, len(req.data)) resp.cid = resp_cid resp.versionInterface = _U2FHID_IF_VERSION resp.versionMajor = 2 resp.versionMinor = 0 resp.versionBuild = 0 resp.capFlags = (_CAPFLAG_WINK * _ALLOW_WINK) | _CAPFLAG_CBOR return Cmd(cid, req.cmd, bytes(buf)) def _cmd_wink(req: Cmd) -> Cmd: from trezor import ui global _last_wink_cid if _last_wink_cid != req.cid: _last_wink_cid = req.cid ui.alert() return req def _msg_register(req: Msg, dialog_mgr: DialogManager) -> Cmd: from .credential import U2fCredential cid = req.cid # local_cache_attribute data = req.data # local_cache_attribute if not config.is_unlocked(): new_state: State = U2fUnlock(cid, dialog_mgr.iface) dialog_mgr.set_state(new_state) return msg_error(cid, _SW_CONDITIONS_NOT_SATISFIED) if not storage_device.is_initialized(): if __debug__: log.warning(__name__, "not initialized") # There is no standard way to decline a U2F request, but responding with ERR_CHANNEL_BUSY # doesn't seem to violate the protocol and at least stops Chrome from polling. return cmd_error(cid, _ERR_CHANNEL_BUSY) # check length of input data if len(data) != 64: if __debug__: log.warning(__name__, "_SW_WRONG_LENGTH req_data") return msg_error(cid, _SW_WRONG_LENGTH) # parse challenge and rp_id_hash chal = data[:32] cred = U2fCredential() cred.rp_id_hash = data[32:] cred.generate_key_handle() # check equality with last request new_state = U2fConfirmRegister(cid, dialog_mgr.iface, data, cred) if not dialog_mgr.set_state(new_state): return msg_error(cid, _SW_CONDITIONS_NOT_SATISFIED) # wait for a button or continue if dialog_mgr.result == _RESULT_NONE: if __debug__: log.info(__name__, "waiting for button") return msg_error(cid, _SW_CONDITIONS_NOT_SATISFIED) if dialog_mgr.result != _RESULT_CONFIRM: if __debug__: log.info(__name__, "request declined") # There is no standard way to decline a U2F request, but responding with ERR_CHANNEL_BUSY # doesn't seem to violate the protocol and at least stops Chrome from polling. return cmd_error(cid, _ERR_CHANNEL_BUSY) # sign the registration challenge and return if __debug__: log.info(__name__, "signing register") buf = _msg_register_sign(chal, cred) dialog_mgr.reset() return Cmd(cid, _CMD_MSG, buf) def basic_attestation_sign(data: Iterable[bytes]) -> bytes: from trezor.crypto import der dig = hashlib.sha256() for segment in data: dig.update(segment) sig = nist256p1.sign(_FIDO_ATT_PRIV_KEY, dig.digest(), False) return der.encode_seq((sig[1:33], sig[33:])) def _msg_register_sign(challenge: bytes, cred: U2fCredential) -> bytes: memcpy = utils.memcpy # local_cache_attribute id = cred.id # local_cache_attribute pubkey = cred.public_key() sig = basic_attestation_sign((b"\x00", cred.rp_id_hash, challenge, id, pubkey)) # pack to a response buf, resp = make_struct(_resp_cmd_register(len(id), len(_FIDO_ATT_CERT), len(sig))) resp.registerId = _U2F_REGISTER_ID memcpy(resp.pubKey, 0, pubkey, 0, len(pubkey)) resp.keyHandleLen = len(id) memcpy(resp.keyHandle, 0, id, 0, len(id)) memcpy(resp.cert, 0, _FIDO_ATT_CERT, 0, len(_FIDO_ATT_CERT)) memcpy(resp.sig, 0, sig, 0, len(sig)) resp.status = _SW_NO_ERROR return bytes(buf) def _msg_authenticate(req: Msg, dialog_mgr: DialogManager) -> Cmd: global _last_auth_valid _last_auth_valid = False cid = req.cid # local_cache_attribute data = req.data # local_cache_attribute info = log.info # local_cache_attribute if not config.is_unlocked(): new_state: State = U2fUnlock(cid, dialog_mgr.iface) dialog_mgr.set_state(new_state) return msg_error(cid, _SW_CONDITIONS_NOT_SATISFIED) if not storage_device.is_initialized(): if __debug__: log.warning(__name__, "not initialized") # Device is not registered with the RP. return msg_error(cid, _SW_WRONG_DATA) # we need at least keyHandleLen if len(data) <= _REQ_CMD_AUTHENTICATE_KHLEN: if __debug__: log.warning(__name__, "_SW_WRONG_LENGTH req_data") return msg_error(cid, _SW_WRONG_LENGTH) # check keyHandleLen khlen = data[_REQ_CMD_AUTHENTICATE_KHLEN] auth = overlay_struct(bytearray(data), _req_cmd_authenticate(khlen)) challenge = bytes(auth.chal) rp_id_hash = bytes(auth.appId) key_handle = bytes(auth.keyHandle) try: cred = Credential.from_bytes(key_handle, rp_id_hash) except Exception: # specific error logged in _node_from_key_handle return msg_error(cid, _SW_WRONG_DATA) _last_auth_valid = True # if _AUTH_CHECK_ONLY is requested, return, because keyhandle has been checked already if req.p1 == _AUTH_CHECK_ONLY: if __debug__: info(__name__, "_AUTH_CHECK_ONLY") return msg_error(cid, _SW_CONDITIONS_NOT_SATISFIED) # from now on, only _AUTH_ENFORCE is supported if req.p1 != _AUTH_ENFORCE: if __debug__: info(__name__, "_AUTH_ENFORCE") return msg_error(cid, _SW_WRONG_DATA) # check equality with last request new_state = U2fConfirmAuthenticate(cid, dialog_mgr.iface, data, cred) if not dialog_mgr.set_state(new_state): return msg_error(cid, _SW_CONDITIONS_NOT_SATISFIED) # wait for a button or continue if dialog_mgr.result == _RESULT_NONE: if __debug__: info(__name__, "waiting for button") return msg_error(cid, _SW_CONDITIONS_NOT_SATISFIED) if dialog_mgr.result != _RESULT_CONFIRM: if __debug__: info(__name__, "request declined") # There is no standard way to decline a U2F request, but responding with ERR_CHANNEL_BUSY # doesn't seem to violate the protocol and at least stops Chrome from polling. return cmd_error(cid, _ERR_CHANNEL_BUSY) # sign the authentication challenge and return if __debug__: info(__name__, "signing authentication") buf = _msg_authenticate_sign(challenge, rp_id_hash, cred) dialog_mgr.reset() return Cmd(cid, _CMD_MSG, buf) def _msg_authenticate_sign( challenge: bytes, rp_id_hash: bytes, cred: Credential ) -> bytes: flags = bytes([_AUTH_FLAG_UP]) # get next counter ctr = cred.next_signature_counter() ctrbuf = ustruct.pack(">L", ctr) # sign the input data together with counter sig = cred.sign((rp_id_hash, flags, ctrbuf, challenge)) # pack to a response buf, resp = make_struct(_resp_cmd_authenticate(len(sig))) resp.flags = flags[0] resp.ctr = ctr utils.memcpy(resp.sig, 0, sig, 0, len(sig)) resp.status = _SW_NO_ERROR return bytes(buf) def msg_error(cid: int, code: int) -> Cmd: return Cmd(cid, _CMD_MSG, ustruct.pack(">H", code)) def cmd_error(cid: int, code: int) -> Cmd: return Cmd(cid, _CMD_ERROR, ustruct.pack(">B", code)) def cbor_error(cid: int, code: int) -> Cmd: return Cmd(cid, _CMD_CBOR, ustruct.pack(">B", code)) def credentials_from_descriptor_list( descriptor_list: list[dict], rp_id_hash: bytes ) -> Iterator[Credential]: for credential_descriptor in descriptor_list: credential_type = credential_descriptor["type"] if not isinstance(credential_type, str): raise TypeError if credential_type != "public-key": continue credential_id = credential_descriptor["id"] if not isinstance(credential_id, bytes): raise TypeError try: cred = Credential.from_bytes(credential_id, rp_id_hash) except Exception: continue yield cred def _distinguishable_cred_list(credentials: Iterable[Credential]) -> list[Credential]: """Reduces the input to a list of credentials which can be distinguished by the user. It is assumed that all input credentials share the same RP ID.""" cred_list: list[Credential] = [] for cred in credentials: for i, prev_cred in enumerate(cred_list): if prev_cred.account_name() == cred.account_name(): # Among indistinguishable FIDO2 credentials prefer the newest. # Among U2F credentials prefer the first in the input. if isinstance(cred, Fido2Credential) and cred < prev_cred: cred_list[i] = cred break else: cred_list.append(cred) return cred_list def _algorithms_from_pub_key_cred_params(pub_key_cred_params: list[dict]) -> list[int]: alg_list = [] for pkcp in pub_key_cred_params: pub_key_cred_type = pkcp["type"] if not isinstance(pub_key_cred_type, str): raise TypeError if pub_key_cred_type != "public-key": continue pub_key_cred_alg = pkcp["alg"] if not isinstance(pub_key_cred_alg, int): raise TypeError alg_list.append(pub_key_cred_alg) return alg_list def _cbor_make_credential(req: Cmd, dialog_mgr: DialogManager) -> Cmd | None: if config.is_unlocked(): resp = _cbor_make_credential_process(req, dialog_mgr) else: resp = Fido2Unlock(_cbor_make_credential_process, req, dialog_mgr) if isinstance(resp, State): if dialog_mgr.set_state(resp): return None else: return cmd_error(req.cid, _ERR_CHANNEL_BUSY) else: return resp def _cbor_make_credential_process(req: Cmd, dialog_mgr: DialogManager) -> State | Cmd: from . import knownapps cid = req.cid # local_cache_attribute if not storage_device.is_initialized(): if __debug__: log.warning(__name__, "not initialized") return cbor_error(cid, _ERR_OTHER) try: param = cbor.decode(req.data, offset=1) rp = param[_MAKECRED_CMD_RP] rp_id = rp["id"] rp_id_hash = hashlib.sha256(rp_id).digest() # Prepare the new credential. user = param[_MAKECRED_CMD_USER] cred = Fido2Credential() cred.rp_id = rp_id cred.rp_id_hash = rp_id_hash cred.rp_name = param[_MAKECRED_CMD_RP].get("name", None) cred.user_id = user["id"] cred.user_name = user.get("name", None) cred.user_display_name = user.get("displayName", None) cred.truncate_names() # Check if any of the credential descriptors in the exclude list belong to this authenticator. exclude_list = param.get(_MAKECRED_CMD_EXCLUDE_LIST, []) excluded_creds = credentials_from_descriptor_list(exclude_list, rp_id_hash) if not utils.is_empty_iterator(excluded_creds): # This authenticator is already registered. return Fido2ConfirmExcluded(cid, dialog_mgr.iface, cred) # Check that the relying party supports ECDSA with SHA-256 or EdDSA. We don't support any other algorithms. pub_key_cred_params = param[_MAKECRED_CMD_PUB_KEY_CRED_PARAMS] for alg in _algorithms_from_pub_key_cred_params(pub_key_cred_params): if alg == common.COSE_ALG_ES256: cred.algorithm = alg cred.curve = common.COSE_CURVE_P256 break elif alg == common.COSE_ALG_EDDSA: cred.algorithm = alg cred.curve = common.COSE_CURVE_ED25519 break else: return cbor_error(cid, _ERR_UNSUPPORTED_ALGORITHM) # Get options. options = param.get(_MAKECRED_CMD_OPTIONS, {}) resident_key = options.get("rk", False) user_verification = options.get("uv", False) # Get supported extensions. cred.hmac_secret = param.get(_MAKECRED_CMD_EXTENSIONS, {}).get( "hmac-secret", False ) client_data_hash = param[_MAKECRED_CMD_CLIENT_DATA_HASH] except TypeError: return cbor_error(cid, _ERR_CBOR_UNEXPECTED_TYPE) except KeyError: return cbor_error(cid, _ERR_MISSING_PARAMETER) except Exception: return cbor_error(cid, _ERR_INVALID_CBOR) app = knownapps.by_rp_id_hash(rp_id_hash) if app is not None and app.use_sign_count is not None: cred.use_sign_count = app.use_sign_count else: cred.use_sign_count = bool(_DEFAULT_USE_SIGN_COUNT) # Check data types. if ( not cred.check_data_types() or not isinstance(user.get("icon", ""), str) or not isinstance(rp.get("icon", ""), str) or not isinstance(client_data_hash, bytes) or not isinstance(resident_key, bool) or not isinstance(user_verification, bool) ): return cbor_error(cid, _ERR_CBOR_UNEXPECTED_TYPE) # Check options. if "up" in options: return cbor_error(cid, _ERR_INVALID_OPTION) if resident_key and not _ALLOW_RESIDENT_CREDENTIALS: return cbor_error(cid, _ERR_UNSUPPORTED_OPTION) if user_verification and not config.has_pin(): # User verification requested, but PIN is not enabled. return Fido2ConfirmNoPin(cid, dialog_mgr.iface) # Check that the pinAuth parameter is absent. Client PIN is not supported. if _MAKECRED_CMD_PIN_AUTH in param: return cbor_error(cid, _ERR_PIN_AUTH_INVALID) # Ask user to confirm registration. return Fido2ConfirmMakeCredential( cid, dialog_mgr.iface, client_data_hash, cred, resident_key, user_verification, ) def _cbor_make_credential_sign( client_data_hash: bytes, cred: Fido2Credential, user_verification: bool ) -> bytes: from . import knownapps flags = _AUTH_FLAG_UP | _AUTH_FLAG_AT if user_verification: flags |= _AUTH_FLAG_UV # Encode the authenticator data (Credential ID, its public key and extensions). att_cred_data = ( _AAGUID + len(cred.id).to_bytes(2, "big") + cred.id + cred.public_key() ) extensions = b"" if cred.hmac_secret: extensions = cbor.encode({"hmac-secret": True}) flags |= _AUTH_FLAG_ED ctr = cred.next_signature_counter() authenticator_data = ( cred.rp_id_hash + bytes([flags]) + ctr.to_bytes(4, "big") + att_cred_data + extensions ) # use_self_attestation app = knownapps.by_rp_id_hash(cred.rp_id_hash) if app is not None and app.use_self_attestation is not None: use_self_attestation = app.use_self_attestation else: use_self_attestation = _DEFAULT_USE_SELF_ATTESTATION if use_self_attestation: sig = cred.sign((authenticator_data, client_data_hash)) attestation_statement = {"alg": cred.algorithm, "sig": sig} else: sig = basic_attestation_sign((authenticator_data, client_data_hash)) attestation_statement = { "alg": common.COSE_ALG_ES256, "sig": sig, "x5c": [_FIDO_ATT_CERT], } # Encode the authenticatorMakeCredential response data. return cbor.encode( { _MAKECRED_RESP_FMT: "packed", _MAKECRED_RESP_AUTH_DATA: authenticator_data, _MAKECRED_RESP_ATT_STMT: attestation_statement, } ) def _cbor_get_assertion(req: Cmd, dialog_mgr: DialogManager) -> Cmd | None: if config.is_unlocked(): resp = _cbor_get_assertion_process(req, dialog_mgr) else: resp = Fido2Unlock(_cbor_get_assertion_process, req, dialog_mgr) if isinstance(resp, State): if dialog_mgr.set_state(resp): return None else: return cmd_error(req.cid, _ERR_CHANNEL_BUSY) else: return resp def _cbor_get_assertion_process(req: Cmd, dialog_mgr: DialogManager) -> State | Cmd: from .resident_credentials import find_by_rp_id_hash cid = req.cid # local_cache_attribute if not storage_device.is_initialized(): if __debug__: log.warning(__name__, "not initialized") return cbor_error(cid, _ERR_OTHER) try: param = cbor.decode(req.data, offset=1) rp_id = param[_GETASSERT_CMD_RP_ID] rp_id_hash = hashlib.sha256(rp_id).digest() allow_list = param.get(_GETASSERT_CMD_ALLOW_LIST, []) cred_list: list[Credential] if allow_list: # Get all credentials from the allow list that belong to this authenticator. allowed_creds = credentials_from_descriptor_list(allow_list, rp_id_hash) cred_list = _distinguishable_cred_list(allowed_creds) for cred in cred_list: if cred.rp_id is None: cred.rp_id = rp_id resident = False else: # Allow list is empty. Get resident credentials. if _ALLOW_RESIDENT_CREDENTIALS: cred_list = list(find_by_rp_id_hash(rp_id_hash)) else: cred_list = [] resident = True # Sort credentials by time of creation. cred_list.sort() # Check that the pinAuth parameter is absent. Client PIN is not supported. if _GETASSERT_CMD_PIN_AUTH in param: return cbor_error(cid, _ERR_PIN_AUTH_INVALID) # Get options. options = param.get(_GETASSERT_CMD_OPTIONS, {}) user_presence = options.get("up", True) user_verification = options.get("uv", False) # Get supported extensions. hmac_secret = param.get(_GETASSERT_CMD_EXTENSIONS, {}).get("hmac-secret", None) client_data_hash = param[_GETASSERT_CMD_CLIENT_DATA_HASH] except TypeError: return cbor_error(cid, _ERR_CBOR_UNEXPECTED_TYPE) except KeyError: return cbor_error(cid, _ERR_MISSING_PARAMETER) except Exception: return cbor_error(cid, _ERR_INVALID_CBOR) # Check data types. if ( not isinstance(hmac_secret, (dict, type(None))) or not isinstance(client_data_hash, bytes) or not isinstance(user_presence, bool) or not isinstance(user_verification, bool) ): return cbor_error(cid, _ERR_CBOR_UNEXPECTED_TYPE) # Check options. if "rk" in options: return cbor_error(cid, _ERR_INVALID_OPTION) if user_verification and not config.has_pin(): # User verification requested, but PIN is not enabled. return Fido2ConfirmNoPin(cid, dialog_mgr.iface) if not cred_list: # No credentials. This authenticator is not registered. if user_presence: return Fido2ConfirmNoCredentials(cid, dialog_mgr.iface, rp_id) else: return cbor_error(cid, _ERR_NO_CREDENTIALS) elif not user_presence and not user_verification: # Silent authentication. try: response_data = cbor_get_assertion_sign( client_data_hash, rp_id_hash, cred_list[0], hmac_secret, resident, user_presence, user_verification, ) return Cmd(cid, _CMD_CBOR, bytes([_ERR_NONE]) + response_data) except Exception as e: # Firmware error. if __debug__: log.exception(__name__, e) return cbor_error(cid, _ERR_OTHER) else: # Ask user to confirm one of the credentials. return Fido2ConfirmGetAssertion( cid, dialog_mgr.iface, client_data_hash, cred_list, hmac_secret, resident, user_verification, ) def _cbor_get_assertion_hmac_secret( cred: Credential, hmac_secret: dict ) -> bytes | None: from storage.fido2 import KEY_AGREEMENT_PRIVKEY from trezor.crypto import aes from trezor.crypto import hmac key_agreement = hmac_secret[1] # The public key of platform key agreement key. # NOTE: We should check the key_agreement[COSE_KEY_ALG] here, but to avoid compatibility issues we don't, # because there is currently no valid value which describes the actual key agreement algorithm. if ( key_agreement[common.COSE_KEY_KTY] != common.COSE_KEYTYPE_EC2 or key_agreement[common.COSE_KEY_CRV] != common.COSE_CURVE_P256 ): return None x = key_agreement[common.COSE_KEY_X] y = key_agreement[common.COSE_KEY_Y] salt_enc = hmac_secret[2] # The encrypted salt. salt_auth = hmac_secret[3] # The HMAC of the encrypted salt. if ( len(x) != 32 or len(y) != 32 or len(salt_auth) != 16 or len(salt_enc) not in (32, 64) ): raise CborError(_ERR_INVALID_LEN) # Compute the ECDH shared secret. ecdh_result = nist256p1.multiply(KEY_AGREEMENT_PRIVKEY, b"\04" + x + y) shared_secret = hashlib.sha256(ecdh_result[1:33]).digest() # Check the authentication tag and decrypt the salt. tag = hmac(hmac.SHA256, shared_secret, salt_enc).digest()[:16] if not utils.consteq(tag, salt_auth): raise CborError(_ERR_EXTENSION_FIRST) salt = aes(aes.CBC, shared_secret).decrypt(salt_enc) # Get cred_random - a constant symmetric key associated with the credential. cred_random = cred.hmac_secret_key() if cred_random is None: # The credential does not have the hmac-secret extension enabled. return None # Compute the hmac-secret output. output = hmac(hmac.SHA256, cred_random, salt[:32]).digest() if len(salt) == 64: output += hmac(hmac.SHA256, cred_random, salt[32:]).digest() # Encrypt the hmac-secret output. return aes(aes.CBC, shared_secret).encrypt(output) def cbor_get_assertion_sign( client_data_hash: bytes, rp_id_hash: bytes, cred: Credential, hmac_secret: dict | None, resident: bool, user_presence: bool, user_verification: bool, ) -> bytes: # Process extensions extensions = {} # Spec deviation: Do not reveal hmac-secret during silent authentication. if hmac_secret and user_presence: encrypted_output = _cbor_get_assertion_hmac_secret(cred, hmac_secret) if encrypted_output is not None: extensions["hmac-secret"] = encrypted_output # Encode the authenticator data. flags = 0 if user_presence: flags |= _AUTH_FLAG_UP if user_verification: flags |= _AUTH_FLAG_UV encoded_extensions = b"" if extensions: flags |= _AUTH_FLAG_ED encoded_extensions = cbor.encode(extensions) ctr = cred.next_signature_counter() authenticator_data = ( rp_id_hash + bytes([flags]) + ctr.to_bytes(4, "big") + encoded_extensions ) # Sign the authenticator data and the client data hash. if user_presence: sig = cred.sign((authenticator_data, client_data_hash)) else: # Spec deviation: Use a bogus signature during silent authentication. sig = cred.bogus_signature() # Encode the authenticatorGetAssertion response data. response = { _GETASSERT_RESP_CREDENTIAL: {"type": "public-key", "id": cred.id}, _GETASSERT_RESP_AUTH_DATA: authenticator_data, _GETASSERT_RESP_SIGNATURE: sig, } if resident and user_presence and cred.user_id is not None: response[_GETASSERT_RESP_USER] = {"id": cred.user_id} return cbor.encode(response) def _cbor_get_info(req: Cmd) -> Cmd: from .credential import CRED_ID_MAX_LENGTH # Note: We claim that the PIN is set even when it's not, because otherwise # login.live.com shows an error, but doesn't instruct the user to set a PIN. response_data = { _GETINFO_RESP_VERSIONS: ["U2F_V2", "FIDO_2_0"], _GETINFO_RESP_EXTENSIONS: ["hmac-secret"], _GETINFO_RESP_AAGUID: _AAGUID, _GETINFO_RESP_OPTIONS: { "rk": bool(_ALLOW_RESIDENT_CREDENTIALS), "up": True, "uv": True, }, _GETINFO_RESP_PIN_PROTOCOLS: [1], _GETINFO_RESP_MAX_CRED_COUNT_IN_LIST: _MAX_CRED_COUNT_IN_LIST, _GETINFO_RESP_MAX_CRED_ID_LEN: CRED_ID_MAX_LENGTH, } return Cmd(req.cid, _CMD_CBOR, bytes([_ERR_NONE]) + cbor.encode(response_data)) def _cbor_client_pin(req: Cmd) -> Cmd: from storage.fido2 import KEY_AGREEMENT_PUBKEY cid = req.cid # local_cache_attribute try: param = cbor.decode(req.data, offset=1) pin_protocol = param[_CLIENTPIN_CMD_PIN_PROTOCOL] subcommand = param[_CLIENTPIN_CMD_SUBCOMMAND] except Exception: return cbor_error(cid, _ERR_INVALID_CBOR) if pin_protocol != 1: return cbor_error(cid, _ERR_PIN_AUTH_INVALID) # We only support the get key agreement command which is required for the hmac-secret extension. if subcommand != _CLIENTPIN_SUBCMD_GET_KEY_AGREEMENT: return cbor_error(cid, _ERR_UNSUPPORTED_OPTION) # Encode the public key of the authenticator key agreement key. # NOTE: There is currently no valid value for COSE_KEY_ALG which describes the actual # key agreement algorithm as specified, but COSE_ALG_ECDH_ES_HKDF_256 is allegedly # recommended by the latest draft of the CTAP2 spec. response_data = { _CLIENTPIN_RESP_KEY_AGREEMENT: { common.COSE_KEY_ALG: common.COSE_ALG_ECDH_ES_HKDF_256, common.COSE_KEY_KTY: common.COSE_KEYTYPE_EC2, common.COSE_KEY_CRV: common.COSE_CURVE_P256, common.COSE_KEY_X: KEY_AGREEMENT_PUBKEY[1:33], common.COSE_KEY_Y: KEY_AGREEMENT_PUBKEY[33:], } } return Cmd(cid, _CMD_CBOR, bytes([_ERR_NONE]) + cbor.encode(response_data)) def _cbor_reset(req: Cmd, dialog_mgr: DialogManager) -> Cmd | None: if not storage_device.is_initialized(): if __debug__: log.warning(__name__, "not initialized") # Return success, because the authenticator is already in factory default state. return cbor_error(req.cid, _ERR_NONE) if not dialog_mgr.set_state(Fido2ConfirmReset(req.cid, dialog_mgr.iface)): return cmd_error(req.cid, _ERR_CHANNEL_BUSY) return None def cmd_keepalive(cid: int, status: int) -> Cmd: return Cmd(cid, _CMD_KEEPALIVE, bytes([status]))