1
0
mirror of https://github.com/trezor/trezor-firmware.git synced 2025-01-22 13:21:03 +00:00

Merge pull request #226 from matejcik/refactor-transport-nicediff

prefix search for `trezorctl -p`
This commit is contained in:
matejcik 2018-03-06 15:50:05 +01:00 committed by GitHub
commit e1e419485f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 224 additions and 319 deletions

View File

@ -33,6 +33,7 @@ setup(
url='https://github.com/trezor/python-trezor',
packages=[
'trezorlib',
'trezorlib.transport',
'trezorlib.messages',
'trezorlib.qt',
'trezorlib.tests.device_tests',

View File

@ -16,21 +16,15 @@ import hashlib
import binascii
from trezorlib.client import TrezorClient
from trezorlib.device import TrezorDevice
# Python2 vs Python3
try:
input = raw_input
except NameError:
pass
from trezorlib.transport import enumerate_devices
def wait_for_devices():
devices = TrezorDevice.enumerate()
devices = enumerate_devices()
while not len(devices):
sys.stderr.write("Please connect TREZOR to computer and press Enter...")
input()
devices = TrezorDevice.enumerate()
devices = enumerate_devices()
return devices

View File

@ -1,21 +1,11 @@
#!/usr/bin/env python3
from __future__ import print_function
from trezorlib.client import TrezorClient
from trezorlib.device import TrezorDevice
from trezorlib.transport import get_transport
def main():
# List all connected TREZORs on USB/UDP
devices = TrezorDevice.enumerate()
# Check whether we found any
if len(devices) == 0:
print('No TREZOR found')
return
# Use first connected device
transport = devices[0]
transport = get_transport()
# Creates object for manipulating TREZOR
client = TrezorClient(transport)

View File

@ -1,9 +1,7 @@
#!/usr/bin/env python3
from __future__ import print_function
from trezorlib.debuglink import DebugLink
from trezorlib.client import TrezorClient
from trezorlib.transport_hid import HidTransport
from trezorlib.transport import enumerate_devices
import binascii
import sys
@ -16,8 +14,8 @@ sectorlens = [0x4000, 0x4000, 0x4000, 0x4000,
def main():
# List all connected TREZORs on USB
devices = HidTransport.enumerate()
# List all debuggable TREZORs
devices = [device for device in enumerate_devices() if hasattr(device, 'find_debug')]
# Check whether we found any
if len(devices) == 0:

View File

@ -1,9 +1,7 @@
#!/usr/bin/env python3
from __future__ import print_function
from trezorlib.debuglink import DebugLink
from trezorlib.client import TrezorClient
from trezorlib.transport_hid import HidTransport
from trezorlib.transport import enumerate_devices
import sys
# usage examples
@ -16,8 +14,8 @@ import sys
def main():
# List all connected TREZORs on USB
devices = HidTransport.enumerate()
# List all debuggable TREZORs
devices = [device for device in enumerate_devices() if hasattr(device, 'find_debug')]
# Check whether we found any
if len(devices) == 0:

View File

@ -1,16 +1,14 @@
#!/usr/bin/env python3
from __future__ import print_function
from trezorlib.debuglink import DebugLink
from trezorlib.client import TrezorClient
from trezorlib.transport_hid import HidTransport
from trezorlib.transport import enumerate_devices
import binascii
import sys
def main():
# List all connected TREZORs on USB
devices = HidTransport.enumerate()
# List all debuggable TREZORs
devices = [device for device in enumerate_devices() if hasattr(device, 'find_debug')]
# Check whether we found any
if len(devices) == 0:

View File

@ -1,5 +1,4 @@
#!/usr/bin/env python3
from __future__ import print_function
import binascii
import hashlib
import mnemonic
@ -16,12 +15,6 @@ __doc__ = '''
without an internet connection).
'''
# Python2 vs Python3
try:
input = raw_input
except NameError:
pass
def generate_entropy(strength, internal_entropy, external_entropy):
'''

View File

@ -1,7 +1,4 @@
#!/usr/bin/env python3
from __future__ import print_function
from binascii import hexlify, unhexlify
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
@ -9,14 +6,10 @@ import hmac
import hashlib
import json
import os
try:
from urllib.parse import urlparse
except:
from urlparse import urlparse
input = raw_input
from urllib.parse import urlparse
from trezorlib.client import TrezorClient
from trezorlib.transport_hid import HidTransport
from trezorlib.transport import get_transport
# Return path by BIP-32
@ -132,12 +125,13 @@ def printEntries(entries):
def main():
devices = HidTransport.enumerate()
if not devices:
print('TREZOR is not plugged in. Please, connect TREZOR and retry.')
try:
transport = get_transport()
except Exception as e:
print(e)
return
client = TrezorClient(devices[0])
client = TrezorClient(transport)
print()
print('Confirm operation on TREZOR')

View File

@ -8,21 +8,14 @@ from __future__ import print_function
import io
import sys
from trezorlib.client import TrezorClient
from trezorlib.device import TrezorDevice
def get_client():
devices = TrezorDevice.enumerate() # list all connected TREZORs on USB
if len(devices) == 0: # check whether we found any
return None
transport = devices[0] # use first connected device
return TrezorClient(transport) # creates object for communicating with TREZOR
from trezorlib.transport import get_transport
def main():
client = get_client()
if not client:
print('No TREZOR connected')
try:
client = TrezorClient(get_transport())
except Exception as e:
print(e)
return
arg1 = sys.argv[1] # output file

View File

@ -1,5 +1,4 @@
#!/usr/bin/env python
from __future__ import print_function
import binascii
import os
import random
@ -12,8 +11,7 @@ import hashlib
from trezorlib.client import TrezorClient
from trezorlib.tx_api import TxApiTestnet
from trezorlib.tx_api import TxApiBitcoin
from trezorlib.transport_hid import HidTransport
from trezorlib.transport_bridge import BridgeTransport
from trezorlib.transport import get_transport
def hash160(x):
@ -152,16 +150,13 @@ def main():
numinputs = 100
sizeinputtx = 10
# List all connected TREZORs on USB
devices = HidTransport.enumerate()
# Check whether we found any
if len(devices) == 0:
print('No TREZOR found')
# Use first connected device
try:
transport = get_transport()
except Exception as e:
print(e)
return
# Use first connected device
transport = devices[0]
print(transport)
txstore = MyTxApiBitcoin()

View File

@ -29,7 +29,7 @@ import os
import sys
from trezorlib.client import TrezorClient, TrezorClientVerbose, CallException, format_protobuf
from trezorlib.device import TrezorDevice
from trezorlib.transport import get_transport, enumerate_devices, TransportException
from trezorlib import messages as proto
from trezorlib import protobuf
from trezorlib.coins import coins_txapi
@ -72,9 +72,24 @@ CHOICE_OUTPUT_SCRIPT_TYPE = ChoiceType({
def cli(ctx, path, verbose, is_json):
if ctx.invoked_subcommand != 'list':
if verbose:
ctx.obj = lambda: TrezorClientVerbose(TrezorDevice.find_by_path(path))
cls = TrezorClientVerbose
else:
ctx.obj = lambda: TrezorClient(TrezorDevice.find_by_path(path))
cls = TrezorClient
def get_device():
try:
device = get_transport(path, prefix_search=False)
except:
try:
device = get_transport(path, prefix_search=True)
except:
click.echo("Failed to find a TREZOR device.")
if path is not None:
click.echo("Using path: {}".format(path))
sys.exit(1)
return cls(device)
ctx.obj = get_device
@cli.resultcallback()
@ -108,7 +123,7 @@ def print_result(res, path, verbose, is_json):
@cli.command(name='list', help='List connected TREZOR devices.')
def ls():
return TrezorDevice.enumerate()
return enumerate_devices()
@cli.command(help='Show version of trezorctl/trezorlib.')

View File

@ -16,53 +16,24 @@
# You should have received a copy of the GNU Lesser General Public License
# along with this library. If not, see <http://www.gnu.org/licenses/>.
import warnings
from .transport_bridge import BridgeTransport
from .transport_hid import HidTransport
from .transport_udp import UdpTransport
from .transport_webusb import WebUsbTransport
from .transport import enumerate_devices, get_transport
class TrezorDevice(object):
class TrezorDevice:
'''
This class is deprecated. (There is no reason for it to exist in the first
place, it is nothing but a collection of two functions.)
Instead, please use functions from the ``trezorlib.transport`` module.
'''
@classmethod
def enumerate(cls):
devices = []
for d in BridgeTransport.enumerate():
devices.append(d)
for d in UdpTransport.enumerate():
devices.append(d)
for d in HidTransport.enumerate():
devices.append(d)
for d in WebUsbTransport.enumerate():
devices.append(d)
return devices
warnings.warn('TrezorDevice is deprecated.', DeprecationWarning)
return enumerate_devices()
@classmethod
def find_by_path(cls, path):
if path is None:
try:
return cls.enumerate()[0]
except IndexError:
raise Exception("No TREZOR device found")
prefix = path.split(':')[0]
if prefix == BridgeTransport.PATH_PREFIX:
return BridgeTransport.find_by_path(path)
if prefix == UdpTransport.PATH_PREFIX:
return UdpTransport.find_by_path(path)
if prefix == WebUsbTransport.PATH_PREFIX:
return WebUsbTransport.find_by_path(path)
if prefix == HidTransport.PATH_PREFIX:
return HidTransport.find_by_path(path)
raise Exception("Unknown path prefix '%s'" % prefix)
warnings.warn('TrezorDevice is deprecated.', DeprecationWarning)
return get_transport(path, prefix_search=False)

View File

@ -23,86 +23,23 @@ import pytest
import os
from trezorlib.client import TrezorClient, TrezorClientDebugLink
from trezorlib.transport import get_transport
from trezorlib import tx_api
tests_dir = os.path.dirname(os.path.abspath(__file__))
tx_api.cache_dir = os.path.join(tests_dir, '../txcache')
try:
from trezorlib.transport_hid import HidTransport
HID_ENABLED = True
except ImportError as e:
print('HID transport disabled:', e)
HID_ENABLED = False
try:
from trezorlib.transport_webusb import WebUsbTransport
WEBUSB_ENABLED = True
except ImportError as e:
print('WebUSB transport disabled:', e)
WEBUSB_ENABLED = False
try:
from trezorlib.transport_pipe import PipeTransport
PIPE_ENABLED = True
except ImportError as e:
print('PIPE transport disabled:', e)
PIPE_ENABLED = False
try:
from trezorlib.transport_udp import UdpTransport
UDP_ENABLED = True
except ImportError as e:
print('UDP transport disabled:', e)
UDP_ENABLED = False
def get_device():
path = os.environ.get('TREZOR_PATH')
return get_transport(path)
def pipe_exists(path):
import os
import stat
try:
return stat.S_ISFIFO(os.stat(path).st_mode)
except:
return False
def get_transport():
if HID_ENABLED and HidTransport.enumerate():
devices = HidTransport.enumerate()
wirelink = devices[0]
debuglink = devices[0].find_debug()
elif WEBUSB_ENABLED and WebUsbTransport.enumerate():
devices = WebUsbTransport.enumerate()
wirelink = devices[0]
debuglink = devices[0].find_debug()
elif PIPE_ENABLED and pipe_exists('/tmp/pipe.trezor.to'):
wirelink = PipeTransport('/tmp/pipe.trezor', False)
debuglink = PipeTransport('/tmp/pipe.trezor_debug', False)
elif UDP_ENABLED:
wirelink = UdpTransport('127.0.0.1:21324')
debuglink = UdpTransport('127.0.0.1:21325')
return wirelink, debuglink
if HID_ENABLED and HidTransport.enumerate():
print('Using TREZOR')
elif WEBUSB_ENABLED and WebUsbTransport.enumerate():
print('Using TREZOR via WebUSB')
elif PIPE_ENABLED and pipe_exists('/tmp/pipe.trezor.to'):
print('Using Emulator (v1=pipe)')
elif UDP_ENABLED:
print('Using Emulator (v2=udp)')
class TrezorTest(object):
class TrezorTest:
def setup_method(self, method):
wirelink, debuglink = get_transport()
wirelink = get_device()
debuglink = wirelink.find_debug()
self.client = TrezorClientDebugLink(wirelink)
self.client.set_debuglink(debuglink)
self.client.set_tx_api(tx_api.TxApiBitcoin)

View File

@ -1,46 +0,0 @@
# This file is part of the TREZOR project.
#
# Copyright (C) 2012-2016 Marek Palatinus <slush@satoshilabs.com>
# Copyright (C) 2012-2016 Pavol Rusnak <stick@satoshilabs.com>
# Copyright (C) 2016 Jochen Hoenicke <hoenicke@gmail.com>
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this library. If not, see <http://www.gnu.org/licenses/>.
from __future__ import absolute_import
class TransportException(Exception):
pass
class Transport(object):
def __init__(self):
self.session_counter = 0
def session_begin(self):
if self.session_counter == 0:
self.open()
self.session_counter += 1
def session_end(self):
self.session_counter = max(self.session_counter - 1, 0)
if self.session_counter == 0:
self.close()
def open(self):
raise NotImplementedError
def close(self):
raise NotImplementedError

View File

@ -0,0 +1,97 @@
# This file is part of the TREZOR project.
#
# Copyright (C) 2012-2016 Marek Palatinus <slush@satoshilabs.com>
# Copyright (C) 2012-2016 Pavol Rusnak <stick@satoshilabs.com>
# Copyright (C) 2016 Jochen Hoenicke <hoenicke@gmail.com>
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this library. If not, see <http://www.gnu.org/licenses/>.
class TransportException(Exception):
pass
class Transport(object):
def __init__(self):
self.session_counter = 0
def __str__(self):
return self.get_path()
def get_path(self):
return '{}:{}'.format(self.PATH_PREFIX, self.device)
def session_begin(self):
if self.session_counter == 0:
self.open()
self.session_counter += 1
def session_end(self):
self.session_counter = max(self.session_counter - 1, 0)
if self.session_counter == 0:
self.close()
def open(self):
raise NotImplementedError
def close(self):
raise NotImplementedError
@classmethod
def enumerate(cls):
raise NotImplementedError
@classmethod
def find_by_path(cls, path, prefix_search=False):
for device in cls.enumerate():
if path is None or device.get_path() == path \
or (prefix_search and device.get_path().startswith(path)):
return device
raise TransportException('{} device not found: {}'.format(cls.PATH_PREFIX, path))
def all_transports():
from .bridge import BridgeTransport
from .hid import HidTransport
from .udp import UdpTransport
from .webusb import WebUsbTransport
return (BridgeTransport, HidTransport, UdpTransport, WebUsbTransport)
def enumerate_devices():
return [device
for transport in all_transports()
for device in transport.enumerate()]
def get_transport(path=None, prefix_search=False):
if path is None:
try:
return enumerate_devices()[0]
except IndexError:
raise Exception("No TREZOR device found") from None
# Find whether B is prefix of A (transport name is part of the path)
# or A is prefix of B (path is a prefix, or a name, of transport).
# This naively expects that no two transports have a common prefix.
def match_prefix(a, b):
return a.startswith(b) or b.startswith(a)
transports = [t for t in all_transports() if match_prefix(path, t.PATH_PREFIX)]
if transports:
return transports[0].find_by_path(path, prefix_search=prefix_search)
raise Exception("Unknown path prefix '%s'" % prefix)

View File

@ -17,17 +17,15 @@
# You should have received a copy of the GNU Lesser General Public License
# along with this library. If not, see <http://www.gnu.org/licenses/>.
from __future__ import absolute_import
import requests
import binascii
from io import BytesIO
import struct
from . import mapping
from . import messages
from . import protobuf
from .transport import Transport, TransportException
from .. import mapping
from .. import messages
from .. import protobuf
from . import Transport, TransportException
TREZORD_HOST = 'http://127.0.0.1:21325'
@ -45,16 +43,13 @@ class BridgeTransport(Transport):
HEADERS = {'Origin': 'https://python.trezor.io'}
def __init__(self, device):
super(BridgeTransport, self).__init__()
super().__init__()
self.device = device
self.conn = requests.Session()
self.session = None
self.response = None
def __str__(self):
return self.get_path()
def get_path(self):
return '%s:%s' % (self.PATH_PREFIX, self.device['path'])
@ -68,17 +63,6 @@ class BridgeTransport(Transport):
except:
return []
@classmethod
def find_by_path(cls, path):
if isinstance(path, bytes):
path = path.decode()
path = path.replace('%s:' % cls.PATH_PREFIX, '')
for transport in BridgeTransport.enumerate():
if path is None or transport.device['path'] == path:
return transport
raise TransportException('Bridge device not found')
def open(self):
r = self.conn.post(TREZORD_HOST + '/acquire/%s/null' % self.device['path'], headers=self.HEADERS)
if r.status_code != 200:

View File

@ -16,22 +16,20 @@
# You should have received a copy of the GNU Lesser General Public License
# along with this library. If not, see <http://www.gnu.org/licenses/>.
from __future__ import absolute_import
import time
import hid
import os
from .protocol_v1 import ProtocolV1
from .protocol_v2 import ProtocolV2
from .transport import Transport, TransportException
from ..protocol_v1 import ProtocolV1
from ..protocol_v2 import ProtocolV2
from . import Transport, TransportException
DEV_TREZOR1 = (0x534c, 0x0001)
DEV_TREZOR2 = (0x1209, 0x53c1)
DEV_TREZOR2_BL = (0x1209, 0x53c0)
class HidHandle(object):
class HidHandle:
def __init__(self, path):
self.path = path
@ -79,9 +77,6 @@ class HidTransport(Transport):
self.hid = hid_handle
self.hid_version = None
def __str__(self):
return self.get_path()
def get_path(self):
return "%s:%s" % (self.PATH_PREFIX, self.device['path'].decode())
@ -100,17 +95,6 @@ class HidTransport(Transport):
devices.append(HidTransport(dev))
return devices
@classmethod
def find_by_path(cls, path):
if isinstance(path, str):
path = path.encode()
path = path.replace(b'%s:' % cls.PATH_PREFIX.encode(), b'')
for transport in HidTransport.enumerate():
if path is None or transport.device['path'] == path:
return transport
raise TransportException('HID device not found')
def find_debug(self):
if isinstance(self.protocol, ProtocolV2):
# For v2 protocol, lets use the same HID interface, but with a different session

View File

@ -16,14 +16,12 @@
# You should have received a copy of the GNU Lesser General Public License
# along with this library. If not, see <http://www.gnu.org/licenses/>.
from __future__ import absolute_import
import os
import socket
from .protocol_v1 import ProtocolV1
from .protocol_v2 import ProtocolV2
from .transport import Transport, TransportException
from ..protocol_v1 import ProtocolV1
from ..protocol_v2 import ProtocolV2
from . import Transport, TransportException
class UdpTransport(Transport):
@ -48,26 +46,41 @@ class UdpTransport(Transport):
self.protocol = protocol
self.socket = None
def __str__(self):
return self.get_path()
def get_path(self):
return "%s:%s:%s" % ((self.PATH_PREFIX,) + self.device)
@staticmethod
def enumerate():
devices = []
d = UdpTransport("%s:%d" % (UdpTransport.DEFAULT_HOST, UdpTransport.DEFAULT_PORT))
d.open()
if d._ping():
devices.append(d)
d.close()
return devices
def find_debug(self):
host, port = self.device
return UdpTransport('{}:{}'.format(host, port + 1), self.protocol)
@classmethod
def find_by_path(cls, path):
path = path.replace('%s:' % cls.PATH_PREFIX, '')
return UdpTransport(path)
def _try_path(cls, path):
d = cls(path)
try:
d.open()
if d._ping():
return d
else:
raise TransportException('No TREZOR device found at address {}'.format(path))
finally:
d.close()
@classmethod
def enumerate(cls):
devices = []
default_path = '{}:{}'.format(cls.DEFAULT_HOST, cls.DEFAULT_PORT)
try:
return [cls._try_path(default_path)]
except TransportException:
return []
@classmethod
def find_by_path(cls, path, prefix_search=False):
if prefix_search:
return super().find_by_path(path, prefix_search)
else:
path = path.replace('{}:'.format(cls.PATH_PREFIX), '')
return cls._try_path(path)
def open(self):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

View File

@ -16,16 +16,14 @@
# You should have received a copy of the GNU Lesser General Public License
# along with this library. If not, see <http://www.gnu.org/licenses/>.
from __future__ import absolute_import
import time
import os
import atexit
import usb1
from .protocol_v1 import ProtocolV1
from .protocol_v2 import ProtocolV2
from .transport import Transport, TransportException
from ..protocol_v1 import ProtocolV1
from ..protocol_v2 import ProtocolV2
from . import Transport, TransportException
DEV_TREZOR1 = (0x534c, 0x0001)
DEV_TREZOR2 = (0x1209, 0x53c1)
@ -37,7 +35,7 @@ DEBUG_INTERFACE = 1
DEBUG_ENDPOINT = 2
class WebUsbHandle(object):
class WebUsbHandle:
def __init__(self, device):
self.device = device
@ -88,9 +86,6 @@ class WebUsbTransport(Transport):
self.handle = handle
self.debug = debug
def __str__(self):
return self.get_path()
def get_path(self):
return "%s:%s" % (self.PATH_PREFIX, dev_to_str(self.device))
@ -106,17 +101,18 @@ class WebUsbTransport(Transport):
continue
if not is_vendor_class(dev):
continue
devices.append(WebUsbTransport(dev))
try:
# workaround for issue #223:
# on certain combinations of Windows USB drivers and libusb versions,
# Trezor is returned twice (possibly because Windows know it as both
# a HID and a WebUSB device), and one of the returned devices is
# non-functional.
dev.getProduct()
devices.append(WebUsbTransport(dev))
except usb1.USBErrorNotSupported:
pass
return devices
@classmethod
def find_by_path(cls, path):
path = path.replace('%s:' % cls.PATH_PREFIX, '') # Remove prefix from __str__()
for transport in WebUsbTransport.enumerate():
if path is None or dev_to_str(transport.device) == path:
return transport
raise TransportException('WebUSB device not found')
def find_debug(self):
if isinstance(self.protocol, ProtocolV2):
# TODO test this