From 31a550117b4357d7c19389452450a23c64deae95 Mon Sep 17 00:00:00 2001 From: tychovrahe Date: Wed, 7 Jun 2023 13:18:57 +0200 Subject: [PATCH] PYTHON --- common/protob/Makefile | 2 +- pyproject.toml | 5 + python/requirements.txt | 3 + python/setup.cfg | 2 +- python/src/trezorlib/ble/__init__.py | 59 +++ python/src/trezorlib/cli/__init__.py | 106 ++++- python/src/trezorlib/cli/ble.py | 132 ++++++ python/src/trezorlib/cli/trezorctl.py | 5 + python/src/trezorlib/messages.py | 99 +++++ python/src/trezorlib/tealblue.py | 457 +++++++++++++++++++++ python/src/trezorlib/transport/__init__.py | 2 + python/src/trezorlib/transport/ble.py | 154 +++++++ python/src/trezorlib/transport/protocol.py | 9 +- python/src/trezorlib/transport/webusb.py | 2 + 14 files changed, 1018 insertions(+), 19 deletions(-) create mode 100644 python/src/trezorlib/ble/__init__.py create mode 100644 python/src/trezorlib/cli/ble.py create mode 100755 python/src/trezorlib/tealblue.py create mode 100644 python/src/trezorlib/transport/ble.py diff --git a/common/protob/Makefile b/common/protob/Makefile index f8df2d2d5..c2feff5a8 100644 --- a/common/protob/Makefile +++ b/common/protob/Makefile @@ -1,4 +1,4 @@ -check: messages.pb messages-binance.pb messages-bitcoin.pb messages-bootloader.pb messages-cardano.pb messages-common.pb messages-crypto.pb messages-debug.pb messages-ethereum.pb messages-management.pb messages-monero.pb messages-nem.pb messages-ripple.pb messages-stellar.pb messages-tezos.pb messages-eos.pb +check: messages.pb messages-binance.pb messages-bitcoin.pb messages-ble.pb messages-bootloader.pb messages-cardano.pb messages-common.pb messages-crypto.pb messages-debug.pb messages-ethereum.pb messages-management.pb messages-monero.pb messages-nem.pb messages-ripple.pb messages-stellar.pb messages-tezos.pb messages-eos.pb %.pb: %.proto protoc -I/usr/include -I. $< -o $@ diff --git a/pyproject.toml b/pyproject.toml index 698bb75bc..e2878dfcc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -76,6 +76,11 @@ vulture = "^2.6" binsize = "^0.1.3" toiftool = {path = "./python/tools/toiftool", develop = true, python = ">=3.8"} +# ble +dbus-python = "*" +PyGObject = "*" +nrfutil = "*" + [tool.poetry.dev-dependencies] scan-build = "*" towncrier = "^23.6.0" diff --git a/python/requirements.txt b/python/requirements.txt index 916fad94f..1493b3e27 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -8,3 +8,6 @@ typing_extensions>=3.10 dataclasses ; python_version<'3.7' simple-rlp>=0.1.2 ; python_version>='3.7' construct-classes>=0.1.2 +dbus-python>=1.3.2 +pygobject>=3.44.1 +nrfutil>=5.0.0 diff --git a/python/setup.cfg b/python/setup.cfg index e801ad666..2889dd2e6 100644 --- a/python/setup.cfg +++ b/python/setup.cfg @@ -25,7 +25,7 @@ per-file-ignores = helper-scripts/*:I tools/*:I tests/*:I -known-modules = libusb1:[usb1],hidapi:[hid],PyQt5:[PyQt5.QtWidgets,PyQt5.QtGui,PyQt5.QtCore],simple-rlp:[rlp] +known-modules = libusb1:[usb1],hidapi:[hid],PyQt5:[PyQt5.QtWidgets,PyQt5.QtGui,PyQt5.QtCore],simple-rlp:[rlp],dbus-python:[dbus] [isort] profile = black diff --git a/python/src/trezorlib/ble/__init__.py b/python/src/trezorlib/ble/__init__.py new file mode 100644 index 000000000..be1c1eb68 --- /dev/null +++ b/python/src/trezorlib/ble/__init__.py @@ -0,0 +1,59 @@ +import typing as t + +from .. import messages +from ..tools import session + +if t.TYPE_CHECKING: + from ..client import TrezorClient + + +@session +def update( + client: "TrezorClient", + datfile: bytes, + binfile: bytes, + progress_update: t.Callable[[int], t.Any] = lambda _: None, +): + chunk_len = 4096 + offset = 0 + + resp = client.call( + messages.UploadBLEFirmwareInit(init_data=datfile, binsize=len(binfile)) + ) + + while isinstance(resp, messages.UploadBLEFirmwareNextChunk): + + payload = binfile[offset : offset + chunk_len] + resp = client.call(messages.UploadBLEFirmwareChunk(data=payload)) + progress_update(chunk_len) + offset += chunk_len + + if isinstance(resp, messages.Success): + return + else: + raise RuntimeError(f"Unexpected message {resp}") + + +@session +def erase_bonds( + client: "TrezorClient", +): + + resp = client.call(messages.EraseBonds()) + + if isinstance(resp, messages.Success): + return + else: + raise RuntimeError(f"Unexpected message {resp}") + + +@session +def disconnect( + client: "TrezorClient", +): + resp = client.call(messages.Disconnect()) + + if isinstance(resp, messages.Success): + return + else: + raise RuntimeError(f"Unexpected message {resp}") diff --git a/python/src/trezorlib/cli/__init__.py b/python/src/trezorlib/cli/__init__.py index 8a4191a8e..d975de8bf 100644 --- a/python/src/trezorlib/cli/__init__.py +++ b/python/src/trezorlib/cli/__init__.py @@ -16,12 +16,17 @@ import functools import sys +import threading from contextlib import contextmanager -from typing import TYPE_CHECKING, Any, Callable, Dict, Optional +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional import click +import dbus +import dbus.mainloop.glib +import dbus.service +from gi.repository import GLib -from .. import exceptions, transport +from .. import exceptions, messages, transport from ..client import TrezorClient from ..ui import ClickUI, ScriptUI @@ -104,6 +109,12 @@ class TrezorConnection: except transport.DeviceIsBusy: click.echo("Device is in use by another process.") sys.exit(1) + except exceptions.TrezorFailure as e: + if e.code is messages.FailureType.DeviceIsBusy: + click.echo(str(e)) + sys.exit(1) + else: + raise e except Exception: click.echo("Failed to find a Trezor device.") if self.path is not None: @@ -135,26 +146,95 @@ def with_client(func: "Callable[Concatenate[TrezorClient, P], R]") -> "Callable[ def trezorctl_command_with_client( obj: TrezorConnection, *args: "P.args", **kwargs: "P.kwargs" ) -> "R": - with obj.client_context() as client: - session_was_resumed = obj.session_id == client.session_id - if not session_was_resumed and obj.session_id is not None: - # tried to resume but failed - click.echo("Warning: failed to resume session.", err=True) + loop = GLib.MainLoop() + dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) + + def callback_wrapper( + r: List[Optional["R"]], exc: List[Optional[Exception]] + ) -> None: try: - return func(client, *args, **kwargs) - finally: - if not session_was_resumed: + with obj.client_context() as client: + session_was_resumed = obj.session_id == client.session_id + if not session_was_resumed and obj.session_id is not None: + # tried to resume but failed + click.echo("Warning: failed to resume session.", err=True) + try: - client.end_session() - except Exception: - pass + r.append(func(client, *args, **kwargs)) + except Exception as e: + exc[0] = e + finally: + if not session_was_resumed: + try: + client.end_session() + except Exception: + pass + except Exception as e: + exc[0] = e + finally: + loop.quit() + + result: List["R"] = [] + exc: List[Optional[Exception]] = [None] + threading.Thread( + target=callback_wrapper, daemon=True, args=(result, exc) + ).start() + loop.run() + + if exc[0] is not None: + raise exc[0] + + if len(result) == 0: + raise click.ClickException("Command did not return a result.") + + return result[0] # the return type of @click.pass_obj is improperly specified and pyright doesn't # understand that it converts f(obj, *args, **kwargs) to f(*args, **kwargs) return trezorctl_command_with_client # type: ignore [cannot be assigned to return type] +def with_ble(func: "Callable[P, R]") -> "Callable[P, R]": + """Wrap a Click command in `with obj.client_context() as client`. + + Sessions are handled transparently. The user is warned when session did not resume + cleanly. The session is closed after the command completes - unless the session + was resumed, in which case it should remain open. + """ + + @functools.wraps(func) + def trezorctl_command(*args: "P.args", **kwargs: "P.kwargs") -> "R": + + loop = GLib.MainLoop() + dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) + + def callback_wrapper(r: List[Optional["R"]], exc: List[Optional[Exception]]): + try: + r.append(func(*args, **kwargs)) + except Exception as e: + exc[0] = e + finally: + loop.quit() + + result: List["R"] = [] + exc: List[Optional[Exception]] = [None] + threading.Thread( + target=callback_wrapper, daemon=True, args=(result, exc) + ).start() + loop.run() + + if exc[0] is not None: + raise exc[0] + + if len(result) == 0: + raise click.ClickException("Command did not return a result.") + + return result[0] + + return trezorctl_command + + class AliasedGroup(click.Group): """Command group that handles aliases and Click 6.x compatibility. diff --git a/python/src/trezorlib/cli/ble.py b/python/src/trezorlib/cli/ble.py new file mode 100644 index 000000000..5f9ef5c2f --- /dev/null +++ b/python/src/trezorlib/cli/ble.py @@ -0,0 +1,132 @@ +# This file is part of the Trezor project. +# +# Copyright (C) 2012-2022 SatoshiLabs and contributors +# +# This library is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License version 3 +# as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the License along with this library. +# If not, see . + +import sys +import zipfile +from typing import TYPE_CHECKING, BinaryIO + +import click + +from .. import ble, exceptions, tealblue +from ..transport.ble import lookup_device, scan_device +from . import with_ble, with_client + +if TYPE_CHECKING: + from ..client import TrezorClient + + +@click.group(name="ble") +def cli() -> None: + """BLE commands.""" + + +@cli.command() +# fmt: off +@click.argument("package", type=click.File("rb")) +# fmt: on +@with_client +def update( + client: "TrezorClient", + package: BinaryIO, +) -> None: + """Upload new BLE firmware to device.""" + + with zipfile.ZipFile(package) as archive: + binfile = archive.read("ble_firmware.bin") + datfile = archive.read("ble_firmware.dat") + + """Perform the final act of loading the firmware into Trezor.""" + try: + click.echo("Uploading...\r", nl=False) + with click.progressbar( + label="Uploading", length=len(binfile), show_eta=False + ) as bar: + ble.update(client, datfile, binfile, bar.update) + click.echo("Update successful.") + except exceptions.Cancelled: + click.echo("Update aborted on device.") + except exceptions.TrezorException as e: + click.echo(f"Update failed: {e}") + sys.exit(3) + + +@cli.command() +@with_ble +def connect() -> None: + """Connect to the device via BLE.""" + adapter = tealblue.TealBlue().find_adapter() + + devices = lookup_device(adapter) + + devices = [d for d in devices if d.connected] + + if len(devices) == 0: + print("Scanning...") + devices = scan_device(adapter, devices) + + if len(devices) == 0: + print("No BLE devices found") + return + else: + print("Found %d BLE device(s)" % len(devices)) + + for device in devices: + print(f"Device: {device.name}, {device.address}") + + device = devices[0] + print(f"Connecting to {device.name}...") + device.connect() + print("Connected") + + +@cli.command() +@click.option("--device", is_flag=True, help="Disconnect from device side.") +@with_client +def disconnect(client: "TrezorClient", device: bool) -> None: + + if device: + ble.disconnect(client) + else: + """Connect to the device via BLE.""" + adapter = tealblue.TealBlue().find_adapter() + + devices = lookup_device(adapter) + + devices = [d for d in devices if d.connected] + + if len(devices) == 0: + print("No device is connected") + + for d in devices: + d.disconnect() + print(f"Device {d.name}, {d.address}, disconnected.") + + +@cli.command() +@with_client +def erase_bonds( + client: "TrezorClient", +) -> None: + """Erase BLE bonds on device.""" + + try: + ble.erase_bonds(client) + click.echo("Erase successful.") + except exceptions.Cancelled: + click.echo("Erase aborted on device.") + except exceptions.TrezorException as e: + click.echo(f"Update failed: {e}") + sys.exit(3) diff --git a/python/src/trezorlib/cli/trezorctl.py b/python/src/trezorlib/cli/trezorctl.py index b820f28d9..752337114 100755 --- a/python/src/trezorlib/cli/trezorctl.py +++ b/python/src/trezorlib/cli/trezorctl.py @@ -32,6 +32,7 @@ from . import ( AliasedGroup, TrezorConnection, binance, + ble, btc, cardano, cosi, @@ -48,6 +49,7 @@ from . import ( settings, stellar, tezos, + with_ble, with_client, ) @@ -86,6 +88,7 @@ COMMAND_ALIASES = { "upgrade-firmware": firmware.update, "firmware-upgrade": firmware.update, "firmware-update": firmware.update, + "ble-update": ble.update, } @@ -279,6 +282,7 @@ def format_device_name(features: messages.Features) -> str: @cli.command(name="list") +@with_ble @click.option("-n", "no_resolve", is_flag=True, help="Do not resolve Trezor names") def list_devices(no_resolve: bool) -> Optional[Iterable["Transport"]]: """List connected Trezor devices.""" @@ -415,6 +419,7 @@ cli.add_command(tezos.cli) cli.add_command(firmware.cli) cli.add_command(debug.cli) +cli.add_command(ble.cli) # # Main diff --git a/python/src/trezorlib/messages.py b/python/src/trezorlib/messages.py index e1b90f79a..49072733a 100644 --- a/python/src/trezorlib/messages.py +++ b/python/src/trezorlib/messages.py @@ -82,6 +82,15 @@ class MessageType(IntEnum): FirmwareUpload = 7 FirmwareRequest = 8 SelfTest = 32 + UploadBLEFirmwareInit = 8000 + UploadBLEFirmwareNextChunk = 8001 + UploadBLEFirmwareChunk = 8002 + PairingRequest = 8003 + AuthKey = 8004 + RepairRequest = 8005 + EraseBonds = 8006 + Disconnect = 8007 + ComparisonRequest = 8008 GetPublicKey = 11 PublicKey = 12 SignTx = 15 @@ -277,6 +286,7 @@ class FailureType(IntEnum): PinMismatch = 12 WipeCodeMismatch = 13 InvalidSession = 14 + DeviceIsBusy = 15 FirmwareError = 99 @@ -2050,6 +2060,95 @@ class TxAckPrevExtraDataWrapper(protobuf.MessageType): self.extra_data_chunk = extra_data_chunk +class UploadBLEFirmwareInit(protobuf.MessageType): + MESSAGE_WIRE_TYPE = 8000 + FIELDS = { + 1: protobuf.Field("init_data", "bytes", repeated=False, required=True), + 2: protobuf.Field("binsize", "uint32", repeated=False, required=True), + } + + def __init__( + self, + *, + init_data: "bytes", + binsize: "int", + ) -> None: + self.init_data = init_data + self.binsize = binsize + + +class UploadBLEFirmwareNextChunk(protobuf.MessageType): + MESSAGE_WIRE_TYPE = 8001 + FIELDS = { + 1: protobuf.Field("offset", "uint32", repeated=False, required=True), + } + + def __init__( + self, + *, + offset: "int", + ) -> None: + self.offset = offset + + +class UploadBLEFirmwareChunk(protobuf.MessageType): + MESSAGE_WIRE_TYPE = 8002 + FIELDS = { + 1: protobuf.Field("data", "bytes", repeated=False, required=True), + } + + def __init__( + self, + *, + data: "bytes", + ) -> None: + self.data = data + + +class EraseBonds(protobuf.MessageType): + MESSAGE_WIRE_TYPE = 8006 + + +class Disconnect(protobuf.MessageType): + MESSAGE_WIRE_TYPE = 8007 + + +class PairingRequest(protobuf.MessageType): + MESSAGE_WIRE_TYPE = 8003 + + +class AuthKey(protobuf.MessageType): + MESSAGE_WIRE_TYPE = 8004 + FIELDS = { + 1: protobuf.Field("key", "bytes", repeated=False, required=True), + } + + def __init__( + self, + *, + key: "bytes", + ) -> None: + self.key = key + + +class RepairRequest(protobuf.MessageType): + MESSAGE_WIRE_TYPE = 8005 + + +class ComparisonRequest(protobuf.MessageType): + MESSAGE_WIRE_TYPE = 8008 + FIELDS = { + 1: protobuf.Field("key", "bytes", repeated=False, required=True), + } + + def __init__( + self, + *, + key: "bytes", + ) -> None: + self.key = key + + class FirmwareErase(protobuf.MessageType): MESSAGE_WIRE_TYPE = 6 FIELDS = { diff --git a/python/src/trezorlib/tealblue.py b/python/src/trezorlib/tealblue.py new file mode 100755 index 000000000..3756c74e1 --- /dev/null +++ b/python/src/trezorlib/tealblue.py @@ -0,0 +1,457 @@ +# !/usr/bin/python3 +# pyright: off + +import queue +import threading +import time + +import dbus +import dbus.mainloop.glib +import dbus.service + + +class NotConnectedError(Exception): + pass + + +class DBusInvalidArgsException(dbus.exceptions.DBusException): + _dbus_error_name = "org.freedesktop.DBus.Error.InvalidArgs" + + +def format_uuid(uuid): + if type(uuid) == int: + if uuid > 0xFFFF: + raise ValueError("32-bit UUID not supported yet") + uuid = "%04X" % uuid + return uuid + + +class TealBlue: + def __init__(self): + self._bus = dbus.SystemBus() + self._bluez = dbus.Interface( + self._bus.get_object("org.bluez", "/"), "org.freedesktop.DBus.ObjectManager" + ) + + def find_adapter(self): + # find the first adapter + objects = self._bluez.GetManagedObjects() + for path in sorted(objects.keys()): + interfaces = objects[path] + if "org.bluez.Adapter1" not in interfaces: + continue + properties = interfaces["org.bluez.Adapter1"] + return Adapter(self, path, properties) + raise Exception("No adapter found") + + # copied from: + # https://github.com/adafruit/Adafruit_Python_BluefruitLE/blob/master/Adafruit_BluefruitLE/bluez_dbus/provider.py + def _print_tree(self): + """Print tree of all bluez objects, useful for debugging.""" + # This is based on the bluez sample code get-managed-objects.py. + objects = self._bluez.GetManagedObjects() + for path in sorted(objects.keys()): + print("[ %s ]" % (path)) + interfaces = objects[path] + for interface in sorted(interfaces.keys()): + if interface in [ + "org.freedesktop.DBus.Introspectable", + "org.freedesktop.DBus.Properties", + ]: + continue + print(" %s" % (interface)) + properties = interfaces[interface] + for key in sorted(properties.keys()): + print(" %s = %s" % (key, properties[key])) + + +class Adapter: + def __init__(self, teal, path, properties): + self._teal = teal + self._path = path + self._properties = properties + self._object = dbus.Interface( + teal._bus.get_object("org.bluez", path), "org.bluez.Adapter1" + ) + self._advertisement = None + + def __repr__(self): + return "" % (self._properties["Address"]) + + def devices(self): + """ + Returns the devices that BlueZ has discovered. + """ + objects = self._teal._bluez.GetManagedObjects() + for path in sorted(objects.keys()): + interfaces = objects[path] + if "org.bluez.Device1" not in interfaces: + continue + properties = interfaces["org.bluez.Device1"] + yield Device(self._teal, path, properties) + + def scan(self, timeout_s): + return Scanner(self._teal, self, self.devices(), timeout_s) + + @property + def advertisement(self): + if self._advertisement is None: + self._advertisement = Advertisement(self._teal, self) + return self._advertisement + + def advertise(self, enable): + if enable: + self.advertisement.enable() + else: + self.advertisement.disable() + + def advertise_data( + self, + local_name=None, + service_data=None, + service_uuids=None, + manufacturer_data=None, + ): + self.advertisement.local_name = local_name + self.advertisement.service_data = service_data + self.advertisement.service_uuids = service_uuids + self.advertisement.manufacturer_data = manufacturer_data + + +class Scanner: + def __init__(self, teal, adapter, initial_devices, timeout_s): + self._teal = teal + self._adapter = adapter + self._was_discovering = adapter._properties[ + "Discovering" + ] # TODO get current value, or watch property changes + self._queue = queue.Queue() + self.timeout_s = timeout_s + for device in initial_devices: + self._queue.put(device) + + def new_device(path, interfaces): + if "org.bluez.Device1" not in interfaces: + return + if not path.startswith(self._adapter._path + "/"): + return + # properties = interfaces["org.bluez.Device1"] + self._queue.put(Device(self._teal, path, interfaces["org.bluez.Device1"])) + + self._signal_receiver = self._teal._bus.add_signal_receiver( + new_device, + dbus_interface="org.freedesktop.DBus.ObjectManager", + signal_name="InterfacesAdded", + ) + if not self._was_discovering: + self._adapter._object.StartDiscovery() + + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + if not self._was_discovering: + self._adapter._object.StopDiscovery() + self._signal_receiver.remove() + + def __iter__(self): + return self + + def __next__(self): + try: + return self._queue.get(timeout=self.timeout_s) + except queue.Empty: + raise StopIteration + + +class Device: + def __init__(self, teal, path, properties): + self._teal = teal + self._path = path + self._properties = properties + self._services_resolved = threading.Event() + self._services = None + + if properties["ServicesResolved"]: + self._services_resolved.set() + + # Listen to device events (connect, disconnect, ServicesResolved, ...) + self._device = dbus.Interface( + teal._bus.get_object("org.bluez", path), "org.bluez.Device1" + ) + self._device_props = dbus.Interface( + self._device, "org.freedesktop.DBus.Properties" + ) + self._signal_receiver = self._device_props.connect_to_signal( + "PropertiesChanged", + lambda itf, ch, inv: self._on_prop_changed(itf, ch, inv), + ) + + def __del__(self): + self._signal_receiver.remove() + + def __repr__(self): + return "" % (self.address, self.name) + + def _on_prop_changed(self, properties, changed_props, invalidated_props): + for key, value in changed_props.items(): + self._properties[key] = value + + if "ServicesResolved" in changed_props: + if changed_props["ServicesResolved"]: + self._services_resolved.set() + else: + self._services_resolved.clear() + + def _wait_for_discovery(self): + # wait until ServicesResolved is True + self._services_resolved.wait() + + def connect(self): + self._device.Connect() + + def disconnect(self): + self._device.Disconnect() + + def resolve_services(self): + self._services_resolved.wait() + + @property + def services(self): + if not self._services_resolved.is_set(): + return None + if self._services is None: + self._services = {} + objects = self._teal._bluez.GetManagedObjects() + for path in sorted(objects.keys()): + if not path.startswith(self._path + "/"): + continue + if "org.bluez.GattService1" in objects[path]: + properties = objects[path]["org.bluez.GattService1"] + service = Service(self._teal, self, path, properties) + self._services[service.uuid] = service + elif "org.bluez.GattCharacteristic1" in objects[path]: + properties = objects[path]["org.bluez.GattCharacteristic1"] + characterstic = Characteristic(self._teal, self, path, properties) + for service in self._services.values(): + if properties["Service"] == service._path: + service.characteristics[characterstic.uuid] = characterstic + return self._services + + @property + def connected(self): + return bool(self._properties["Connected"]) + + @property + def services_resolved(self): + return bool(self._properties["ServicesResolved"]) + + @property + def UUIDs(self): + return [str(s) for s in self._properties["UUIDs"]] + + @property + def address(self): + return str(self._properties["Address"]) + + @property + def name(self): + if "Name" not in self._properties: + return None + return str(self._properties["Name"]) + + @property + def alias(self): + if "Alias" not in self._properties: + return None + return str(self._properties["Alias"]) + + +class Service: + def __init__(self, teal, device, path, properties): + self._device = device + self._teal = teal + self._path = path + self._properties = properties + self.characteristics = {} + + def __repr__(self): + return "" % ( + self._device.address, + self.uuid, + ) + + @property + def uuid(self): + return str(self._properties["UUID"]) + + +class Characteristic: + def __init__(self, teal, device, path, properties): + self._device = device + self._teal = teal + self._path = path + self._properties = properties + + self.on_notify = None + + self._char = dbus.Interface( + teal._bus.get_object("org.bluez", path), "org.bluez.GattCharacteristic1" + ) + char_props = dbus.Interface(self._char, "org.freedesktop.DBus.Properties") + self._signal_receiver = char_props.connect_to_signal( + "PropertiesChanged", + lambda itf, ch, inv: self._on_prop_changed(itf, ch, inv), + ) + + def __repr__(self): + return "" % ( + self._device.address, + self.uuid, + ) + + def __del__(self): + self._signal_receiver.remove() + + def _on_prop_changed(self, properties, changed_props, invalidated_props): + for key, value in changed_props.items(): + self._properties[key] = bytes(value) + + if "Value" in changed_props and self.on_notify is not None: + self.on_notify(self, changed_props["Value"]) + + def read(self): + return bytes(self._char.ReadValue({})) + + def write(self, value, command=True): + start = time.time() + try: + if command: + self._char.WriteValue(value, {"type": "command"}) + else: + self._char.WriteValue(value, {"type": "request"}) + + except dbus.DBusException as e: + if ( + e.get_dbus_name() == "org.bluez.Error.Failed" + and e.get_dbus_message() == "Not connected" + ): + raise NotConnectedError() + else: + raise # some other error + + # Workaround: if the write took very long, it is possible the connection + # broke (without causing an exception). So check whether we are still + # connected. + # I think this is a bug in BlueZ. + if time.time() - start > 0.5: + if not self._device._device_props.Get("org.bluez.Device1", "Connected"): + raise NotConnectedError() + + def start_notify(self): + self._char.StartNotify() + + def stop_notify(self): + self._char.StopNotify() + + @property + def uuid(self): + return str(self._properties["UUID"]) + + +class Advertisement(dbus.service.Object): + PATH = "/com/github/aykevl/pynus/advertisement" + + def __init__(self, teal, adapter): + self._teal = teal + self._adapter = adapter + self._enabled = False + self.service_uuids = None + self.manufacturer_data = None + self.solicit_uuids = None + self.service_data = None + self.local_name = None + self.include_tx_power = None + self._manager = dbus.Interface( + teal._bus.get_object("org.bluez", self._adapter._path), + "org.bluez.LEAdvertisingManager1", + ) + self._adv_enabled = threading.Event() + dbus.service.Object.__init__(self, teal._bus, self.PATH) + + def enable(self): + if self._enabled: + return + self._manager.RegisterAdvertisement( + dbus.ObjectPath(self.PATH), + dbus.Dictionary({}, signature="sv"), + reply_handler=self._cb_enabled, + error_handler=self._cb_enabled_err, + ) + self._adv_enabled.wait() + self._adv_enabled.clear() + + def _cb_enabled(self): + self._enabled = True + if self._adv_enabled.is_set(): + raise RuntimeError("called enable() twice") + self._adv_enabled.set() + + def _cb_enabled_err(self, err): + self._enabled = False + if self._adv_enabled.is_set(): + raise RuntimeError("called enable() twice") + self._adv_enabled.set() + + def disable(self): + if not self._enabled: + return + self._bus.UnregisterAdvertisement(self.PATH) + self._enabled = False + + @property + def enabled(self): + return self._enabled + + @dbus.service.method( + "org.freedesktop.DBus.Properties", in_signature="s", out_signature="a{sv}" + ) + def GetAll(self, interface): + print("GetAll") + if interface != "org.bluez.LEAdvertisement1": + raise DBusInvalidArgsException() + + try: + properties = { + "Type": dbus.String("peripheral"), + } + if self.service_uuids is not None: + properties["ServiceUUIDs"] = dbus.Array( + map(format_uuid, self.service_uuids), signature="s" + ) + if self.solicit_uuids is not None: + properties["SolicitUUIDs"] = dbus.Array( + map(format_uuid, self.solicit_uuids), signature="s" + ) + if self.manufacturer_data is not None: + properties["ManufacturerData"] = dbus.Dictionary( + {k: v for k, v in self.manufacturer_data.items()}, signature="qv" + ) + if self.service_data is not None: + properties["ServiceData"] = dbus.Dictionary( + self.service_data, signature="sv" + ) + if self.local_name is not None: + properties["LocalName"] = dbus.String(self.local_name) + if self.include_tx_power is not None: + properties["IncludeTxPower"] = dbus.Boolean(self.include_tx_power) + except Exception as e: + print("err: ", e) + print("properties:", properties) + return properties + + @dbus.service.method( + "org.bluez.LEAdvertisement1", in_signature="", out_signature="" + ) + def Release(self): + self._enabled = True diff --git a/python/src/trezorlib/transport/__init__.py b/python/src/trezorlib/transport/__init__.py index b04876b6b..d7661b59b 100644 --- a/python/src/trezorlib/transport/__init__.py +++ b/python/src/trezorlib/transport/__init__.py @@ -117,12 +117,14 @@ def all_transports() -> Iterable[Type["Transport"]]: from .hid import HidTransport from .udp import UdpTransport from .webusb import WebUsbTransport + from .ble import BleTransport transports: Tuple[Type["Transport"], ...] = ( BridgeTransport, HidTransport, UdpTransport, WebUsbTransport, + BleTransport, ) return set(t for t in transports if t.ENABLED) diff --git a/python/src/trezorlib/transport/ble.py b/python/src/trezorlib/transport/ble.py new file mode 100644 index 000000000..ee2474560 --- /dev/null +++ b/python/src/trezorlib/transport/ble.py @@ -0,0 +1,154 @@ +# This file is part of the Trezor project. +# +# Copyright (C) 2012-2022 SatoshiLabs and contributors +# +# This library is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License version 3 +# as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the License along with this library. +# If not, see . +import logging +from queue import Queue +from typing import TYPE_CHECKING, Any, Iterable, List, Optional + +from .. import tealblue +from ..tealblue import Adapter, Characteristic +from . import TransportException +from .protocol import ProtocolBasedTransport, ProtocolV1 + +if TYPE_CHECKING: + from ..models import TrezorModel + +LOG = logging.getLogger(__name__) + +NUS_SERVICE_UUID = "6e400001-b5a3-f393-e0a9-e50e24dcca9e" +NUS_CHARACTERISTIC_RX = "6e400002-b5a3-f393-e0a9-e50e24dcca9e" +NUS_CHARACTERISTIC_TX = "6e400003-b5a3-f393-e0a9-e50e24dcca9e" + + +def scan_device(adapter: Adapter, devices: List[tealblue.Device]): + with adapter.scan(2) as scanner: + for device in scanner: + if NUS_SERVICE_UUID in device.UUIDs: + if device.address not in [d.address for d in devices]: + print(f"Found device: {device.name}, {device.address}") + devices.append(device) + return devices + + +def lookup_device(adapter: Adapter): + devices = [] + for device in adapter.devices(): + if NUS_SERVICE_UUID in device.UUIDs: + devices.append(device) + return devices + + +class BleTransport(ProtocolBasedTransport): + ENABLED = True + PATH_PREFIX = "ble" + + def __init__(self, mac_addr: str, adapter: Adapter) -> None: + + self.tx = None + self.rx = None + self.device = mac_addr + self.adapter = adapter + self.received_data = Queue() + + devices = lookup_device(self.adapter) + + for d in devices: + if d.address == mac_addr: + self.ble_device = d + break + + super().__init__(protocol=ProtocolV1(self, replen=244)) + + def get_path(self) -> str: + return "{}:{}".format(self.PATH_PREFIX, self.device) + + def find_debug(self) -> "BleTransport": + mac = self.device + return BleTransport(f"{mac}", self.adapter) + + @classmethod + def enumerate( + cls, _models: Optional[Iterable["TrezorModel"]] = None + ) -> Iterable["BleTransport"]: + adapter = tealblue.TealBlue().find_adapter() + devices = lookup_device(adapter) + + for device in devices: + print(f"Device: {device.name}, {device.address}") + + devices = [d for d in devices if d.connected] + + return [BleTransport(device.address, adapter) for device in devices] + + @classmethod + def _try_path(cls, path: str) -> "BleTransport": + devices = cls.enumerate(None) + devices = [d for d in devices if d.device == path] + if len(devices) == 0: + raise TransportException(f"No BLE device: {path}") + return devices[0] + + @classmethod + def find_by_path(cls, path: str, prefix_search: bool = False) -> "BleTransport": + if not prefix_search: + raise TransportException + + if prefix_search: + return super().find_by_path(path, prefix_search) + else: + raise TransportException(f"No BLE device: {path}") + + def open(self) -> None: + + if not self.ble_device.connected: + print( + "Connecting to %s (%s)..." + % (self.ble_device.name, self.ble_device.address) + ) + self.ble_device.connect() + else: + print( + "Connected to %s (%s)." + % (self.ble_device.name, self.ble_device.address) + ) + + if not self.ble_device.services_resolved: + print("Resolving services...") + self.ble_device.resolve_services() + + service = self.ble_device.services[NUS_SERVICE_UUID] + self.rx = service.characteristics[NUS_CHARACTERISTIC_RX] + self.tx = service.characteristics[NUS_CHARACTERISTIC_TX] + + def on_notify(characteristic: Characteristic, value: Any): + self.received_data.put(bytes(value)) + + self.tx.on_notify = on_notify + self.tx.start_notify() + + def close(self) -> None: + pass + + def write_chunk(self, chunk: bytes) -> None: + assert self.rx is not None + self.rx.write(chunk) + + def read_chunk(self) -> bytes: + assert self.tx is not None + chunk = self.received_data.get() + # LOG.log(DUMP_PACKETS, f"received packet: {chunk.hex()}") + if len(chunk) != 64: + raise TransportException(f"Unexpected chunk size: {len(chunk)}") + return bytearray(chunk) diff --git a/python/src/trezorlib/transport/protocol.py b/python/src/trezorlib/transport/protocol.py index 9069650f5..5aba296ee 100644 --- a/python/src/trezorlib/transport/protocol.py +++ b/python/src/trezorlib/transport/protocol.py @@ -75,8 +75,9 @@ class Protocol: its messages. """ - def __init__(self, handle: Handle) -> None: + def __init__(self, handle: Handle, replen: int = REPLEN) -> None: self.handle = handle + self.replen = replen self.session_counter = 0 # XXX we might be able to remove this now that TrezorClient does session handling @@ -133,10 +134,10 @@ class ProtocolV1(Protocol): while buffer: # Report ID, data padded to 63 bytes - chunk = b"?" + buffer[: REPLEN - 1] - chunk = chunk.ljust(REPLEN, b"\x00") + chunk = b"?" + buffer[: self.replen - 1] + chunk = chunk.ljust(self.replen, b"\x00") self.handle.write_chunk(chunk) - buffer = buffer[63:] + buffer = buffer[self.replen - 1 :] def read(self) -> MessagePayload: buffer = bytearray() diff --git a/python/src/trezorlib/transport/webusb.py b/python/src/trezorlib/transport/webusb.py index d733ce666..632fee1b4 100644 --- a/python/src/trezorlib/transport/webusb.py +++ b/python/src/trezorlib/transport/webusb.py @@ -61,6 +61,8 @@ class WebUsbHandle: self.handle.claimInterface(self.interface) except usb1.USBErrorAccess as e: raise DeviceIsBusy(self.device) from e + except usb1.USBErrorBusy as e: + raise DeviceIsBusy(self.device) from e def close(self) -> None: if self.handle is not None: