mirror of
https://github.com/trezor/trezor-firmware.git
synced 2024-11-14 03:30:02 +00:00
PYTHON
This commit is contained in:
parent
d5dd619635
commit
1efb2a1009
@ -1,4 +1,4 @@
|
||||
check: messages.pb messages-binance.pb messages-bitcoin.pb messages-bootloader.pb messages-cardano.pb messages-common.pb messages-crypto.pb messages-debug.pb messages-ethereum.pb messages-management.pb messages-monero.pb messages-nem.pb messages-ripple.pb messages-stellar.pb messages-tezos.pb messages-eos.pb
|
||||
check: messages.pb messages-binance.pb messages-bitcoin.pb messages-ble.pb messages-bootloader.pb messages-cardano.pb messages-common.pb messages-crypto.pb messages-debug.pb messages-ethereum.pb messages-management.pb messages-monero.pb messages-nem.pb messages-ripple.pb messages-stellar.pb messages-tezos.pb messages-eos.pb
|
||||
|
||||
%.pb: %.proto
|
||||
protoc -I/usr/include -I. $< -o $@
|
||||
|
@ -76,6 +76,11 @@ vulture = "^2.6"
|
||||
binsize = "^0.1.3"
|
||||
toiftool = {path = "./python/tools/toiftool", develop = true, python = ">=3.8"}
|
||||
|
||||
# ble
|
||||
dbus-python = "*"
|
||||
PyGObject = "*"
|
||||
nrfutil = "*"
|
||||
|
||||
[tool.poetry.dev-dependencies]
|
||||
scan-build = "*"
|
||||
towncrier = "^23.6.0"
|
||||
|
@ -8,3 +8,6 @@ typing_extensions>=3.10
|
||||
dataclasses ; python_version<'3.7'
|
||||
simple-rlp>=0.1.2 ; python_version>='3.7'
|
||||
construct-classes>=0.1.2
|
||||
dbus-python>=1.3.2
|
||||
pygobject>=3.44.1
|
||||
nrfutil>=5.0.0
|
||||
|
@ -25,7 +25,7 @@ per-file-ignores =
|
||||
helper-scripts/*:I
|
||||
tools/*:I
|
||||
tests/*:I
|
||||
known-modules = libusb1:[usb1],hidapi:[hid],PyQt5:[PyQt5.QtWidgets,PyQt5.QtGui,PyQt5.QtCore],simple-rlp:[rlp]
|
||||
known-modules = libusb1:[usb1],hidapi:[hid],PyQt5:[PyQt5.QtWidgets,PyQt5.QtGui,PyQt5.QtCore],simple-rlp:[rlp],dbus-python:[dbus]
|
||||
|
||||
[isort]
|
||||
profile = black
|
||||
|
59
python/src/trezorlib/ble/__init__.py
Normal file
59
python/src/trezorlib/ble/__init__.py
Normal file
@ -0,0 +1,59 @@
|
||||
import typing as t
|
||||
|
||||
from .. import messages
|
||||
from ..tools import session
|
||||
|
||||
if t.TYPE_CHECKING:
|
||||
from ..client import TrezorClient
|
||||
|
||||
|
||||
@session
|
||||
def update(
|
||||
client: "TrezorClient",
|
||||
datfile: bytes,
|
||||
binfile: bytes,
|
||||
progress_update: t.Callable[[int], t.Any] = lambda _: None,
|
||||
):
|
||||
chunk_len = 4096
|
||||
offset = 0
|
||||
|
||||
resp = client.call(
|
||||
messages.UploadBLEFirmwareInit(init_data=datfile, binsize=len(binfile))
|
||||
)
|
||||
|
||||
while isinstance(resp, messages.UploadBLEFirmwareNextChunk):
|
||||
|
||||
payload = binfile[offset : offset + chunk_len]
|
||||
resp = client.call(messages.UploadBLEFirmwareChunk(data=payload))
|
||||
progress_update(chunk_len)
|
||||
offset += chunk_len
|
||||
|
||||
if isinstance(resp, messages.Success):
|
||||
return
|
||||
else:
|
||||
raise RuntimeError(f"Unexpected message {resp}")
|
||||
|
||||
|
||||
@session
|
||||
def erase_bonds(
|
||||
client: "TrezorClient",
|
||||
):
|
||||
|
||||
resp = client.call(messages.EraseBonds())
|
||||
|
||||
if isinstance(resp, messages.Success):
|
||||
return
|
||||
else:
|
||||
raise RuntimeError(f"Unexpected message {resp}")
|
||||
|
||||
|
||||
@session
|
||||
def disconnect(
|
||||
client: "TrezorClient",
|
||||
):
|
||||
resp = client.call(messages.Disconnect())
|
||||
|
||||
if isinstance(resp, messages.Success):
|
||||
return
|
||||
else:
|
||||
raise RuntimeError(f"Unexpected message {resp}")
|
@ -16,12 +16,17 @@
|
||||
|
||||
import functools
|
||||
import sys
|
||||
import threading
|
||||
from contextlib import contextmanager
|
||||
from typing import TYPE_CHECKING, Any, Callable, Dict, Optional
|
||||
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional
|
||||
|
||||
import click
|
||||
import dbus
|
||||
import dbus.mainloop.glib
|
||||
import dbus.service
|
||||
from gi.repository import GLib
|
||||
|
||||
from .. import exceptions, transport
|
||||
from .. import exceptions, messages, transport
|
||||
from ..client import TrezorClient
|
||||
from ..ui import ClickUI, ScriptUI
|
||||
|
||||
@ -104,6 +109,12 @@ class TrezorConnection:
|
||||
except transport.DeviceIsBusy:
|
||||
click.echo("Device is in use by another process.")
|
||||
sys.exit(1)
|
||||
except exceptions.TrezorFailure as e:
|
||||
if e.code is messages.FailureType.DeviceIsBusy:
|
||||
click.echo(str(e))
|
||||
sys.exit(1)
|
||||
else:
|
||||
raise e
|
||||
except Exception:
|
||||
click.echo("Failed to find a Trezor device.")
|
||||
if self.path is not None:
|
||||
@ -135,26 +146,95 @@ def with_client(func: "Callable[Concatenate[TrezorClient, P], R]") -> "Callable[
|
||||
def trezorctl_command_with_client(
|
||||
obj: TrezorConnection, *args: "P.args", **kwargs: "P.kwargs"
|
||||
) -> "R":
|
||||
with obj.client_context() as client:
|
||||
session_was_resumed = obj.session_id == client.session_id
|
||||
if not session_was_resumed and obj.session_id is not None:
|
||||
# tried to resume but failed
|
||||
click.echo("Warning: failed to resume session.", err=True)
|
||||
|
||||
loop = GLib.MainLoop()
|
||||
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
|
||||
|
||||
def callback_wrapper(
|
||||
r: List[Optional["R"]], exc: List[Optional[Exception]]
|
||||
) -> None:
|
||||
try:
|
||||
return func(client, *args, **kwargs)
|
||||
finally:
|
||||
if not session_was_resumed:
|
||||
with obj.client_context() as client:
|
||||
session_was_resumed = obj.session_id == client.session_id
|
||||
if not session_was_resumed and obj.session_id is not None:
|
||||
# tried to resume but failed
|
||||
click.echo("Warning: failed to resume session.", err=True)
|
||||
|
||||
try:
|
||||
client.end_session()
|
||||
except Exception:
|
||||
pass
|
||||
r.append(func(client, *args, **kwargs))
|
||||
except Exception as e:
|
||||
exc[0] = e
|
||||
finally:
|
||||
if not session_was_resumed:
|
||||
try:
|
||||
client.end_session()
|
||||
except Exception:
|
||||
pass
|
||||
except Exception as e:
|
||||
exc[0] = e
|
||||
finally:
|
||||
loop.quit()
|
||||
|
||||
result: List["R"] = []
|
||||
exc: List[Optional[Exception]] = [None]
|
||||
threading.Thread(
|
||||
target=callback_wrapper, daemon=True, args=(result, exc)
|
||||
).start()
|
||||
loop.run()
|
||||
|
||||
if exc[0] is not None:
|
||||
raise exc[0]
|
||||
|
||||
if len(result) == 0:
|
||||
raise click.ClickException("Command did not return a result.")
|
||||
|
||||
return result[0]
|
||||
|
||||
# the return type of @click.pass_obj is improperly specified and pyright doesn't
|
||||
# understand that it converts f(obj, *args, **kwargs) to f(*args, **kwargs)
|
||||
return trezorctl_command_with_client # type: ignore [cannot be assigned to return type]
|
||||
|
||||
|
||||
def with_ble(func: "Callable[P, R]") -> "Callable[P, R]":
|
||||
"""Wrap a Click command in `with obj.client_context() as client`.
|
||||
|
||||
Sessions are handled transparently. The user is warned when session did not resume
|
||||
cleanly. The session is closed after the command completes - unless the session
|
||||
was resumed, in which case it should remain open.
|
||||
"""
|
||||
|
||||
@functools.wraps(func)
|
||||
def trezorctl_command(*args: "P.args", **kwargs: "P.kwargs") -> "R":
|
||||
|
||||
loop = GLib.MainLoop()
|
||||
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
|
||||
|
||||
def callback_wrapper(r: List[Optional["R"]], exc: List[Optional[Exception]]):
|
||||
try:
|
||||
r.append(func(*args, **kwargs))
|
||||
except Exception as e:
|
||||
exc[0] = e
|
||||
finally:
|
||||
loop.quit()
|
||||
|
||||
result: List["R"] = []
|
||||
exc: List[Optional[Exception]] = [None]
|
||||
threading.Thread(
|
||||
target=callback_wrapper, daemon=True, args=(result, exc)
|
||||
).start()
|
||||
loop.run()
|
||||
|
||||
if exc[0] is not None:
|
||||
raise exc[0]
|
||||
|
||||
if len(result) == 0:
|
||||
raise click.ClickException("Command did not return a result.")
|
||||
|
||||
return result[0]
|
||||
|
||||
return trezorctl_command
|
||||
|
||||
|
||||
class AliasedGroup(click.Group):
|
||||
"""Command group that handles aliases and Click 6.x compatibility.
|
||||
|
||||
|
132
python/src/trezorlib/cli/ble.py
Normal file
132
python/src/trezorlib/cli/ble.py
Normal file
@ -0,0 +1,132 @@
|
||||
# This file is part of the Trezor project.
|
||||
#
|
||||
# Copyright (C) 2012-2022 SatoshiLabs and contributors
|
||||
#
|
||||
# This library is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Lesser General Public License version 3
|
||||
# as published by the Free Software Foundation.
|
||||
#
|
||||
# This library is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Lesser General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the License along with this library.
|
||||
# If not, see <https://www.gnu.org/licenses/lgpl-3.0.html>.
|
||||
|
||||
import sys
|
||||
import zipfile
|
||||
from typing import TYPE_CHECKING, BinaryIO
|
||||
|
||||
import click
|
||||
|
||||
from .. import ble, exceptions, tealblue
|
||||
from ..transport.ble import lookup_device, scan_device
|
||||
from . import with_ble, with_client
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ..client import TrezorClient
|
||||
|
||||
|
||||
@click.group(name="ble")
|
||||
def cli() -> None:
|
||||
"""BLE commands."""
|
||||
|
||||
|
||||
@cli.command()
|
||||
# fmt: off
|
||||
@click.argument("package", type=click.File("rb"))
|
||||
# fmt: on
|
||||
@with_client
|
||||
def update(
|
||||
client: "TrezorClient",
|
||||
package: BinaryIO,
|
||||
) -> None:
|
||||
"""Upload new BLE firmware to device."""
|
||||
|
||||
with zipfile.ZipFile(package) as archive:
|
||||
binfile = archive.read("ble_firmware.bin")
|
||||
datfile = archive.read("ble_firmware.dat")
|
||||
|
||||
"""Perform the final act of loading the firmware into Trezor."""
|
||||
try:
|
||||
click.echo("Uploading...\r", nl=False)
|
||||
with click.progressbar(
|
||||
label="Uploading", length=len(binfile), show_eta=False
|
||||
) as bar:
|
||||
ble.update(client, datfile, binfile, bar.update)
|
||||
click.echo("Update successful.")
|
||||
except exceptions.Cancelled:
|
||||
click.echo("Update aborted on device.")
|
||||
except exceptions.TrezorException as e:
|
||||
click.echo(f"Update failed: {e}")
|
||||
sys.exit(3)
|
||||
|
||||
|
||||
@cli.command()
|
||||
@with_ble
|
||||
def connect() -> None:
|
||||
"""Connect to the device via BLE."""
|
||||
adapter = tealblue.TealBlue().find_adapter()
|
||||
|
||||
devices = lookup_device(adapter)
|
||||
|
||||
devices = [d for d in devices if d.connected]
|
||||
|
||||
if len(devices) == 0:
|
||||
print("Scanning...")
|
||||
devices = scan_device(adapter, devices)
|
||||
|
||||
if len(devices) == 0:
|
||||
print("No BLE devices found")
|
||||
return
|
||||
else:
|
||||
print("Found %d BLE device(s)" % len(devices))
|
||||
|
||||
for device in devices:
|
||||
print(f"Device: {device.name}, {device.address}")
|
||||
|
||||
device = devices[0]
|
||||
print(f"Connecting to {device.name}...")
|
||||
device.connect()
|
||||
print("Connected")
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.option("--device", is_flag=True, help="Disconnect from device side.")
|
||||
@with_client
|
||||
def disconnect(client: "TrezorClient", device: bool) -> None:
|
||||
|
||||
if device:
|
||||
ble.disconnect(client)
|
||||
else:
|
||||
"""Connect to the device via BLE."""
|
||||
adapter = tealblue.TealBlue().find_adapter()
|
||||
|
||||
devices = lookup_device(adapter)
|
||||
|
||||
devices = [d for d in devices if d.connected]
|
||||
|
||||
if len(devices) == 0:
|
||||
print("No device is connected")
|
||||
|
||||
for d in devices:
|
||||
d.disconnect()
|
||||
print(f"Device {d.name}, {d.address}, disconnected.")
|
||||
|
||||
|
||||
@cli.command()
|
||||
@with_client
|
||||
def erase_bonds(
|
||||
client: "TrezorClient",
|
||||
) -> None:
|
||||
"""Erase BLE bonds on device."""
|
||||
|
||||
try:
|
||||
ble.erase_bonds(client)
|
||||
click.echo("Erase successful.")
|
||||
except exceptions.Cancelled:
|
||||
click.echo("Erase aborted on device.")
|
||||
except exceptions.TrezorException as e:
|
||||
click.echo(f"Update failed: {e}")
|
||||
sys.exit(3)
|
@ -32,6 +32,7 @@ from . import (
|
||||
AliasedGroup,
|
||||
TrezorConnection,
|
||||
binance,
|
||||
ble,
|
||||
btc,
|
||||
cardano,
|
||||
cosi,
|
||||
@ -48,6 +49,7 @@ from . import (
|
||||
settings,
|
||||
stellar,
|
||||
tezos,
|
||||
with_ble,
|
||||
with_client,
|
||||
)
|
||||
|
||||
@ -86,6 +88,7 @@ COMMAND_ALIASES = {
|
||||
"upgrade-firmware": firmware.update,
|
||||
"firmware-upgrade": firmware.update,
|
||||
"firmware-update": firmware.update,
|
||||
"ble-update": ble.update,
|
||||
}
|
||||
|
||||
|
||||
@ -279,6 +282,7 @@ def format_device_name(features: messages.Features) -> str:
|
||||
|
||||
|
||||
@cli.command(name="list")
|
||||
@with_ble
|
||||
@click.option("-n", "no_resolve", is_flag=True, help="Do not resolve Trezor names")
|
||||
def list_devices(no_resolve: bool) -> Optional[Iterable["Transport"]]:
|
||||
"""List connected Trezor devices."""
|
||||
@ -415,6 +419,7 @@ cli.add_command(tezos.cli)
|
||||
|
||||
cli.add_command(firmware.cli)
|
||||
cli.add_command(debug.cli)
|
||||
cli.add_command(ble.cli)
|
||||
|
||||
#
|
||||
# Main
|
||||
|
@ -82,6 +82,15 @@ class MessageType(IntEnum):
|
||||
FirmwareUpload = 7
|
||||
FirmwareRequest = 8
|
||||
SelfTest = 32
|
||||
UploadBLEFirmwareInit = 8000
|
||||
UploadBLEFirmwareNextChunk = 8001
|
||||
UploadBLEFirmwareChunk = 8002
|
||||
PairingRequest = 8003
|
||||
AuthKey = 8004
|
||||
RepairRequest = 8005
|
||||
EraseBonds = 8006
|
||||
Disconnect = 8007
|
||||
ComparisonRequest = 8008
|
||||
GetPublicKey = 11
|
||||
PublicKey = 12
|
||||
SignTx = 15
|
||||
@ -277,6 +286,7 @@ class FailureType(IntEnum):
|
||||
PinMismatch = 12
|
||||
WipeCodeMismatch = 13
|
||||
InvalidSession = 14
|
||||
DeviceIsBusy = 15
|
||||
FirmwareError = 99
|
||||
|
||||
|
||||
@ -2050,6 +2060,95 @@ class TxAckPrevExtraDataWrapper(protobuf.MessageType):
|
||||
self.extra_data_chunk = extra_data_chunk
|
||||
|
||||
|
||||
class UploadBLEFirmwareInit(protobuf.MessageType):
|
||||
MESSAGE_WIRE_TYPE = 8000
|
||||
FIELDS = {
|
||||
1: protobuf.Field("init_data", "bytes", repeated=False, required=True),
|
||||
2: protobuf.Field("binsize", "uint32", repeated=False, required=True),
|
||||
}
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
init_data: "bytes",
|
||||
binsize: "int",
|
||||
) -> None:
|
||||
self.init_data = init_data
|
||||
self.binsize = binsize
|
||||
|
||||
|
||||
class UploadBLEFirmwareNextChunk(protobuf.MessageType):
|
||||
MESSAGE_WIRE_TYPE = 8001
|
||||
FIELDS = {
|
||||
1: protobuf.Field("offset", "uint32", repeated=False, required=True),
|
||||
}
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
offset: "int",
|
||||
) -> None:
|
||||
self.offset = offset
|
||||
|
||||
|
||||
class UploadBLEFirmwareChunk(protobuf.MessageType):
|
||||
MESSAGE_WIRE_TYPE = 8002
|
||||
FIELDS = {
|
||||
1: protobuf.Field("data", "bytes", repeated=False, required=True),
|
||||
}
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
data: "bytes",
|
||||
) -> None:
|
||||
self.data = data
|
||||
|
||||
|
||||
class EraseBonds(protobuf.MessageType):
|
||||
MESSAGE_WIRE_TYPE = 8006
|
||||
|
||||
|
||||
class Disconnect(protobuf.MessageType):
|
||||
MESSAGE_WIRE_TYPE = 8007
|
||||
|
||||
|
||||
class PairingRequest(protobuf.MessageType):
|
||||
MESSAGE_WIRE_TYPE = 8003
|
||||
|
||||
|
||||
class AuthKey(protobuf.MessageType):
|
||||
MESSAGE_WIRE_TYPE = 8004
|
||||
FIELDS = {
|
||||
1: protobuf.Field("key", "bytes", repeated=False, required=True),
|
||||
}
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
key: "bytes",
|
||||
) -> None:
|
||||
self.key = key
|
||||
|
||||
|
||||
class RepairRequest(protobuf.MessageType):
|
||||
MESSAGE_WIRE_TYPE = 8005
|
||||
|
||||
|
||||
class ComparisonRequest(protobuf.MessageType):
|
||||
MESSAGE_WIRE_TYPE = 8008
|
||||
FIELDS = {
|
||||
1: protobuf.Field("key", "bytes", repeated=False, required=True),
|
||||
}
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
key: "bytes",
|
||||
) -> None:
|
||||
self.key = key
|
||||
|
||||
|
||||
class FirmwareErase(protobuf.MessageType):
|
||||
MESSAGE_WIRE_TYPE = 6
|
||||
FIELDS = {
|
||||
|
457
python/src/trezorlib/tealblue.py
Executable file
457
python/src/trezorlib/tealblue.py
Executable file
@ -0,0 +1,457 @@
|
||||
# !/usr/bin/python3
|
||||
# pyright: off
|
||||
|
||||
import queue
|
||||
import threading
|
||||
import time
|
||||
|
||||
import dbus
|
||||
import dbus.mainloop.glib
|
||||
import dbus.service
|
||||
|
||||
|
||||
class NotConnectedError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class DBusInvalidArgsException(dbus.exceptions.DBusException):
|
||||
_dbus_error_name = "org.freedesktop.DBus.Error.InvalidArgs"
|
||||
|
||||
|
||||
def format_uuid(uuid):
|
||||
if type(uuid) == int:
|
||||
if uuid > 0xFFFF:
|
||||
raise ValueError("32-bit UUID not supported yet")
|
||||
uuid = "%04X" % uuid
|
||||
return uuid
|
||||
|
||||
|
||||
class TealBlue:
|
||||
def __init__(self):
|
||||
self._bus = dbus.SystemBus()
|
||||
self._bluez = dbus.Interface(
|
||||
self._bus.get_object("org.bluez", "/"), "org.freedesktop.DBus.ObjectManager"
|
||||
)
|
||||
|
||||
def find_adapter(self):
|
||||
# find the first adapter
|
||||
objects = self._bluez.GetManagedObjects()
|
||||
for path in sorted(objects.keys()):
|
||||
interfaces = objects[path]
|
||||
if "org.bluez.Adapter1" not in interfaces:
|
||||
continue
|
||||
properties = interfaces["org.bluez.Adapter1"]
|
||||
return Adapter(self, path, properties)
|
||||
raise Exception("No adapter found")
|
||||
|
||||
# copied from:
|
||||
# https://github.com/adafruit/Adafruit_Python_BluefruitLE/blob/master/Adafruit_BluefruitLE/bluez_dbus/provider.py
|
||||
def _print_tree(self):
|
||||
"""Print tree of all bluez objects, useful for debugging."""
|
||||
# This is based on the bluez sample code get-managed-objects.py.
|
||||
objects = self._bluez.GetManagedObjects()
|
||||
for path in sorted(objects.keys()):
|
||||
print("[ %s ]" % (path))
|
||||
interfaces = objects[path]
|
||||
for interface in sorted(interfaces.keys()):
|
||||
if interface in [
|
||||
"org.freedesktop.DBus.Introspectable",
|
||||
"org.freedesktop.DBus.Properties",
|
||||
]:
|
||||
continue
|
||||
print(" %s" % (interface))
|
||||
properties = interfaces[interface]
|
||||
for key in sorted(properties.keys()):
|
||||
print(" %s = %s" % (key, properties[key]))
|
||||
|
||||
|
||||
class Adapter:
|
||||
def __init__(self, teal, path, properties):
|
||||
self._teal = teal
|
||||
self._path = path
|
||||
self._properties = properties
|
||||
self._object = dbus.Interface(
|
||||
teal._bus.get_object("org.bluez", path), "org.bluez.Adapter1"
|
||||
)
|
||||
self._advertisement = None
|
||||
|
||||
def __repr__(self):
|
||||
return "<tealblue.Adapter address=%s>" % (self._properties["Address"])
|
||||
|
||||
def devices(self):
|
||||
"""
|
||||
Returns the devices that BlueZ has discovered.
|
||||
"""
|
||||
objects = self._teal._bluez.GetManagedObjects()
|
||||
for path in sorted(objects.keys()):
|
||||
interfaces = objects[path]
|
||||
if "org.bluez.Device1" not in interfaces:
|
||||
continue
|
||||
properties = interfaces["org.bluez.Device1"]
|
||||
yield Device(self._teal, path, properties)
|
||||
|
||||
def scan(self, timeout_s):
|
||||
return Scanner(self._teal, self, self.devices(), timeout_s)
|
||||
|
||||
@property
|
||||
def advertisement(self):
|
||||
if self._advertisement is None:
|
||||
self._advertisement = Advertisement(self._teal, self)
|
||||
return self._advertisement
|
||||
|
||||
def advertise(self, enable):
|
||||
if enable:
|
||||
self.advertisement.enable()
|
||||
else:
|
||||
self.advertisement.disable()
|
||||
|
||||
def advertise_data(
|
||||
self,
|
||||
local_name=None,
|
||||
service_data=None,
|
||||
service_uuids=None,
|
||||
manufacturer_data=None,
|
||||
):
|
||||
self.advertisement.local_name = local_name
|
||||
self.advertisement.service_data = service_data
|
||||
self.advertisement.service_uuids = service_uuids
|
||||
self.advertisement.manufacturer_data = manufacturer_data
|
||||
|
||||
|
||||
class Scanner:
|
||||
def __init__(self, teal, adapter, initial_devices, timeout_s):
|
||||
self._teal = teal
|
||||
self._adapter = adapter
|
||||
self._was_discovering = adapter._properties[
|
||||
"Discovering"
|
||||
] # TODO get current value, or watch property changes
|
||||
self._queue = queue.Queue()
|
||||
self.timeout_s = timeout_s
|
||||
for device in initial_devices:
|
||||
self._queue.put(device)
|
||||
|
||||
def new_device(path, interfaces):
|
||||
if "org.bluez.Device1" not in interfaces:
|
||||
return
|
||||
if not path.startswith(self._adapter._path + "/"):
|
||||
return
|
||||
# properties = interfaces["org.bluez.Device1"]
|
||||
self._queue.put(Device(self._teal, path, interfaces["org.bluez.Device1"]))
|
||||
|
||||
self._signal_receiver = self._teal._bus.add_signal_receiver(
|
||||
new_device,
|
||||
dbus_interface="org.freedesktop.DBus.ObjectManager",
|
||||
signal_name="InterfacesAdded",
|
||||
)
|
||||
if not self._was_discovering:
|
||||
self._adapter._object.StartDiscovery()
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, type, value, traceback):
|
||||
if not self._was_discovering:
|
||||
self._adapter._object.StopDiscovery()
|
||||
self._signal_receiver.remove()
|
||||
|
||||
def __iter__(self):
|
||||
return self
|
||||
|
||||
def __next__(self):
|
||||
try:
|
||||
return self._queue.get(timeout=self.timeout_s)
|
||||
except queue.Empty:
|
||||
raise StopIteration
|
||||
|
||||
|
||||
class Device:
|
||||
def __init__(self, teal, path, properties):
|
||||
self._teal = teal
|
||||
self._path = path
|
||||
self._properties = properties
|
||||
self._services_resolved = threading.Event()
|
||||
self._services = None
|
||||
|
||||
if properties["ServicesResolved"]:
|
||||
self._services_resolved.set()
|
||||
|
||||
# Listen to device events (connect, disconnect, ServicesResolved, ...)
|
||||
self._device = dbus.Interface(
|
||||
teal._bus.get_object("org.bluez", path), "org.bluez.Device1"
|
||||
)
|
||||
self._device_props = dbus.Interface(
|
||||
self._device, "org.freedesktop.DBus.Properties"
|
||||
)
|
||||
self._signal_receiver = self._device_props.connect_to_signal(
|
||||
"PropertiesChanged",
|
||||
lambda itf, ch, inv: self._on_prop_changed(itf, ch, inv),
|
||||
)
|
||||
|
||||
def __del__(self):
|
||||
self._signal_receiver.remove()
|
||||
|
||||
def __repr__(self):
|
||||
return "<tealblue.Device address=%s name=%r>" % (self.address, self.name)
|
||||
|
||||
def _on_prop_changed(self, properties, changed_props, invalidated_props):
|
||||
for key, value in changed_props.items():
|
||||
self._properties[key] = value
|
||||
|
||||
if "ServicesResolved" in changed_props:
|
||||
if changed_props["ServicesResolved"]:
|
||||
self._services_resolved.set()
|
||||
else:
|
||||
self._services_resolved.clear()
|
||||
|
||||
def _wait_for_discovery(self):
|
||||
# wait until ServicesResolved is True
|
||||
self._services_resolved.wait()
|
||||
|
||||
def connect(self):
|
||||
self._device.Connect()
|
||||
|
||||
def disconnect(self):
|
||||
self._device.Disconnect()
|
||||
|
||||
def resolve_services(self):
|
||||
self._services_resolved.wait()
|
||||
|
||||
@property
|
||||
def services(self):
|
||||
if not self._services_resolved.is_set():
|
||||
return None
|
||||
if self._services is None:
|
||||
self._services = {}
|
||||
objects = self._teal._bluez.GetManagedObjects()
|
||||
for path in sorted(objects.keys()):
|
||||
if not path.startswith(self._path + "/"):
|
||||
continue
|
||||
if "org.bluez.GattService1" in objects[path]:
|
||||
properties = objects[path]["org.bluez.GattService1"]
|
||||
service = Service(self._teal, self, path, properties)
|
||||
self._services[service.uuid] = service
|
||||
elif "org.bluez.GattCharacteristic1" in objects[path]:
|
||||
properties = objects[path]["org.bluez.GattCharacteristic1"]
|
||||
characterstic = Characteristic(self._teal, self, path, properties)
|
||||
for service in self._services.values():
|
||||
if properties["Service"] == service._path:
|
||||
service.characteristics[characterstic.uuid] = characterstic
|
||||
return self._services
|
||||
|
||||
@property
|
||||
def connected(self):
|
||||
return bool(self._properties["Connected"])
|
||||
|
||||
@property
|
||||
def services_resolved(self):
|
||||
return bool(self._properties["ServicesResolved"])
|
||||
|
||||
@property
|
||||
def UUIDs(self):
|
||||
return [str(s) for s in self._properties["UUIDs"]]
|
||||
|
||||
@property
|
||||
def address(self):
|
||||
return str(self._properties["Address"])
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
if "Name" not in self._properties:
|
||||
return None
|
||||
return str(self._properties["Name"])
|
||||
|
||||
@property
|
||||
def alias(self):
|
||||
if "Alias" not in self._properties:
|
||||
return None
|
||||
return str(self._properties["Alias"])
|
||||
|
||||
|
||||
class Service:
|
||||
def __init__(self, teal, device, path, properties):
|
||||
self._device = device
|
||||
self._teal = teal
|
||||
self._path = path
|
||||
self._properties = properties
|
||||
self.characteristics = {}
|
||||
|
||||
def __repr__(self):
|
||||
return "<tealblue.Service device=%s uuid=%s>" % (
|
||||
self._device.address,
|
||||
self.uuid,
|
||||
)
|
||||
|
||||
@property
|
||||
def uuid(self):
|
||||
return str(self._properties["UUID"])
|
||||
|
||||
|
||||
class Characteristic:
|
||||
def __init__(self, teal, device, path, properties):
|
||||
self._device = device
|
||||
self._teal = teal
|
||||
self._path = path
|
||||
self._properties = properties
|
||||
|
||||
self.on_notify = None
|
||||
|
||||
self._char = dbus.Interface(
|
||||
teal._bus.get_object("org.bluez", path), "org.bluez.GattCharacteristic1"
|
||||
)
|
||||
char_props = dbus.Interface(self._char, "org.freedesktop.DBus.Properties")
|
||||
self._signal_receiver = char_props.connect_to_signal(
|
||||
"PropertiesChanged",
|
||||
lambda itf, ch, inv: self._on_prop_changed(itf, ch, inv),
|
||||
)
|
||||
|
||||
def __repr__(self):
|
||||
return "<tealblue.Characteristic device=%s uuid=%s>" % (
|
||||
self._device.address,
|
||||
self.uuid,
|
||||
)
|
||||
|
||||
def __del__(self):
|
||||
self._signal_receiver.remove()
|
||||
|
||||
def _on_prop_changed(self, properties, changed_props, invalidated_props):
|
||||
for key, value in changed_props.items():
|
||||
self._properties[key] = bytes(value)
|
||||
|
||||
if "Value" in changed_props and self.on_notify is not None:
|
||||
self.on_notify(self, changed_props["Value"])
|
||||
|
||||
def read(self):
|
||||
return bytes(self._char.ReadValue({}))
|
||||
|
||||
def write(self, value, command=True):
|
||||
start = time.time()
|
||||
try:
|
||||
if command:
|
||||
self._char.WriteValue(value, {"type": "command"})
|
||||
else:
|
||||
self._char.WriteValue(value, {"type": "request"})
|
||||
|
||||
except dbus.DBusException as e:
|
||||
if (
|
||||
e.get_dbus_name() == "org.bluez.Error.Failed"
|
||||
and e.get_dbus_message() == "Not connected"
|
||||
):
|
||||
raise NotConnectedError()
|
||||
else:
|
||||
raise # some other error
|
||||
|
||||
# Workaround: if the write took very long, it is possible the connection
|
||||
# broke (without causing an exception). So check whether we are still
|
||||
# connected.
|
||||
# I think this is a bug in BlueZ.
|
||||
if time.time() - start > 0.5:
|
||||
if not self._device._device_props.Get("org.bluez.Device1", "Connected"):
|
||||
raise NotConnectedError()
|
||||
|
||||
def start_notify(self):
|
||||
self._char.StartNotify()
|
||||
|
||||
def stop_notify(self):
|
||||
self._char.StopNotify()
|
||||
|
||||
@property
|
||||
def uuid(self):
|
||||
return str(self._properties["UUID"])
|
||||
|
||||
|
||||
class Advertisement(dbus.service.Object):
|
||||
PATH = "/com/github/aykevl/pynus/advertisement"
|
||||
|
||||
def __init__(self, teal, adapter):
|
||||
self._teal = teal
|
||||
self._adapter = adapter
|
||||
self._enabled = False
|
||||
self.service_uuids = None
|
||||
self.manufacturer_data = None
|
||||
self.solicit_uuids = None
|
||||
self.service_data = None
|
||||
self.local_name = None
|
||||
self.include_tx_power = None
|
||||
self._manager = dbus.Interface(
|
||||
teal._bus.get_object("org.bluez", self._adapter._path),
|
||||
"org.bluez.LEAdvertisingManager1",
|
||||
)
|
||||
self._adv_enabled = threading.Event()
|
||||
dbus.service.Object.__init__(self, teal._bus, self.PATH)
|
||||
|
||||
def enable(self):
|
||||
if self._enabled:
|
||||
return
|
||||
self._manager.RegisterAdvertisement(
|
||||
dbus.ObjectPath(self.PATH),
|
||||
dbus.Dictionary({}, signature="sv"),
|
||||
reply_handler=self._cb_enabled,
|
||||
error_handler=self._cb_enabled_err,
|
||||
)
|
||||
self._adv_enabled.wait()
|
||||
self._adv_enabled.clear()
|
||||
|
||||
def _cb_enabled(self):
|
||||
self._enabled = True
|
||||
if self._adv_enabled.is_set():
|
||||
raise RuntimeError("called enable() twice")
|
||||
self._adv_enabled.set()
|
||||
|
||||
def _cb_enabled_err(self, err):
|
||||
self._enabled = False
|
||||
if self._adv_enabled.is_set():
|
||||
raise RuntimeError("called enable() twice")
|
||||
self._adv_enabled.set()
|
||||
|
||||
def disable(self):
|
||||
if not self._enabled:
|
||||
return
|
||||
self._bus.UnregisterAdvertisement(self.PATH)
|
||||
self._enabled = False
|
||||
|
||||
@property
|
||||
def enabled(self):
|
||||
return self._enabled
|
||||
|
||||
@dbus.service.method(
|
||||
"org.freedesktop.DBus.Properties", in_signature="s", out_signature="a{sv}"
|
||||
)
|
||||
def GetAll(self, interface):
|
||||
print("GetAll")
|
||||
if interface != "org.bluez.LEAdvertisement1":
|
||||
raise DBusInvalidArgsException()
|
||||
|
||||
try:
|
||||
properties = {
|
||||
"Type": dbus.String("peripheral"),
|
||||
}
|
||||
if self.service_uuids is not None:
|
||||
properties["ServiceUUIDs"] = dbus.Array(
|
||||
map(format_uuid, self.service_uuids), signature="s"
|
||||
)
|
||||
if self.solicit_uuids is not None:
|
||||
properties["SolicitUUIDs"] = dbus.Array(
|
||||
map(format_uuid, self.solicit_uuids), signature="s"
|
||||
)
|
||||
if self.manufacturer_data is not None:
|
||||
properties["ManufacturerData"] = dbus.Dictionary(
|
||||
{k: v for k, v in self.manufacturer_data.items()}, signature="qv"
|
||||
)
|
||||
if self.service_data is not None:
|
||||
properties["ServiceData"] = dbus.Dictionary(
|
||||
self.service_data, signature="sv"
|
||||
)
|
||||
if self.local_name is not None:
|
||||
properties["LocalName"] = dbus.String(self.local_name)
|
||||
if self.include_tx_power is not None:
|
||||
properties["IncludeTxPower"] = dbus.Boolean(self.include_tx_power)
|
||||
except Exception as e:
|
||||
print("err: ", e)
|
||||
print("properties:", properties)
|
||||
return properties
|
||||
|
||||
@dbus.service.method(
|
||||
"org.bluez.LEAdvertisement1", in_signature="", out_signature=""
|
||||
)
|
||||
def Release(self):
|
||||
self._enabled = True
|
@ -117,12 +117,14 @@ def all_transports() -> Iterable[Type["Transport"]]:
|
||||
from .hid import HidTransport
|
||||
from .udp import UdpTransport
|
||||
from .webusb import WebUsbTransport
|
||||
from .ble import BleTransport
|
||||
|
||||
transports: Tuple[Type["Transport"], ...] = (
|
||||
BridgeTransport,
|
||||
HidTransport,
|
||||
UdpTransport,
|
||||
WebUsbTransport,
|
||||
BleTransport,
|
||||
)
|
||||
return set(t for t in transports if t.ENABLED)
|
||||
|
||||
|
154
python/src/trezorlib/transport/ble.py
Normal file
154
python/src/trezorlib/transport/ble.py
Normal file
@ -0,0 +1,154 @@
|
||||
# This file is part of the Trezor project.
|
||||
#
|
||||
# Copyright (C) 2012-2022 SatoshiLabs and contributors
|
||||
#
|
||||
# This library is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Lesser General Public License version 3
|
||||
# as published by the Free Software Foundation.
|
||||
#
|
||||
# This library is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Lesser General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the License along with this library.
|
||||
# If not, see <https://www.gnu.org/licenses/lgpl-3.0.html>.
|
||||
import logging
|
||||
from queue import Queue
|
||||
from typing import TYPE_CHECKING, Any, Iterable, List, Optional
|
||||
|
||||
from .. import tealblue
|
||||
from ..tealblue import Adapter, Characteristic
|
||||
from . import TransportException
|
||||
from .protocol import ProtocolBasedTransport, ProtocolV1
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ..models import TrezorModel
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
NUS_SERVICE_UUID = "6e400001-b5a3-f393-e0a9-e50e24dcca9e"
|
||||
NUS_CHARACTERISTIC_RX = "6e400002-b5a3-f393-e0a9-e50e24dcca9e"
|
||||
NUS_CHARACTERISTIC_TX = "6e400003-b5a3-f393-e0a9-e50e24dcca9e"
|
||||
|
||||
|
||||
def scan_device(adapter: Adapter, devices: List[tealblue.Device]):
|
||||
with adapter.scan(2) as scanner:
|
||||
for device in scanner:
|
||||
if NUS_SERVICE_UUID in device.UUIDs:
|
||||
if device.address not in [d.address for d in devices]:
|
||||
print(f"Found device: {device.name}, {device.address}")
|
||||
devices.append(device)
|
||||
return devices
|
||||
|
||||
|
||||
def lookup_device(adapter: Adapter):
|
||||
devices = []
|
||||
for device in adapter.devices():
|
||||
if NUS_SERVICE_UUID in device.UUIDs:
|
||||
devices.append(device)
|
||||
return devices
|
||||
|
||||
|
||||
class BleTransport(ProtocolBasedTransport):
|
||||
ENABLED = True
|
||||
PATH_PREFIX = "ble"
|
||||
|
||||
def __init__(self, mac_addr: str, adapter: Adapter) -> None:
|
||||
|
||||
self.tx = None
|
||||
self.rx = None
|
||||
self.device = mac_addr
|
||||
self.adapter = adapter
|
||||
self.received_data = Queue()
|
||||
|
||||
devices = lookup_device(self.adapter)
|
||||
|
||||
for d in devices:
|
||||
if d.address == mac_addr:
|
||||
self.ble_device = d
|
||||
break
|
||||
|
||||
super().__init__(protocol=ProtocolV1(self, replen=244))
|
||||
|
||||
def get_path(self) -> str:
|
||||
return "{}:{}".format(self.PATH_PREFIX, self.device)
|
||||
|
||||
def find_debug(self) -> "BleTransport":
|
||||
mac = self.device
|
||||
return BleTransport(f"{mac}", self.adapter)
|
||||
|
||||
@classmethod
|
||||
def enumerate(
|
||||
cls, _models: Optional[Iterable["TrezorModel"]] = None
|
||||
) -> Iterable["BleTransport"]:
|
||||
adapter = tealblue.TealBlue().find_adapter()
|
||||
devices = lookup_device(adapter)
|
||||
|
||||
for device in devices:
|
||||
print(f"Device: {device.name}, {device.address}")
|
||||
|
||||
devices = [d for d in devices if d.connected]
|
||||
|
||||
return [BleTransport(device.address, adapter) for device in devices]
|
||||
|
||||
@classmethod
|
||||
def _try_path(cls, path: str) -> "BleTransport":
|
||||
devices = cls.enumerate(None)
|
||||
devices = [d for d in devices if d.device == path]
|
||||
if len(devices) == 0:
|
||||
raise TransportException(f"No BLE device: {path}")
|
||||
return devices[0]
|
||||
|
||||
@classmethod
|
||||
def find_by_path(cls, path: str, prefix_search: bool = False) -> "BleTransport":
|
||||
if not prefix_search:
|
||||
raise TransportException
|
||||
|
||||
if prefix_search:
|
||||
return super().find_by_path(path, prefix_search)
|
||||
else:
|
||||
raise TransportException(f"No BLE device: {path}")
|
||||
|
||||
def open(self) -> None:
|
||||
|
||||
if not self.ble_device.connected:
|
||||
print(
|
||||
"Connecting to %s (%s)..."
|
||||
% (self.ble_device.name, self.ble_device.address)
|
||||
)
|
||||
self.ble_device.connect()
|
||||
else:
|
||||
print(
|
||||
"Connected to %s (%s)."
|
||||
% (self.ble_device.name, self.ble_device.address)
|
||||
)
|
||||
|
||||
if not self.ble_device.services_resolved:
|
||||
print("Resolving services...")
|
||||
self.ble_device.resolve_services()
|
||||
|
||||
service = self.ble_device.services[NUS_SERVICE_UUID]
|
||||
self.rx = service.characteristics[NUS_CHARACTERISTIC_RX]
|
||||
self.tx = service.characteristics[NUS_CHARACTERISTIC_TX]
|
||||
|
||||
def on_notify(characteristic: Characteristic, value: Any):
|
||||
self.received_data.put(bytes(value))
|
||||
|
||||
self.tx.on_notify = on_notify
|
||||
self.tx.start_notify()
|
||||
|
||||
def close(self) -> None:
|
||||
pass
|
||||
|
||||
def write_chunk(self, chunk: bytes) -> None:
|
||||
assert self.rx is not None
|
||||
self.rx.write(chunk)
|
||||
|
||||
def read_chunk(self) -> bytes:
|
||||
assert self.tx is not None
|
||||
chunk = self.received_data.get()
|
||||
# LOG.log(DUMP_PACKETS, f"received packet: {chunk.hex()}")
|
||||
if len(chunk) != 64:
|
||||
raise TransportException(f"Unexpected chunk size: {len(chunk)}")
|
||||
return bytearray(chunk)
|
@ -75,8 +75,9 @@ class Protocol:
|
||||
its messages.
|
||||
"""
|
||||
|
||||
def __init__(self, handle: Handle) -> None:
|
||||
def __init__(self, handle: Handle, replen: int = REPLEN) -> None:
|
||||
self.handle = handle
|
||||
self.replen = replen
|
||||
self.session_counter = 0
|
||||
|
||||
# XXX we might be able to remove this now that TrezorClient does session handling
|
||||
@ -133,10 +134,10 @@ class ProtocolV1(Protocol):
|
||||
|
||||
while buffer:
|
||||
# Report ID, data padded to 63 bytes
|
||||
chunk = b"?" + buffer[: REPLEN - 1]
|
||||
chunk = chunk.ljust(REPLEN, b"\x00")
|
||||
chunk = b"?" + buffer[: self.replen - 1]
|
||||
chunk = chunk.ljust(self.replen, b"\x00")
|
||||
self.handle.write_chunk(chunk)
|
||||
buffer = buffer[63:]
|
||||
buffer = buffer[self.replen - 1 :]
|
||||
|
||||
def read(self) -> MessagePayload:
|
||||
buffer = bytearray()
|
||||
|
@ -61,6 +61,8 @@ class WebUsbHandle:
|
||||
self.handle.claimInterface(self.interface)
|
||||
except usb1.USBErrorAccess as e:
|
||||
raise DeviceIsBusy(self.device) from e
|
||||
except usb1.USBErrorBusy as e:
|
||||
raise DeviceIsBusy(self.device) from e
|
||||
|
||||
def close(self) -> None:
|
||||
if self.handle is not None:
|
||||
|
Loading…
Reference in New Issue
Block a user