tychovrahe/T3W1/devkit1_with_ble_sq3
tychovrahe 12 months ago
parent c8e6b029ac
commit 2c6a995b03

@ -1,4 +1,4 @@
check: messages.pb messages-binance.pb messages-bitcoin.pb messages-bootloader.pb messages-cardano.pb messages-common.pb messages-crypto.pb messages-debug.pb messages-ethereum.pb messages-management.pb messages-monero.pb messages-nem.pb messages-ripple.pb messages-stellar.pb messages-tezos.pb messages-eos.pb
check: messages.pb messages-binance.pb messages-bitcoin.pb messages-ble.pb messages-bootloader.pb messages-cardano.pb messages-common.pb messages-crypto.pb messages-debug.pb messages-ethereum.pb messages-management.pb messages-monero.pb messages-nem.pb messages-ripple.pb messages-stellar.pb messages-tezos.pb messages-eos.pb
%.pb: %.proto
protoc -I/usr/include -I. $< -o $@

@ -72,6 +72,11 @@ inotify = "*"
yamllint = "^1.25.0"
vulture = "^2.6"
# ble
dbus-python = "*"
PyGObject = "*"
nrfutil = "*"
[tool.poetry.dev-dependencies]
scan-build = "*"
towncrier = "^21.9.0"

@ -8,3 +8,6 @@ typing_extensions>=3.10
dataclasses ; python_version<'3.7'
simple-rlp>=0.1.2 ; python_version>='3.7'
construct-classes>=0.1.2
dbus-python>=1.3.2
pygobject>=3.44.1
nrfutil>=5.0.0

