From c94e02b0eb6a2ac860cbe3052fc9b432e26ec0d7 Mon Sep 17 00:00:00 2001 From: Jan Pochyla Date: Mon, 19 Jun 2017 14:47:40 +0200 Subject: [PATCH] apps.fido_u2f: more type annotations --- src/apps/fido_u2f/__init__.py | 90 +++++++++++++++++------------------ 1 file changed, 45 insertions(+), 45 deletions(-) diff --git a/src/apps/fido_u2f/__init__.py b/src/apps/fido_u2f/__init__.py index 1d367efe8d..6462214372 100644 --- a/src/apps/fido_u2f/__init__.py +++ b/src/apps/fido_u2f/__init__.py @@ -195,14 +195,26 @@ def make_struct(desc): return buf, uctypes.struct(uctypes.addressof(buf), desc, uctypes.BIG_ENDIAN) +class Msg: + + def __init__(self, cid: int, cla: int, ins: int, p1: int, p2: int, lc: int, data: bytes) -> None: + self.cid = cid + self.cla = cla + self.ins = ins + self.p1 = p1 + self.p2 = p2 + self.lc = lc + self.data = data + + class Cmd: - def __init__(self, cid: int, cmd: int, data: bytes): + def __init__(self, cid: int, cmd: int, data: bytes) -> None: self.cid = cid self.cmd = cmd self.data = data - def to_msg(self): + def to_msg(self) -> Msg: cla = self.data[_APDU_CLA] ins = self.data[_APDU_INS] p1 = self.data[_APDU_P1] @@ -214,23 +226,11 @@ class Cmd: return Msg(self.cid, cla, ins, p1, p2, lc, data) -class Msg: - - def __init__(self, cid: int, cla: int, ins: int, p1: int, p2: int, lc: int, data: bytes): - self.cid = cid - self.cla = cla - self.ins = ins - self.p1 = p1 - self.p2 = p2 - self.lc = lc - self.data = data - - -def read_cmd(iface: int) -> Cmd: +async def read_cmd(iface: int) -> Cmd: desc_init = frame_init() desc_cont = frame_cont() - buf, = yield loop.select(iface) + buf, = await loop.select(iface) # log.debug(__name__, 'read init %s', buf) ifrm = overlay_struct(buf, desc_init) @@ -252,7 +252,7 @@ def read_cmd(iface: int) -> Cmd: data = data[:bcnt] while datalen < bcnt: - buf, = yield loop.select(iface) + buf, = await loop.select(iface) # log.debug(__name__, 'read cont %s', buf) cfrm = overlay_struct(buf, desc_cont) @@ -282,7 +282,7 @@ def read_cmd(iface: int) -> Cmd: return Cmd(ifrm.cid, ifrm.cmd, data) -def send_cmd(cmd: Cmd, iface: int): +def send_cmd(cmd: Cmd, iface: int) -> None: init_desc = frame_init() cont_desc = frame_cont() offset = 0 @@ -315,10 +315,10 @@ def boot(): loop.schedule_task(handle_reports(iface)) -def handle_reports(iface: int): +async def handle_reports(iface: int): while True: try: - req = yield from read_cmd(iface) + req = await read_cmd(iface) if req is None: continue resp = dispatch_cmd(req) @@ -333,11 +333,11 @@ def dispatch_cmd(req: Cmd) -> Cmd: if m.cla != 0: log.warning(__name__, '_SW_CLA_NOT_SUPPORTED') - return msg_error(req, _SW_CLA_NOT_SUPPORTED) + return msg_error(req.cid, _SW_CLA_NOT_SUPPORTED) if m.lc + _APDU_DATA > len(req.data): log.warning(__name__, '_SW_WRONG_LENGTH') - return msg_error(req, _SW_WRONG_LENGTH) + return msg_error(req.cid, _SW_WRONG_LENGTH) if m.ins == _MSG_REGISTER: log.debug(__name__, '_MSG_REGISTER') @@ -350,7 +350,7 @@ def dispatch_cmd(req: Cmd) -> Cmd: return msg_version(m) else: log.warning(__name__, '_SW_INS_NOT_SUPPORTED: %d', m.ins) - return msg_error(req, _SW_INS_NOT_SUPPORTED) + return msg_error(req.cid, _SW_INS_NOT_SUPPORTED) elif req.cmd == _CMD_INIT: log.debug(__name__, '_CMD_INIT') @@ -393,14 +393,14 @@ _CONFIRM_AUTHENTICATE = const(1) class ConfirmContent(ui.Widget): - def __init__(self, action: int, app_id: bytes): + def __init__(self, action: int, app_id: bytes) -> None: self.action = action self.app_id = app_id self.app_name = None self.app_icon = None self.boot() - def boot(self): + def boot(self) -> None: import ubinascii from trezor import res from . import knownapps @@ -420,7 +420,7 @@ class ConfirmContent(ui.Widget): self.app_name = name self.app_icon = icon - def render(self): + def render(self) -> None: if self.action == _CONFIRM_REGISTER: header = 'U2F Register' else: @@ -435,23 +435,23 @@ _CONFIRM_STATE_TIMEOUT_MS = const(10 * 1000) class ConfirmState: - def __init__(self, action: int, app_id: bytes): + def __init__(self, action: int, app_id: bytes) -> None: self.action = action self.app_id = app_id self.deadline_ms = None self.confirmed = None self.task = None - def fork(self): + def fork(self) -> None: self.deadline_ms = utime.ticks_ms() + _CONFIRM_STATE_TIMEOUT_MS self.task = self.confirm() workflow.start(self.task) - def kill(self): + def kill(self) -> None: if self.task is not None: self.task.close() - async def confirm(self): + async def confirm(self) -> None: from trezor.ui.confirm import HoldToConfirmDialog content = ConfirmContent(self.action, self.app_id) dialog = HoldToConfirmDialog(content) @@ -459,8 +459,8 @@ class ConfirmState: self.confirmed = await dialog -_state = None # state for msg_register and msg_authenticate, None or ConfirmState -_lastreq = None # last received register/authenticate request, None or Req +_state = None # type: Optional[ConfirmState] # state for msg_register and msg_authenticate +_lastreq = None # type: Optional[Msg] # last received register/authenticate request def msg_register(req: Msg) -> Cmd: @@ -471,12 +471,12 @@ def msg_register(req: Msg) -> Cmd: if not storage.is_initialized(): log.warning(__name__, 'not initialized') - return msg_error(req, _SW_CONDITIONS_NOT_SATISFIED) + return msg_error(req.cid, _SW_CONDITIONS_NOT_SATISFIED) # check length of input data if len(req.data) != 64: log.warning(__name__, '_SW_WRONG_LENGTH req.data') - return msg_error(req, _SW_WRONG_LENGTH) + return msg_error(req.cid, _SW_WRONG_LENGTH) # parse challenge and app_id chal = req.data[:32] @@ -498,7 +498,7 @@ def msg_register(req: Msg) -> Cmd: _state.fork() if _state.confirmed is None: log.info(__name__, 'waiting for button') - return msg_error(req, _SW_CONDITIONS_NOT_SATISFIED) + return msg_error(req.cid, _SW_CONDITIONS_NOT_SATISFIED) _state = None buf = msg_register_sign(chal, app_id) @@ -565,18 +565,18 @@ def msg_authenticate(req: Msg) -> Cmd: if not storage.is_initialized(): log.warning(__name__, 'not initialized') - return msg_error(req, _SW_CONDITIONS_NOT_SATISFIED) + return msg_error(req.cid, _SW_CONDITIONS_NOT_SATISFIED) # we need at least keyHandleLen if len(req.data) <= _REQ_CMD_AUTHENTICATE_KHLEN: log.warning(__name__, '_SW_WRONG_LENGTH req.data') - return msg_error(req, _SW_WRONG_LENGTH) + return msg_error(req.cid, _SW_WRONG_LENGTH) # check keyHandleLen khlen = req.data[_REQ_CMD_AUTHENTICATE_KHLEN] if khlen != 64: log.warning(__name__, '_SW_WRONG_LENGTH khlen') - return msg_error(req, _SW_WRONG_LENGTH) + return msg_error(req.cid, _SW_WRONG_LENGTH) auth = overlay_struct(req.data, req_cmd_authenticate(khlen)) @@ -584,17 +584,17 @@ def msg_authenticate(req: Msg) -> Cmd: node = msg_authenticate_genkey(auth.appId, auth.keyHandle) if node is None: # specific error logged in msg_authenticate_genkey - return msg_error(req, _SW_WRONG_DATA) + return msg_error(req.cid, _SW_WRONG_DATA) # if _AUTH_CHECK_ONLY is requested, return, because keyhandle has been checked already if req.p1 == _AUTH_CHECK_ONLY: log.info(__name__, '_AUTH_CHECK_ONLY') - return msg_error(req, _SW_CONDITIONS_NOT_SATISFIED) + return msg_error(req.cid, _SW_CONDITIONS_NOT_SATISFIED) # from now on, only _AUTH_ENFORCE is supported if req.p1 != _AUTH_ENFORCE: log.info(__name__, '_AUTH_ENFORCE') - return msg_error(req, _SW_WRONG_DATA) + return msg_error(req.cid, _SW_WRONG_DATA) # check equality with last request if _lastreq is None or _lastreq.__dict__ != req.__dict__: @@ -612,7 +612,7 @@ def msg_authenticate(req: Msg) -> Cmd: _state.fork() if _state.confirmed is None: log.info(__name__, 'waiting for button') - return msg_error(req, _SW_CONDITIONS_NOT_SATISFIED) + return msg_error(req.cid, _SW_CONDITIONS_NOT_SATISFIED) _state = None buf = msg_authenticate_sign(auth.chal, auth.appId, node.private_key()) @@ -691,12 +691,12 @@ def msg_authenticate_sign(challenge: bytes, app_id: bytes, privkey: bytes) -> by def msg_version(req: Msg) -> Cmd: if req.data: - return msg_error(req, _SW_WRONG_LENGTH) + return msg_error(req.cid, _SW_WRONG_LENGTH) return Cmd(req.cid, _CMD_MSG, b'U2F_V2\x90\x00') # includes _SW_NO_ERROR -def msg_error(req: Msg, code: int) -> Cmd: - return Cmd(req.cid, _CMD_MSG, ustruct.pack('>H', code)) +def msg_error(cid: int, code: int) -> Cmd: + return Cmd(cid, _CMD_MSG, ustruct.pack('>H', code)) def cmd_error(cid: int, code: int) -> Cmd: