1
0
mirror of https://github.com/trezor/trezor-firmware.git synced 2025-04-16 15:25:47 +00:00

fix merge issues

This commit is contained in:
M1nd3r 2025-03-11 13:35:48 +01:00
parent 28eaa8552d
commit 564965d2d6
6 changed files with 50 additions and 52 deletions

View File

@ -29,14 +29,14 @@ workflow.start_default()
# initialize the wire codec
wire.setup(usb.iface_wire, WIRE_BUFFER)
wire.setup(usb.iface_wire)
if utils.USE_BLE:
import bluetooth
BLE_BUFFER = bytearray(_PROTOBUF_BUFFER_SIZE)
wire.setup(bluetooth.iface_ble, BLE_BUFFER)
wire.setup(bluetooth.iface_ble)
# start the event loop

View File

@ -23,6 +23,7 @@ reads the message's header. When the message type is known the first handler is
"""
from micropython import const
from typing import TYPE_CHECKING
from trezor import log, loop, protobuf, utils
@ -52,9 +53,9 @@ if TYPE_CHECKING:
LoadedMessageType = TypeVar("LoadedMessageType", bound=protobuf.MessageType)
def setup(iface: WireInterface, buffer: bytearray) -> None:
def setup(iface: WireInterface) -> None:
"""Initialize the wire stack on the provided WireInterface."""
loop.schedule(handle_session(iface, buffer))
loop.schedule(handle_session(iface))
if utils.USE_THP:

View File

@ -1,45 +1,43 @@
import typing as t
from .. import messages
from ..tools import session
if t.TYPE_CHECKING:
from ..client import TrezorClient
from ..transport.session import Session
@session
def update(
client: "TrezorClient",
session: "Session",
datfile: bytes,
binfile: bytes,
progress_update: t.Callable[[int], t.Any] = lambda _: None,
):
chunk_len = 4096
offset = 0
raise NotImplementedError()
# chunk_len = 4096
# offset = 0
resp = client.call(
messages.UploadBLEFirmwareInit(init_data=datfile, binsize=len(binfile))
)
# resp = session.call(
# messages.UploadBLEFirmwareInit(init_data=datfile, binsize=len(binfile))
# )
while isinstance(resp, messages.UploadBLEFirmwareNextChunk):
# while isinstance(resp, messages.UploadBLEFirmwareNextChunk):
payload = binfile[offset : offset + chunk_len]
resp = client.call(messages.UploadBLEFirmwareChunk(data=payload))
progress_update(chunk_len)
offset += chunk_len
# payload = binfile[offset : offset + chunk_len]
# resp = session.call(messages.UploadBLEFirmwareChunk(data=payload))
# progress_update(chunk_len)
# offset += chunk_len
if isinstance(resp, messages.Success):
return
else:
raise RuntimeError(f"Unexpected message {resp}")
# if isinstance(resp, messages.Success):
# return
# else:
# raise RuntimeError(f"Unexpected message {resp}")
@session
def erase_bonds(
client: "TrezorClient",
session: "Session",
):
resp = client.call(messages.EraseBonds())
resp = session.call(messages.EraseBonds())
if isinstance(resp, messages.Success):
return
@ -47,12 +45,11 @@ def erase_bonds(
raise RuntimeError(f"Unexpected message {resp}")
@session
def unpair(
client: "TrezorClient",
session: "Session",
):
resp = client.call(messages.Unpair())
resp = session.call(messages.Unpair())
if isinstance(resp, messages.Success):
return
@ -60,11 +57,10 @@ def unpair(
raise RuntimeError(f"Unexpected message {resp}")
@session
def disconnect(
client: "TrezorClient",
session: "Session",
):
resp = client.call(messages.Disconnect())
resp = session.call(messages.Disconnect())
if isinstance(resp, messages.Success):
return

View File

@ -23,10 +23,10 @@ import click
from .. import ble, exceptions
from ..transport.ble import BleProxy
from . import with_client
from . import with_session
if TYPE_CHECKING:
from ..client import TrezorClient
from ..transport.session import Session
@click.group(name="ble")
@ -38,9 +38,9 @@ def cli() -> None:
# fmt: off
@click.argument("package", type=click.File("rb"))
# fmt: on
@with_client
@with_session
def update(
client: "TrezorClient",
session: "Session",
package: BinaryIO,
) -> None:
"""Upload new BLE firmware to device."""
@ -60,7 +60,7 @@ def update(
with click.progressbar(
label="Uploading", length=len(binfile), show_eta=False
) as bar:
ble.update(client, datfile, binfile, bar.update)
ble.update(session, datfile, binfile, bar.update)
click.echo("Update successful.")
except exceptions.Cancelled:
click.echo("Update aborted on device.")
@ -94,11 +94,10 @@ def connect() -> None:
click.echo("Connected")
@with_client
def disconnect_device(client: "TrezorClient") -> None:
def disconnect_device(session: "Session") -> None:
"""Disconnect from device side."""
try:
ble.disconnect(client)
ble.disconnect(session)
except exceptions.Cancelled:
click.echo("Disconnect aborted on device.")
except exceptions.TrezorException as e:
@ -108,10 +107,11 @@ def disconnect_device(client: "TrezorClient") -> None:
@cli.command()
@click.option("--device", is_flag=True, help="Disconnect from device side.")
def disconnect(device: bool) -> None:
@with_session
def disconnect(session: "Session", device: bool) -> None:
if device:
disconnect_device()
disconnect_device(session)
else:
ble_proxy = BleProxy()
devices = [d for d in ble_proxy.lookup() if d.connected]
@ -123,14 +123,14 @@ def disconnect(device: bool) -> None:
@cli.command()
@with_client
@with_session
def erase_bonds(
client: "TrezorClient",
session: "Session",
) -> None:
"""Erase BLE bonds on device."""
try:
ble.erase_bonds(client)
ble.erase_bonds(session)
click.echo("Erase successful.")
except exceptions.Cancelled:
click.echo("Erase aborted on device.")
@ -140,14 +140,14 @@ def erase_bonds(
@cli.command()
@with_client
@with_session
def unpair(
client: "TrezorClient",
session: "Session",
) -> None:
"""Erase bond of currently connected device. (on device side)"""
try:
ble.unpair(client)
ble.unpair(session)
click.echo("Unpair successful.")
except exceptions.Cancelled:
click.echo("Unapair aborted on device.")

View File

@ -92,6 +92,7 @@ class Transport:
def all_transports() -> t.Iterable[t.Type["Transport"]]:
from .ble import BleTransport
from .bridge import BridgeTransport
from .hid import HidTransport
from .udp import UdpTransport

View File

@ -20,8 +20,7 @@ from multiprocessing import Pipe, Process
from multiprocessing.connection import Connection
from typing import TYPE_CHECKING, Any, Iterable, List, Optional
from . import TransportException
from .protocol import ProtocolBasedTransport, ProtocolV1
from . import Transport, TransportException
if TYPE_CHECKING:
from ..models import TrezorModel
@ -40,15 +39,16 @@ class Device:
connected: bool
class BleTransport(ProtocolBasedTransport):
ENABLED = True
class BleTransport(Transport):
PATH_PREFIX = "ble"
ENABLED = True
CHUNK_SIZE = 244
_ble = None
def __init__(self, mac_addr: str) -> None:
self.device = mac_addr
super().__init__(protocol=ProtocolV1(self, replen=244))
super().__init__()
def get_path(self) -> str:
return "{}:{}".format(self.PATH_PREFIX, self.device)