@ -25,7 +25,7 @@ per-file-ignores =
helper-scripts/*:I
tools/*:I
tests/*:I
known-modules = libusb1:[usb1],hidapi:[hid],PyQt5:[PyQt5.QtWidgets,PyQt5.QtGui,PyQt5.QtCore],simple-rlp:[rlp]
known-modules = libusb1:[usb1],hidapi:[hid],PyQt5:[PyQt5.QtWidgets,PyQt5.QtGui,PyQt5.QtCore],simple-rlp:[rlp],dbus-python:[dbus]
[isort]
multi_line_output = 3

@ -0,0 +1,59 @@
import typing as t
from .. import messages
from ..tools import session
if t.TYPE_CHECKING:
from ..client import TrezorClient
@session
def update(
client: "TrezorClient",
datfile: bytes,
binfile: bytes,
progress_update: t.Callable[[int], t.Any] = lambda _: None,
):
chunk_len = 4096
offset = 0
resp = client.call(
messages.UploadBLEFirmwareInit(init_data=datfile, binsize=len(binfile))
)
while isinstance(resp, messages.UploadBLEFirmwareNextChunk):
payload = binfile[offset : offset + chunk_len]
resp = client.call(messages.UploadBLEFirmwareChunk(data=payload))
progress_update(chunk_len)
offset += chunk_len
if isinstance(resp, messages.Success):
return
else:
raise RuntimeError(f"Unexpected message {resp}")
@session
def erase_bonds(
client: "TrezorClient",
):
resp = client.call(messages.EraseBonds())
if isinstance(resp, messages.Success):
return
else:
raise RuntimeError(f"Unexpected message {resp}")
@session
def disconnect(
client: "TrezorClient",
):
resp = client.call(messages.Disconnect())
if isinstance(resp, messages.Success):
return
else:
raise RuntimeError(f"Unexpected message {resp}")

@ -16,12 +16,17 @@
import functools
import sys
import threading
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any, Callable, Dict, Optional
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional
import click
import dbus
import dbus.mainloop.glib
import dbus.service
from gi.repository import GLib
from .. import exceptions, transport
from .. import exceptions, messages, transport
from ..client import TrezorClient
from ..ui import ClickUI, ScriptUI
@ -103,6 +108,12 @@ class TrezorConnection:
except transport.DeviceIsBusy:
click.echo("Device is in use by another process.")
sys.exit(1)
except exceptions.TrezorFailure as e:
if e.code is messages.FailureType.DeviceIsBusy:
click.echo(str(e))
sys.exit(1)
else:
raise e
except Exception:
click.echo("Failed to find a Trezor device.")
if self.path is not None:
@ -134,26 +145,95 @@ def with_client(func: "Callable[Concatenate[TrezorClient, P], R]") -> "Callable[
def trezorctl_command_with_client(
obj: TrezorConnection, *args: "P.args", **kwargs: "P.kwargs"
) -> "R":
with obj.client_context() as client:
session_was_resumed = obj.session_id == client.session_id
if not session_was_resumed and obj.session_id is not None:
# tried to resume but failed
click.echo("Warning: failed to resume session.", err=True)
loop = GLib.MainLoop()
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
def callback_wrapper(
r: List[Optional["R"]], exc: List[Optional[Exception]]
) -> None:
try:
return func(client, *args, **kwargs)
finally:
if not session_was_resumed:
with obj.client_context() as client:
session_was_resumed = obj.session_id == client.session_id
if not session_was_resumed and obj.session_id is not None:
# tried to resume but failed
click.echo("Warning: failed to resume session.", err=True)
try:
client.end_session()
except Exception:
pass
r.append(func(client, *args, **kwargs))
except Exception as e:
exc[0] = e
finally:
if not session_was_resumed:
try:
client.end_session()
except Exception:
pass
except Exception as e:
exc[0] = e
finally:
loop.quit()
result: List["R"] = []
exc: List[Optional[Exception]] = [None]
threading.Thread(
target=callback_wrapper, daemon=True, args=(result, exc)
).start()
loop.run()
if exc[0] is not None:
raise exc[0]
if len(result) == 0:
raise click.ClickException("Command did not return a result.")
return result[0]
# the return type of @click.pass_obj is improperly specified and pyright doesn't
# understand that it converts f(obj, *args, **kwargs) to f(*args, **kwargs)
return trezorctl_command_with_client # type: ignore [cannot be assigned to return type]
def with_ble(func: "Callable[P, R]") -> "Callable[P, R]":
"""Wrap a Click command in `with obj.client_context() as client`.
Sessions are handled transparently. The user is warned when session did not resume
cleanly. The session is closed after the command completes - unless the session
was resumed, in which case it should remain open.
"""
@functools.wraps(func)
def trezorctl_command(*args: "P.args", **kwargs: "P.kwargs") -> "R":
loop = GLib.MainLoop()
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
def callback_wrapper(r: List[Optional["R"]], exc: List[Optional[Exception]]):
try:
r.append(func(*args, **kwargs))
except Exception as e:
exc[0] = e
finally:
loop.quit()
result: List["R"] = []
exc: List[Optional[Exception]] = [None]
threading.Thread(
target=callback_wrapper, daemon=True, args=(result, exc)
).start()
loop.run()
if exc[0] is not None:
raise exc[0]
if len(result) == 0:
raise click.ClickException("Command did not return a result.")
return result[0]
return trezorctl_command
class AliasedGroup(click.Group):
"""Command group that handles aliases and Click 6.x compatibility.

@ -0,0 +1,131 @@
# This file is part of the Trezor project.
#
# Copyright (C) 2012-2022 SatoshiLabs and contributors
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License version 3
# as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the License along with this library.
# If not, see <https://www.gnu.org/licenses/lgpl-3.0.html>.
import sys
import zipfile
from typing import TYPE_CHECKING, BinaryIO
import click
from .. import ble, exceptions, tealblue
from ..transport.ble import lookup_device, scan_device
from . import with_ble, with_client
if TYPE_CHECKING:
from ..client import TrezorClient
@click.group(name="ble")
def cli() -> None:
"""BLE commands."""
@cli.command()
# fmt: off
@click.argument("package", type=click.File("rb"))
# fmt: on
@with_client
def update(
client: "TrezorClient",
package: BinaryIO,
) -> None:
"""Upload new BLE firmware to device."""
with zipfile.ZipFile(package) as archive:
binfile = archive.read("ble_firmware.bin")
datfile = archive.read("ble_firmware.dat")
"""Perform the final act of loading the firmware into Trezor."""
try:
click.echo("Uploading...\r", nl=False)
with click.progressbar(
label="Uploading", length=len(binfile), show_eta=False
) as bar:
ble.update(client, datfile, binfile, bar.update)
click.echo("Update successful.")
except exceptions.Cancelled:
click.echo("Update aborted on device.")
except exceptions.TrezorException as e:
click.echo(f"Update failed: {e}")
sys.exit(3)
@cli.command()
@with_ble
def connect() -> None:
"""Connect to the device via BLE."""
adapter = tealblue.TealBlue().find_adapter()
devices = lookup_device(adapter)
devices = [d for d in devices if d.connected]
if len(devices) == 0:
print("Scanning...")
devices = scan_device(adapter, devices)
if len(devices) == 0:
print("No BLE devices found")
return
else:
print("Found %d BLE device(s)" % len(devices))
for device in devices:
print(f"Device: {device.name}, {device.address}")
device = devices[0]
print(f"Connecting to {device.name}...")
device.connect()
print("Connected")
@cli.command()
@click.option("--device", is_flag=True, help="Disconnect from device side.")
@with_client
def disconnect(client: "TrezorClient", device: bool) -> None:
"""Connect to the device via BLE."""
adapter = tealblue.TealBlue().find_adapter()
devices = lookup_device(adapter)
devices = [d for d in devices if d.connected]
if len(devices) == 0:
print("No device is connected")
for d in devices:
if device:
ble.disconnect(client)
else:
d.disconnect()
print(f"Device {d.name}, {d.address}, disconnected.")
@cli.command()
@with_client
def erase_bonds(
client: "TrezorClient",
) -> None:
"""Erase BLE bonds on device."""
try:
ble.erase_bonds(client)
click.echo("Erase successful.")
except exceptions.Cancelled:
click.echo("Erase aborted on device.")
except exceptions.TrezorException as e:
click.echo(f"Update failed: {e}")
sys.exit(3)

@ -32,6 +32,7 @@ from . import (
AliasedGroup,
TrezorConnection,
binance,
ble,
btc,
cardano,
cosi,
@ -48,6 +49,7 @@ from . import (
settings,
stellar,
tezos,
with_ble,
with_client,
)
@ -86,6 +88,7 @@ COMMAND_ALIASES = {
"upgrade-firmware": firmware.update,
"firmware-upgrade": firmware.update,
"firmware-update": firmware.update,
"ble-update": ble.update,
}
@ -279,6 +282,7 @@ def format_device_name(features: messages.Features) -> str:
@cli.command(name="list")
@with_ble
@click.option("-n", "no_resolve", is_flag=True, help="Do not resolve Trezor names")
def list_devices(no_resolve: bool) -> Optional[Iterable["Transport"]]:
"""List connected Trezor devices."""
@ -415,6 +419,7 @@ cli.add_command(tezos.cli)
cli.add_command(firmware.cli)
cli.add_command(debug.cli)
cli.add_command(ble.cli)
#
# Main

@ -81,6 +81,15 @@ class MessageType(IntEnum):
FirmwareUpload = 7
FirmwareRequest = 8
SelfTest = 32
UploadBLEFirmwareInit = 8000
UploadBLEFirmwareNextChunk = 8001
UploadBLEFirmwareChunk = 8002
PairingRequest = 8003
AuthKey = 8004
RepairRequest = 8005
EraseBonds = 8006
Disconnect = 8007
ComparisonRequest = 8008
GetPublicKey = 11
PublicKey = 12
SignTx = 15
@ -276,6 +285,7 @@ class FailureType(IntEnum):
PinMismatch = 12
WipeCodeMismatch = 13
InvalidSession = 14
DeviceIsBusy = 15
FirmwareError = 99
@ -2049,6 +2059,95 @@ class TxAckPrevExtraDataWrapper(protobuf.MessageType):
self.extra_data_chunk = extra_data_chunk
class UploadBLEFirmwareInit(protobuf.MessageType):
MESSAGE_WIRE_TYPE = 8000
FIELDS = {
1: protobuf.Field("init_data", "bytes", repeated=False, required=True),
2: protobuf.Field("binsize", "uint32", repeated=False, required=True),
}
def __init__(
self,
*,
init_data: "bytes",
binsize: "int",
) -> None:
self.init_data = init_data
self.binsize = binsize
class UploadBLEFirmwareNextChunk(protobuf.MessageType):
MESSAGE_WIRE_TYPE = 8001
FIELDS = {
1: protobuf.Field("offset", "uint32", repeated=False, required=True),
}
def __init__(
self,
*,
offset: "int",
) -> None:
self.offset = offset
class UploadBLEFirmwareChunk(protobuf.MessageType):
MESSAGE_WIRE_TYPE = 8002
FIELDS = {
1: protobuf.Field("data", "bytes", repeated=False, required=True),
}
def __init__(
self,
*,
data: "bytes",
) -> None:
self.data = data
class EraseBonds(protobuf.MessageType):
MESSAGE_WIRE_TYPE = 8006
class Disconnect(protobuf.MessageType):
MESSAGE_WIRE_TYPE = 8007
class PairingRequest(protobuf.MessageType):
MESSAGE_WIRE_TYPE = 8003
class AuthKey(protobuf.MessageType):
MESSAGE_WIRE_TYPE = 8004
FIELDS = {
1: protobuf.Field("key", "bytes", repeated=False, required=True),
}
def __init__(
self,
*,
key: "bytes",
) -> None:
self.key = key
class RepairRequest(protobuf.MessageType):
MESSAGE_WIRE_TYPE = 8005
class ComparisonRequest(protobuf.MessageType):
MESSAGE_WIRE_TYPE = 8008
FIELDS = {
1: protobuf.Field("key", "bytes", repeated=False, required=True),
}
def __init__(
self,
*,
key: "bytes",
) -> None:
self.key = key
class FirmwareErase(protobuf.MessageType):
MESSAGE_WIRE_TYPE = 6
FIELDS = {

@ -0,0 +1,457 @@
# !/usr/bin/python3
# pyright: off
import queue
import threading
import time
import dbus
import dbus.mainloop.glib
import dbus.service
class NotConnectedError(Exception):
pass
class DBusInvalidArgsException(dbus.exceptions.DBusException):
_dbus_error_name = "org.freedesktop.DBus.Error.InvalidArgs"
def format_uuid(uuid):
if type(uuid) == int:
if uuid > 0xFFFF:
raise ValueError("32-bit UUID not supported yet")
uuid = "%04X" % uuid
return uuid
class TealBlue:
def __init__(self):
self._bus = dbus.SystemBus()
self._bluez = dbus.Interface(
self._bus.get_object("org.bluez", "/"), "org.freedesktop.DBus.ObjectManager"
)
def find_adapter(self):
# find the first adapter
objects = self._bluez.GetManagedObjects()
for path in sorted(objects.keys()):
interfaces = objects[path]
if "org.bluez.Adapter1" not in interfaces:
continue
properties = interfaces["org.bluez.Adapter1"]
return Adapter(self, path, properties)
raise Exception("No adapter found")
# copied from:
# https://github.com/adafruit/Adafruit_Python_BluefruitLE/blob/master/Adafruit_BluefruitLE/bluez_dbus/provider.py
def _print_tree(self):
"""Print tree of all bluez objects, useful for debugging."""
# This is based on the bluez sample code get-managed-objects.py.
objects = self._bluez.GetManagedObjects()
for path in sorted(objects.keys()):
print("[ %s ]" % (path))
interfaces = objects[path]
for interface in sorted(interfaces.keys()):
if interface in [
"org.freedesktop.DBus.Introspectable",
"org.freedesktop.DBus.Properties",
]:
continue
print(" %s" % (interface))
properties = interfaces[interface]
for key in sorted(properties.keys()):
print(" %s = %s" % (key, properties[key]))
class Adapter:
def __init__(self, teal, path, properties):
self._teal = teal
self._path = path
self._properties = properties
self._object = dbus.Interface(
teal._bus.get_object("org.bluez", path), "org.bluez.Adapter1"
)
self._advertisement = None
def __repr__(self):
return "<tealblue.Adapter address=%s>" % (self._properties["Address"])
def devices(self):
"""
Returns the devices that BlueZ has discovered.
"""
objects = self._teal._bluez.GetManagedObjects()
for path in sorted(objects.keys()):
interfaces = objects[path]
if "org.bluez.Device1" not in interfaces:
continue
properties = interfaces["org.bluez.Device1"]
yield Device(self._teal, path, properties)
def scan(self, timeout_s):
return Scanner(self._teal, self, self.devices(), timeout_s)
@property
def advertisement(self):
if self._advertisement is None:
self._advertisement = Advertisement(self._teal, self)
return self._advertisement
def advertise(self, enable):
if enable:
self.advertisement.enable()
else:
self.advertisement.disable()
def advertise_data(
self,
local_name=None,
service_data=None,
service_uuids=None,
manufacturer_data=None,
):
self.advertisement.local_name = local_name
self.advertisement.service_data = service_data
self.advertisement.service_uuids = service_uuids
self.advertisement.manufacturer_data = manufacturer_data
class Scanner:
def __init__(self, teal, adapter, initial_devices, timeout_s):
self._teal = teal
self._adapter = adapter
self._was_discovering = adapter._properties[
"Discovering"
] # TODO get current value, or watch property changes
self._queue = queue.Queue()
self.timeout_s = timeout_s
for device in initial_devices:
self._queue.put(device)
def new_device(path, interfaces):
if "org.bluez.Device1" not in interfaces:
return
if not path.startswith(self._adapter._path + "/"):
return
# properties = interfaces["org.bluez.Device1"]
self._queue.put(Device(self._teal, path, interfaces["org.bluez.Device1"]))
self._signal_receiver = self._teal._bus.add_signal_receiver(
new_device,
dbus_interface="org.freedesktop.DBus.ObjectManager",
signal_name="InterfacesAdded",
)
if not self._was_discovering:
self._adapter._object.StartDiscovery()
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
if not self._was_discovering:
self._adapter._object.StopDiscovery()
self._signal_receiver.remove()
def __iter__(self):
return self
def __next__(self):
try:
return self._queue.get(timeout=self.timeout_s)
except queue.Empty:
raise StopIteration
class Device:
def __init__(self, teal, path, properties):
self._teal = teal
self._path = path
self._properties = properties
self._services_resolved = threading.Event()
self._services = None
if properties["ServicesResolved"]:
self._services_resolved.set()
# Listen to device events (connect, disconnect, ServicesResolved, ...)
self._device = dbus.Interface(
teal._bus.get_object("org.bluez", path), "org.bluez.Device1"
)
self._device_props = dbus.Interface(
self._device, "org.freedesktop.DBus.Properties"
)
self._signal_receiver = self._device_props.connect_to_signal(
"PropertiesChanged",
lambda itf, ch, inv: self._on_prop_changed(itf, ch, inv),
)
def __del__(self):
self._signal_receiver.remove()
def __repr__(self):
return "<tealblue.Device address=%s name=%r>" % (self.address, self.name)
def _on_prop_changed(self, properties, changed_props, invalidated_props):
for key, value in changed_props.items():
self._properties[key] = value
if "ServicesResolved" in changed_props:
if changed_props["ServicesResolved"]:
self._services_resolved.set()
else:
self._services_resolved.clear()
def _wait_for_discovery(self):
# wait until ServicesResolved is True
self._services_resolved.wait()
def connect(self):
self._device.Connect()
def disconnect(self):
self._device.Disconnect()
def resolve_services(self):
self._services_resolved.wait()
@property
def services(self):
if not self._services_resolved.is_set():
return None
if self._services is None:
self._services = {}
objects = self._teal._bluez.GetManagedObjects()
for path in sorted(objects.keys()):
if not path.startswith(self._path + "/"):
continue
if "org.bluez.GattService1" in objects[path]:
properties = objects[path]["org.bluez.GattService1"]
service = Service(self._teal, self, path, properties)
self._services[service.uuid] = service
elif "org.bluez.GattCharacteristic1" in objects[path]:
properties = objects[path]["org.bluez.GattCharacteristic1"]
characterstic = Characteristic(self._teal, self, path, properties)
for service in self._services.values():
if properties["Service"] == service._path:
service.characteristics[characterstic.uuid] = characterstic
return self._services
@property
def connected(self):
return bool(self._properties["Connected"])
@property
def services_resolved(self):
return bool(self._properties["ServicesResolved"])
@property
def UUIDs(self):
return [str(s) for s in self._properties["UUIDs"]]
@property
def address(self):
return str(self._properties["Address"])
@property
def name(self):
if "Name" not in self._properties:
return None
return str(self._properties["Name"])
@property
def alias(self):
if "Alias" not in self._properties:
return None
return str(self._properties["Alias"])
class Service:
def __init__(self, teal, device, path, properties):
self._device = device
self._teal = teal
self._path = path
self._properties = properties
self.characteristics = {}
def __repr__(self):
return "<tealblue.Service device=%s uuid=%s>" % (
self._device.address,
self.uuid,
)
@property
def uuid(self):
return str(self._properties["UUID"])
class Characteristic:
def __init__(self, teal, device, path, properties):
self._device = device
self._teal = teal
self._path = path
self._properties = properties
self.on_notify = None
self._char = dbus.Interface(
teal._bus.get_object("org.bluez", path), "org.bluez.GattCharacteristic1"
)
char_props = dbus.Interface(self._char, "org.freedesktop.DBus.Properties")
self._signal_receiver = char_props.connect_to_signal(
"PropertiesChanged",
lambda itf, ch, inv: self._on_prop_changed(itf, ch, inv),
)
def __repr__(self):
return "<tealblue.Characteristic device=%s uuid=%s>" % (
self._device.address,
self.uuid,
)
def __del__(self):
self._signal_receiver.remove()
def _on_prop_changed(self, properties, changed_props, invalidated_props):
for key, value in changed_props.items():
self._properties[key] = bytes(value)
if "Value" in changed_props and self.on_notify is not None:
self.on_notify(self, changed_props["Value"])
def read(self):
return bytes(self._char.ReadValue({}))
def write(self, value, command=True):
start = time.time()
try:
if command:
self._char.WriteValue(value, {"type": "command"})
else:
self._char.WriteValue(value, {"type": "request"})
except dbus.DBusException as e:
if (
e.get_dbus_name() == "org.bluez.Error.Failed"
and e.get_dbus_message() == "Not connected"
):
raise NotConnectedError()
else:
raise # some other error
# Workaround: if the write took very long, it is possible the connection
# broke (without causing an exception). So check whether we are still
# connected.
# I think this is a bug in BlueZ.
if time.time() - start > 0.5:
if not self._device._device_props.Get("org.bluez.Device1", "Connected"):
raise NotConnectedError()
def start_notify(self):
self._char.StartNotify()
def stop_notify(self):
self._char.StopNotify()
@property
def uuid(self):
return str(self._properties["UUID"])
class Advertisement(dbus.service.Object):
PATH = "/com/github/aykevl/pynus/advertisement"
def __init__(self, teal, adapter):
self._teal = teal
self._adapter = adapter
self._enabled = False
self.service_uuids = None
self.manufacturer_data = None
self.solicit_uuids = None
self.service_data = None
self.local_name = None
self.include_tx_power = None
self._manager = dbus.Interface(
teal._bus.get_object("org.bluez", self._adapter._path),
"org.bluez.LEAdvertisingManager1",
)
self._adv_enabled = threading.Event()
dbus.service.Object.__init__(self, teal._bus, self.PATH)
def enable(self):
if self._enabled:
return
self._manager.RegisterAdvertisement(
dbus.ObjectPath(self.PATH),
dbus.Dictionary({}, signature="sv"),
reply_handler=self._cb_enabled,
error_handler=self._cb_enabled_err,
)
self._adv_enabled.wait()
self._adv_enabled.clear()
def _cb_enabled(self):
self._enabled = True
if self._adv_enabled.is_set():
raise RuntimeError("called enable() twice")
self._adv_enabled.set()
def _cb_enabled_err(self, err):
self._enabled = False
if self._adv_enabled.is_set():
raise RuntimeError("called enable() twice")
self._adv_enabled.set()
def disable(self):
if not self._enabled:
return
self._bus.UnregisterAdvertisement(self.PATH)
self._enabled = False
@property
def enabled(self):
return self._enabled
@dbus.service.method(
"org.freedesktop.DBus.Properties", in_signature="s", out_signature="a{sv}"
)
def GetAll(self, interface):
print("GetAll")
if interface != "org.bluez.LEAdvertisement1":
raise DBusInvalidArgsException()
try:
properties = {
"Type": dbus.String("peripheral"),
}
if self.service_uuids is not None:
properties["ServiceUUIDs"] = dbus.Array(
map(format_uuid, self.service_uuids), signature="s"
)
if self.solicit_uuids is not None:
properties["SolicitUUIDs"] = dbus.Array(
map(format_uuid, self.solicit_uuids), signature="s"
)
if self.manufacturer_data is not None:
properties["ManufacturerData"] = dbus.Dictionary(
{k: v for k, v in self.manufacturer_data.items()}, signature="qv"
)
if self.service_data is not None:
properties["ServiceData"] = dbus.Dictionary(
self.service_data, signature="sv"
)
if self.local_name is not None:
properties["LocalName"] = dbus.String(self.local_name)
if self.include_tx_power is not None:
properties["IncludeTxPower"] = dbus.Boolean(self.include_tx_power)
except Exception as e:
print("err: ", e)
print("properties:", properties)
return properties
@dbus.service.method(
"org.bluez.LEAdvertisement1", in_signature="", out_signature=""
)
def Release(self):
self._enabled = True

@ -117,12 +117,14 @@ def all_transports() -> Iterable[Type["Transport"]]:
from .hid import HidTransport
from .udp import UdpTransport
from .webusb import WebUsbTransport
from .ble import BleTransport
transports: Tuple[Type["Transport"], ...] = (
BridgeTransport,
HidTransport,
UdpTransport,
WebUsbTransport,
BleTransport,
)
return set(t for t in transports if t.ENABLED)

@ -0,0 +1,154 @@
# This file is part of the Trezor project.
#
# Copyright (C) 2012-2022 SatoshiLabs and contributors
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License version 3
# as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the License along with this library.
# If not, see <https://www.gnu.org/licenses/lgpl-3.0.html>.
import logging
from queue import Queue
from typing import TYPE_CHECKING, Any, Iterable, List, Optional
from .. import tealblue
from ..tealblue import Adapter, Characteristic
from . import TransportException
from .protocol import ProtocolBasedTransport, ProtocolV1
if TYPE_CHECKING:
from ..models import TrezorModel
LOG = logging.getLogger(__name__)
NUS_SERVICE_UUID = "6e400001-b5a3-f393-e0a9-e50e24dcca9e"
NUS_CHARACTERISTIC_RX = "6e400002-b5a3-f393-e0a9-e50e24dcca9e"
NUS_CHARACTERISTIC_TX = "6e400003-b5a3-f393-e0a9-e50e24dcca9e"
def scan_device(adapter: Adapter, devices: List[tealblue.Device]):
with adapter.scan(2) as scanner:
for device in scanner:
if NUS_SERVICE_UUID in device.UUIDs:
if device.address not in [d.address for d in devices]:
print(f"Found device: {device.name}, {device.address}")
devices.append(device)
return devices
def lookup_device(adapter: Adapter):
devices = []
for device in adapter.devices():
if NUS_SERVICE_UUID in device.UUIDs:
devices.append(device)
return devices
class BleTransport(ProtocolBasedTransport):
ENABLED = True
PATH_PREFIX = "ble"
def __init__(self, mac_addr: str, adapter: Adapter) -> None:
self.tx = None
self.rx = None
self.device = mac_addr
self.adapter = adapter
self.received_data = Queue()
devices = lookup_device(self.adapter)
for d in devices:
if d.address == mac_addr:
self.ble_device = d
break
super().__init__(protocol=ProtocolV1(self, replen=244))
def get_path(self) -> str:
return "{}:{}".format(self.PATH_PREFIX, self.device)
def find_debug(self) -> "BleTransport":
mac = self.device
return BleTransport(f"{mac}", self.adapter)
@classmethod
def enumerate(
cls, _models: Optional[Iterable["TrezorModel"]] = None
) -> Iterable["BleTransport"]:
adapter = tealblue.TealBlue().find_adapter()
devices = lookup_device(adapter)
for device in devices:
print(f"Device: {device.name}, {device.address}")
devices = [d for d in devices if d.connected]
return [BleTransport(device.address, adapter) for device in devices]
@classmethod
def _try_path(cls, path: str) -> "BleTransport":
devices = cls.enumerate(None)
devices = [d for d in devices if d.device == path]
if len(devices) == 0:
raise TransportException(f"No BLE device: {path}")
return devices[0]
@classmethod
def find_by_path(cls, path: str, prefix_search: bool = False) -> "BleTransport":
if not prefix_search:
raise TransportException
if prefix_search:
return super().find_by_path(path, prefix_search)
else:
raise TransportException(f"No BLE device: {path}")
def open(self) -> None:
if not self.ble_device.connected:
print(
"Connecting to %s (%s)..."
% (self.ble_device.name, self.ble_device.address)
)
self.ble_device.connect()
else:
print(
"Connected to %s (%s)."
% (self.ble_device.name, self.ble_device.address)
)
if not self.ble_device.services_resolved:
print("Resolving services...")
self.ble_device.resolve_services()
service = self.ble_device.services[NUS_SERVICE_UUID]
self.rx = service.characteristics[NUS_CHARACTERISTIC_RX]
self.tx = service.characteristics[NUS_CHARACTERISTIC_TX]
def on_notify(characteristic: Characteristic, value: Any):
self.received_data.put(bytes(value))
self.tx.on_notify = on_notify
self.tx.start_notify()
def close(self) -> None:
pass
def write_chunk(self, chunk: bytes) -> None:
assert self.rx is not None
self.rx.write(chunk)
def read_chunk(self) -> bytes:
assert self.tx is not None
chunk = self.received_data.get()
# LOG.log(DUMP_PACKETS, f"received packet: {chunk.hex()}")
if len(chunk) != 64:
raise TransportException(f"Unexpected chunk size: {len(chunk)}")
return bytearray(chunk)

@ -75,8 +75,9 @@ class Protocol:
its messages.
"""
def __init__(self, handle: Handle) -> None:
def __init__(self, handle: Handle, replen: int = REPLEN) -> None:
self.handle = handle
self.replen = replen
self.session_counter = 0
# XXX we might be able to remove this now that TrezorClient does session handling
@ -133,10 +134,10 @@ class ProtocolV1(Protocol):
while buffer:
# Report ID, data padded to 63 bytes
chunk = b"?" + buffer[: REPLEN - 1]
chunk = chunk.ljust(REPLEN, b"\x00")
chunk = b"?" + buffer[: self.replen - 1]
chunk = chunk.ljust(self.replen, b"\x00")
self.handle.write_chunk(chunk)
buffer = buffer[63:]
buffer = buffer[self.replen - 1 :]
def read(self) -> MessagePayload:
buffer = bytearray()

@ -61,6 +61,8 @@ class WebUsbHandle:
self.handle.claimInterface(self.interface)
except usb1.USBErrorAccess as e:
raise DeviceIsBusy(self.device) from e
except usb1.USBErrorBusy as e:
raise DeviceIsBusy(self.device) from e
def close(self) -> None:
if self.handle is not None:

@ -12,3 +12,4 @@
^\./crypto/sha3
^\./legacy/vendor
^\./core/embed/segger
^\./core/embed/sdk

Loading…
Cancel
Save