1
0
mirror of https://github.com/trezor/trezor-firmware.git synced 2024-12-24 07:18:09 +00:00

feat(python): make webusb transport more resilient

* convert more USB errors into TransportExceptions
* add a timeout + infinite loop for read/write operations,
  so that they are interruptible by Python, instead of leaving
  the interface in a bad state when hard-killed
* (also ctrl+c now works if the process is waiting for usb)
This commit is contained in:
matejcik 2024-08-05 14:17:41 +02:00 committed by matejcik
parent 2a567f3a0c
commit ed79d35de9
3 changed files with 40 additions and 10 deletions

View File

@ -0,0 +1 @@
Most USB level errors are now converted to `TransportException`.

View File

@ -0,0 +1 @@
It is now possible to interrupt USB communication (via Ctrl+C, or a signal, or any other way).

View File

@ -40,6 +40,9 @@ ENDPOINT = 1
DEBUG_INTERFACE = 1 DEBUG_INTERFACE = 1
DEBUG_ENDPOINT = 2 DEBUG_ENDPOINT = 2
USB_COMM_TIMEOUT_MS = 300
WEBUSB_CHUNK_SIZE = 64
class WebUsbHandle: class WebUsbHandle:
def __init__(self, device: "usb1.USBDevice", debug: bool = False) -> None: def __init__(self, device: "usb1.USBDevice", debug: bool = False) -> None:
@ -64,28 +67,53 @@ class WebUsbHandle:
def close(self) -> None: def close(self) -> None:
if self.handle is not None: if self.handle is not None:
self.handle.releaseInterface(self.interface) try:
self.handle.close() self.handle.releaseInterface(self.interface)
self.handle.close()
except Exception as e:
raise TransportException(f"USB close failed: {e}") from e
self.handle = None self.handle = None
def write_chunk(self, chunk: bytes) -> None: def write_chunk(self, chunk: bytes) -> None:
assert self.handle is not None assert self.handle is not None
if len(chunk) != 64: if len(chunk) != WEBUSB_CHUNK_SIZE:
raise TransportException(f"Unexpected chunk size: {len(chunk)}") raise TransportException(f"Unexpected chunk size: {len(chunk)}")
LOG.log(DUMP_PACKETS, f"writing packet: {chunk.hex()}") LOG.log(DUMP_PACKETS, f"writing packet: {chunk.hex()}")
self.handle.interruptWrite(self.endpoint, chunk) while True:
try:
bytes_written = self.handle.interruptWrite(
self.endpoint, chunk, USB_COMM_TIMEOUT_MS
)
except usb1.USBErrorTimeout as e:
bytes_written = e.transferred
except Exception as e:
raise TransportException(f"USB write failed: {e}") from e
if bytes_written == 0:
continue
if bytes_written != len(chunk):
raise TransportException(
f"USB partial write: {bytes_written} out of {WEBUSB_CHUNK_SIZE}"
)
return
def read_chunk(self) -> bytes: def read_chunk(self) -> bytes:
assert self.handle is not None assert self.handle is not None
endpoint = 0x80 | self.endpoint endpoint = 0x80 | self.endpoint
while True: while True:
chunk = self.handle.interruptRead(endpoint, 64) try:
if chunk: chunk = self.handle.interruptRead(
break endpoint, WEBUSB_CHUNK_SIZE, USB_COMM_TIMEOUT_MS
else: )
time.sleep(0.001) if chunk:
break
else:
time.sleep(0.001)
except usb1.USBErrorTimeout:
pass
except Exception as e:
raise TransportException(f"USB read failed: {e}") from e
LOG.log(DUMP_PACKETS, f"read packet: {chunk.hex()}") LOG.log(DUMP_PACKETS, f"read packet: {chunk.hex()}")
if len(chunk) != 64: if len(chunk) != WEBUSB_CHUNK_SIZE:
raise TransportException(f"Unexpected chunk size: {len(chunk)}") raise TransportException(f"Unexpected chunk size: {len(chunk)}")
return chunk return chunk