1
0
mirror of https://github.com/trezor/trezor-firmware.git synced 2025-04-04 17:36:02 +00:00

feat(core): add BleInterface to session handling

Cherry-picked from b62a5aec6b.

[no changelog]
This commit is contained in:
Roman Zeyde 2025-03-20 11:56:12 +02:00
parent 96549622a6
commit 9f7392f6ff
5 changed files with 50 additions and 11 deletions

View File

@ -33,6 +33,8 @@ from trezor import utils
all_modules
import all_modules
bluetooth
import bluetooth
boot
import boot
main

21
core/src/bluetooth.py Normal file
View File

@ -0,0 +1,21 @@
from trezorio import BLE, ble
# TODO: reimplement in C
class BleInterface:
RX_PACKET_LEN = ble.RX_PACKET_LEN
TX_PACKET_LEN = ble.TX_PACKET_LEN
def iface_num(self) -> int:
return BLE
def write(self, msg: bytes) -> int:
return ble.write(msg)
def read(self, buffer: bytearray, offset: int = 0) -> int:
return ble.read(buffer, offset)
# interface used for trezor wire protocol
iface_ble = BleInterface()

View File

@ -49,6 +49,12 @@ import storage.device
usb.bus.open(storage.device.get_device_id())
if utils.USE_BLE:
from trezorio import ble
ble.start_comm()
# run the endless loop
while True:
with unimport_manager:

View File

@ -1,9 +1,18 @@
# isort: skip_file
from micropython import const
from trezor import log, loop, utils, wire, workflow
import apps.base
import usb
_PROTOBUF_BUFFER_SIZE = const(8192)
USB_BUFFER = bytearray(_PROTOBUF_BUFFER_SIZE)
if utils.USE_BLE:
BLE_BUFFER = bytearray(_PROTOBUF_BUFFER_SIZE)
apps.base.boot()
if not utils.BITCOIN_ONLY and usb.ENABLE_IFACE_WEBAUTHN:
@ -20,8 +29,14 @@ if __debug__:
apps.base.set_homescreen()
workflow.start_default()
# initialize the wire codec
wire.setup(usb.iface_wire)
# initialize the wire codec over USB
wire.setup(usb.iface_wire, USB_BUFFER)
if utils.USE_BLE:
import bluetooth
# initialize the wire codec over BLE
wire.setup(bluetooth.iface_ble, BLE_BUFFER)
# start the event loop
loop.run()

View File

@ -23,7 +23,6 @@ reads the message's header. When the message type is known the first handler is
"""
from micropython import const
from typing import TYPE_CHECKING
from trezor import log, loop, protobuf, utils
@ -37,10 +36,6 @@ from .message_handler import failure
# other packages.
from .errors import * # isort:skip # noqa: F401,F403
_PROTOBUF_BUFFER_SIZE = const(8192)
WIRE_BUFFER = bytearray(_PROTOBUF_BUFFER_SIZE)
if TYPE_CHECKING:
from trezorio import WireInterface
from typing import Any, Callable, Coroutine, TypeVar
@ -52,13 +47,13 @@ if TYPE_CHECKING:
LoadedMessageType = TypeVar("LoadedMessageType", bound=protobuf.MessageType)
def setup(iface: WireInterface) -> None:
def setup(iface: WireInterface, buffer: bytearray) -> None:
"""Initialize the wire stack on the provided WireInterface."""
loop.schedule(handle_session(iface))
loop.schedule(handle_session(iface, buffer))
async def handle_session(iface: WireInterface) -> None:
ctx = CodecContext(iface, WIRE_BUFFER)
async def handle_session(iface: WireInterface, buffer: bytearray) -> None:
ctx = CodecContext(iface, buffer)
next_msg: protocol_common.Message | None = None
# Take a mark of modules that are imported at this point, so we can