From 99a9006d85198649c13cd37d9de0dd00111927c6 Mon Sep 17 00:00:00 2001 From: tychovrahe Date: Wed, 28 Jun 2023 00:45:27 +0200 Subject: [PATCH] disconnect --- python/src/trezorlib/cli/ble.py | 34 +++++++++++++++------------ python/src/trezorlib/transport/ble.py | 13 ++++++++++ 2 files changed, 32 insertions(+), 15 deletions(-) diff --git a/python/src/trezorlib/cli/ble.py b/python/src/trezorlib/cli/ble.py index d2b97245f..1a1411259 100644 --- a/python/src/trezorlib/cli/ble.py +++ b/python/src/trezorlib/cli/ble.py @@ -88,27 +88,31 @@ def connect() -> None: click.echo("Connected") +@with_client +def disconnect_device(client: "TrezorClient") -> None: + """Disconnect from device side.""" + try: + ble.disconnect(client) + except exceptions.Cancelled: + click.echo("Disconnect aborted on device.") + except exceptions.TrezorException as e: + click.echo(f"Disconnect failed: {e}") + sys.exit(3) + @cli.command() @click.option("--device", is_flag=True, help="Disconnect from device side.") -@with_client -def disconnect(client: "TrezorClient", device: bool) -> None: +def disconnect(device: bool) -> None: if device: - ble.disconnect(client) + disconnect_device() else: - """Connect to the device via BLE.""" - adapter = tealblue.TealBlue().find_adapter() - - devices = lookup_device(adapter) - - devices = [d for d in devices if d.connected] - + ble_proxy = BleProxy() + devices = [d for d in ble_proxy.lookup() if d.connected] if len(devices) == 0: - print("No device is connected") - - for d in devices: - d.disconnect() - print(f"Device {d.name}, {d.address}, disconnected.") + click.echo("No BLE devices found") + return + ble_proxy.connect(devices[0].address) + ble_proxy.disconnect() @cli.command() diff --git a/python/src/trezorlib/transport/ble.py b/python/src/trezorlib/transport/ble.py index c8a7415f1..54c24874f 100644 --- a/python/src/trezorlib/transport/ble.py +++ b/python/src/trezorlib/transport/ble.py @@ -213,6 +213,19 @@ class BleAsync: ].acquire() self.current = address + async def disconnect(self): + if self.current is None: + return + ble_device = self.devices[self.current] + if ble_device.connected: + LOG.info("Disconnecting from %s (%s)..." % (ble_device.name, ble_device.address)) + await ble_device.disconnect() + else: + LOG.info("Disconnected from %s (%s)." % (ble_device.name, ble_device.address)) + self.current = None + self.rx = None + self.tx = None + async def read(self): assert self.tx is not None await ready(self.tx)