feat(core): prevent interruption of workflows from other communication interfaces

tychovrahe/bluetooth/cleaner_disc2
tychovrahe 1 year ago
parent 8761105264
commit 945958096e

@ -39,6 +39,7 @@ message Failure {
Failure_PinMismatch = 12;
Failure_WipeCodeMismatch = 13;
Failure_InvalidSession = 14;
Failure_DeviceIsBusy = 15;
Failure_FirmwareError = 99;
}
}

@ -39,6 +39,8 @@ boot
import boot
main
import main
mutex
import mutex
session
import session
typing

@ -0,0 +1,23 @@
class Mutex:
def __init__(self):
self.ifaces = []
self.busy = None
def add(self, iface_num: int):
if iface_num not in self.ifaces:
self.ifaces.append(iface_num)
def set_busy(self, iface_num: int):
if iface_num in self.ifaces:
self.busy = iface_num
def get_busy(self, iface_num: int) -> int:
return (
iface_num in self.ifaces
and self.busy is not None
and self.busy != iface_num
)
def release(self, iface_num: int):
if iface_num == self.busy:
self.busy = None

@ -1,3 +1,5 @@
from mutex import Mutex
from trezor import log, loop, utils, wire, workflow
import apps.base
@ -20,11 +22,19 @@ if __debug__:
apps.base.set_homescreen()
workflow.start_default()
mutex = Mutex()
mutex.add(usb.iface_wire.iface_num())
mutex.add(usb.iface_debug.iface_num())
mutex.add(bluetooth.iface_ble.iface_num())
# initialize the wire codec
wire.setup(usb.iface_wire)
wire.setup(usb.iface_wire, mutex=mutex)
if __debug__:
wire.setup(usb.iface_debug, is_debug_session=True)
wire.setup(bluetooth.iface_ble)
wire.setup(usb.iface_debug, is_debug_session=True, mutex=mutex)
wire.setup(bluetooth.iface_ble, mutex=mutex)
loop.run()

@ -16,4 +16,5 @@ NotInitialized = 11
PinMismatch = 12
WipeCodeMismatch = 13
InvalidSession = 14
DeviceIsBusy = 15
FirmwareError = 99

@ -268,6 +268,7 @@ if TYPE_CHECKING:
PinMismatch = 12
WipeCodeMismatch = 13
InvalidSession = 14
DeviceIsBusy = 15
FirmwareError = 99
class ButtonRequestType(IntEnum):

@ -92,9 +92,9 @@ if TYPE_CHECKING:
experimental_enabled = False
def setup(iface: WireInterface, is_debug_session: bool = False) -> None:
def setup(iface: WireInterface, is_debug_session: bool = False, mutex=None) -> None:
"""Initialize the wire stack on passed USB interface."""
loop.schedule(handle_session(iface, codec_v1.SESSION_ID, is_debug_session))
loop.schedule(handle_session(iface, codec_v1.SESSION_ID, is_debug_session, mutex))
def _wrap_protobuf_load(
@ -384,7 +384,7 @@ async def _handle_single_message(
async def handle_session(
iface: WireInterface, session_id: int, is_debug_session: bool = False
iface: WireInterface, session_id: int, is_debug_session: bool = False, mutex=None
) -> None:
if __debug__ and is_debug_session:
ctx_buffer = WIRE_BUFFER_DEBUG
@ -409,6 +409,18 @@ async def handle_session(
# wait for a new one coming from the wire.
try:
msg = await ctx.read_from_wire()
if mutex is not None:
if mutex.get_busy(iface.iface_num()):
await ctx.write(
Failure(
code=FailureType.DeviceIsBusy,
message="Device is busy",
)
)
continue
else:
mutex.set_busy(iface.iface_num())
except codec_v1.CodecError as exc:
if __debug__:
log.exception(__name__, exc)

@ -26,7 +26,7 @@ import dbus.mainloop.glib
import dbus.service
from gi.repository import GLib
from .. import exceptions, transport
from .. import exceptions, messages, transport
from ..client import TrezorClient
from ..ui import ClickUI, ScriptUI
@ -108,6 +108,12 @@ class TrezorConnection:
except transport.DeviceIsBusy:
click.echo("Device is in use by another process.")
sys.exit(1)
except exceptions.TrezorFailure as e:
if e.code is messages.FailureType.DeviceIsBusy:
click.echo(str(e))
sys.exit(1)
else:
raise e
except Exception:
click.echo("Failed to find a Trezor device.")
if self.path is not None:

@ -277,6 +277,7 @@ class FailureType(IntEnum):
PinMismatch = 12
WipeCodeMismatch = 13
InvalidSession = 14
DeviceIsBusy = 15
FirmwareError = 99

Loading…
Cancel
Save