1
0
mirror of https://github.com/trezor/trezor-firmware.git synced 2025-01-10 15:30:55 +00:00

Preparation for python3 support (WIP)

This commit is contained in:
slush0 2016-05-05 03:16:17 +02:00 committed by Pavol Rusnak
parent 79a64abd24
commit 6ec2ff3eac
No known key found for this signature in database
GPG Key ID: 91F3B339B9A02A3D
22 changed files with 56 additions and 57 deletions

View File

@ -24,8 +24,8 @@ class TrezorTest(unittest.TestCase):
self.client.wipe_device() self.client.wipe_device()
print "Setup finished" print("Setup finished")
print "--------------" print("--------------")
def setup_mnemonic_nopin_nopassphrase(self): def setup_mnemonic_nopin_nopassphrase(self):
self.client.load_device_by_mnemonic(mnemonic=self.mnemonic12, pin='', passphrase_protection=False, label='test', language='english') self.client.load_device_by_mnemonic(mnemonic=self.mnemonic12, pin='', passphrase_protection=False, label='test', language='english')

View File

@ -10,7 +10,7 @@ devices = HidTransport.enumerate()
if len(devices) > 0: if len(devices) > 0:
if devices[0][1] != None: if devices[0][1] != None:
print 'Using TREZOR' print('Using TREZOR')
TRANSPORT = HidTransport TRANSPORT = HidTransport
TRANSPORT_ARGS = (devices[0],) TRANSPORT_ARGS = (devices[0],)
TRANSPORT_KWARGS = {'debug_link': False} TRANSPORT_KWARGS = {'debug_link': False}
@ -18,7 +18,7 @@ if len(devices) > 0:
DEBUG_TRANSPORT_ARGS = (devices[0],) DEBUG_TRANSPORT_ARGS = (devices[0],)
DEBUG_TRANSPORT_KWARGS = {'debug_link': True} DEBUG_TRANSPORT_KWARGS = {'debug_link': True}
else: else:
print 'Using Raspberry Pi' print('Using Raspberry Pi')
TRANSPORT = HidTransport TRANSPORT = HidTransport
TRANSPORT_ARGS = (devices[0],) TRANSPORT_ARGS = (devices[0],)
TRANSPORT_KWARGS = {'debug_link': False} TRANSPORT_KWARGS = {'debug_link': False}
@ -26,7 +26,7 @@ if len(devices) > 0:
DEBUG_TRANSPORT_ARGS = ('trezor.bo:2000',) DEBUG_TRANSPORT_ARGS = ('trezor.bo:2000',)
DEBUG_TRANSPORT_KWARGS = {} DEBUG_TRANSPORT_KWARGS = {}
else: else:
print 'Using Emulator' print('Using Emulator')
TRANSPORT = PipeTransport TRANSPORT = PipeTransport
TRANSPORT_ARGS = ('/tmp/pipe.trezor', False) TRANSPORT_ARGS = ('/tmp/pipe.trezor', False)
TRANSPORT_KWARGS = {} TRANSPORT_KWARGS = {}

View File

@ -15,7 +15,7 @@ class TestBip32Speed(common.TrezorTest):
self.client.get_address('Bitcoin', range(depth)) self.client.get_address('Bitcoin', range(depth))
delay = time.time() - start delay = time.time() - start
expected = (depth + 1) * 0.26 expected = (depth + 1) * 0.26
print "DEPTH", depth, "EXPECTED DELAY", expected, "REAL DELAY", delay print("DEPTH", depth, "EXPECTED DELAY", expected, "REAL DELAY", delay)
self.assertLessEqual(delay, expected) self.assertLessEqual(delay, expected)
def test_private_ckd(self): def test_private_ckd(self):
@ -28,7 +28,7 @@ class TestBip32Speed(common.TrezorTest):
self.client.get_address('Bitcoin', range(-depth, 0)) self.client.get_address('Bitcoin', range(-depth, 0))
delay = time.time() - start delay = time.time() - start
expected = (depth + 1) * 0.26 expected = (depth + 1) * 0.26
print "DEPTH", depth, "EXPECTED DELAY", expected, "REAL DELAY", delay print("DEPTH", depth, "EXPECTED DELAY", expected, "REAL DELAY", delay)
self.assertLessEqual(delay, expected) self.assertLessEqual(delay, expected)
def test_cache(self): def test_cache(self):
@ -44,8 +44,8 @@ class TestBip32Speed(common.TrezorTest):
self.client.get_address('Bitcoin', [1, 2, 3, 4, 5, 6, 7, x]) self.client.get_address('Bitcoin', [1, 2, 3, 4, 5, 6, 7, x])
cache_time = time.time() - start cache_time = time.time() - start
print "NOCACHE TIME", nocache_time print("NOCACHE TIME", nocache_time)
print "CACHED TIME", cache_time print("CACHED TIME", cache_time)
# Cached time expected to be at least 2x faster # Cached time expected to be at least 2x faster
self.assertLessEqual(cache_time, nocache_time / 2.) self.assertLessEqual(cache_time, nocache_time / 2.)

View File

@ -14,14 +14,14 @@ def test_ecies_backforth(cls, test_string):
# encrypt without signature # encrypt without signature
enc = cls.client.encrypt_message(pubkey, test_string, display_only=False, coin_name='Bitcoin', n=[]) enc = cls.client.encrypt_message(pubkey, test_string, display_only=False, coin_name='Bitcoin', n=[])
print 'base64:', base64.b64encode(enc.nonce + enc.message + enc.hmac) print('base64:', base64.b64encode(enc.nonce + enc.message + enc.hmac))
dec = cls.client.decrypt_message([1], enc.nonce, enc.message, enc.hmac) dec = cls.client.decrypt_message([1], enc.nonce, enc.message, enc.hmac)
cls.assertEqual(dec.message, test_string) cls.assertEqual(dec.message, test_string)
cls.assertEqual(dec.address, '') cls.assertEqual(dec.address, '')
# encrypt with signature # encrypt with signature
enc = cls.client.encrypt_message(pubkey, test_string, display_only=False, coin_name='Bitcoin', n=[5]) enc = cls.client.encrypt_message(pubkey, test_string, display_only=False, coin_name='Bitcoin', n=[5])
print 'base64:', base64.b64encode(enc.nonce + enc.message + enc.hmac) print('base64:', base64.b64encode(enc.nonce + enc.message + enc.hmac))
dec = cls.client.decrypt_message([1], enc.nonce, enc.message, enc.hmac) dec = cls.client.decrypt_message([1], enc.nonce, enc.message, enc.hmac)
cls.assertEqual(dec.message, test_string) cls.assertEqual(dec.message, test_string)
cls.assertEqual(dec.address, '1Csf6LVPkv24FBs6bpj4ELPszE6mGf6jeV') cls.assertEqual(dec.address, '1Csf6LVPkv24FBs6bpj4ELPszE6mGf6jeV')

View File

@ -26,7 +26,7 @@ class TestMsgGetentropy(common.TrezorTest):
self.client.set_expected_responses([proto.ButtonRequest(code=proto_types.ButtonRequest_ProtectCall), proto.Entropy()]) self.client.set_expected_responses([proto.ButtonRequest(code=proto_types.ButtonRequest_ProtectCall), proto.Entropy()])
ent = self.client.get_entropy(l) ent = self.client.get_entropy(l)
self.assertTrue(len(ent) >= l) self.assertTrue(len(ent) >= l)
print 'entropy = ', entropy(ent) print('entropy = ', entropy(ent))
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View File

@ -36,7 +36,7 @@ class TestDeviceRecovery(common.TrezorTest):
ret = self.client.call_raw(proto.WordAck(word=word)) ret = self.client.call_raw(proto.WordAck(word=word))
fakes += 1 fakes += 1
print mnemonic print(mnemonic)
# Workflow succesfully ended # Workflow succesfully ended
self.assertIsInstance(ret, proto.Success) self.assertIsInstance(ret, proto.Success)
@ -83,7 +83,7 @@ class TestDeviceRecovery(common.TrezorTest):
ret = self.client.call_raw(proto.WordAck(word=word)) ret = self.client.call_raw(proto.WordAck(word=word))
fakes += 1 fakes += 1
print mnemonic print(mnemonic)
# Workflow succesfully ended # Workflow succesfully ended
self.assertIsInstance(ret, proto.Success) self.assertIsInstance(ret, proto.Success)

View File

@ -17,10 +17,10 @@ def check_path(identity):
if identity.port: uri += ':' + identity.port if identity.port: uri += ':' + identity.port
if identity.path: uri += identity.path if identity.path: uri += identity.path
m.update(uri) m.update(uri)
print 'hash:', m.hexdigest() print('hash:', m.hexdigest())
(a, b, c, d, _, _, _, _) = struct.unpack('<8I', m.digest()) (a, b, c, d, _, _, _, _) = struct.unpack('<8I', m.digest())
address_n = [0x80000000 | 13, 0x80000000 | a, 0x80000000 | b, 0x80000000 | c, 0x80000000 | d] address_n = [0x80000000 | 13, 0x80000000 | a, 0x80000000 | b, 0x80000000 | c, 0x80000000 | d]
print 'path:', 'm/' + '/'.join([str(x) for x in address_n]) print('path:', 'm/' + '/'.join([str(x) for x in address_n]))
class TestMsgSignidentity(common.TrezorTest): class TestMsgSignidentity(common.TrezorTest):

View File

@ -100,7 +100,7 @@ class TestProtectCall(common.TrezorTest):
got = time.time() - start got = time.time() - start
msg = "Pin delay expected to be at least %s seconds, got %s" % (expected, got) msg = "Pin delay expected to be at least %s seconds, got %s" % (expected, got)
print msg print(msg)
self.assertLessEqual(expected, got, msg) self.assertLessEqual(expected, got, msg)
for attempt in range(1, 6): for attempt in range(1, 6):

View File

@ -7,8 +7,8 @@ from ecdsa.util import string_to_number, number_to_string
from ecdsa.curves import SECP256k1 from ecdsa.curves import SECP256k1
from ecdsa.ellipticcurve import Point, INFINITY from ecdsa.ellipticcurve import Point, INFINITY
import tools from . import tools
import types_pb2 as proto_types from . import types_pb2 as proto_types
PRIME_DERIVATION_FLAG = 0x80000000 PRIME_DERIVATION_FLAG = 0x80000000

View File

@ -4,15 +4,15 @@ import time
import binascii import binascii
import hashlib import hashlib
import unicodedata import unicodedata
import mapping
import json import json
import getpass import getpass
import tools import tools
import mapping
import messages_pb2 as proto import messages_pb2 as proto
import types_pb2 as types import types_pb2 as types
import protobuf_json import protobuf_json
from trezorlib.debuglink import DebugLink from debuglink import DebugLink
from mnemonic import Mnemonic from mnemonic import Mnemonic

View File

@ -1,11 +1,11 @@
import messages_pb2 as proto from . import messages_pb2 as proto
from transport import NotImplementedException from .transport import NotImplementedException
def pin_info(pin): def pin_info(pin):
print "Device asks for PIN %s" % pin print("Device asks for PIN %s" % pin)
def button_press(yes_no): def button_press(yes_no):
print "User pressed", '"y"' if yes_no else '"n"' print("User pressed", '"y"' if yes_no else '"n"')
def pprint(msg): def pprint(msg):
return "<%s> (%d bytes):\n%s" % (msg.__class__.__name__, msg.ByteSize(), msg) return "<%s> (%d bytes):\n%s" % (msg.__class__.__name__, msg.ByteSize(), msg)
@ -21,18 +21,18 @@ class DebugLink(object):
self.transport.close() self.transport.close()
def _call(self, msg, nowait=False): def _call(self, msg, nowait=False):
print "DEBUGLINK SEND", pprint(msg) print("DEBUGLINK SEND", pprint(msg))
self.transport.write(msg) self.transport.write(msg)
if nowait: if nowait:
return return
ret = self.transport.read_blocking() ret = self.transport.read_blocking()
print "DEBUGLINK RECV", pprint(ret) print("DEBUGLINK RECV", pprint(ret))
return ret return ret
def read_pin(self): def read_pin(self):
obj = self._call(proto.DebugLinkGetState()) obj = self._call(proto.DebugLinkGetState())
print "Read PIN:", obj.pin print("Read PIN:", obj.pin)
print "Read matrix:", obj.matrix print("Read matrix:", obj.matrix)
return (obj.pin, obj.matrix) return (obj.pin, obj.matrix)
@ -51,7 +51,7 @@ class DebugLink(object):
# on keypad, not a real PIN. # on keypad, not a real PIN.
pin_encoded = ''.join([ str(matrix.index(p) + 1) for p in pin]) pin_encoded = ''.join([ str(matrix.index(p) + 1) for p in pin])
print "Encoded PIN:", pin_encoded print("Encoded PIN:", pin_encoded)
return pin_encoded return pin_encoded
def read_layout(self): def read_layout(self):
@ -83,7 +83,7 @@ class DebugLink(object):
return obj.passphrase_protection return obj.passphrase_protection
def press_button(self, yes_no): def press_button(self, yes_no):
print "Pressing", yes_no print("Pressing", yes_no)
self.button_func(yes_no) self.button_func(yes_no)
self._call(proto.DebugLinkDecision(yes_no=yes_no), nowait=True) self._call(proto.DebugLinkDecision(yes_no=yes_no), nowait=True)

View File

@ -1,4 +1,4 @@
import messages_pb2 as proto from . import messages_pb2 as proto
map_type_to_class = {} map_type_to_class = {}
map_class_to_type = {} map_class_to_type = {}

View File

@ -44,7 +44,7 @@ __author__='Paul Dovbush <dpp@dpp.su>'
import json import json
from google.protobuf.descriptor import FieldDescriptor as FD from google.protobuf.descriptor import FieldDescriptor as FD
import binascii import binascii
import types_pb2 as types from . import types_pb2 as types
class ParseError(Exception): pass class ParseError(Exception): pass

View File

@ -1,5 +1,5 @@
import struct import struct
import mapping from . import mapping
class NotImplementedException(Exception): class NotImplementedException(Exception):
pass pass
@ -115,7 +115,6 @@ class Transport(object):
if i >= 64: if i >= 64:
# timeout # timeout
raise Exception("Timed out while waiting for the magic character") raise Exception("Timed out while waiting for the magic character")
#print "Aligning to magic characters"
c = read_f.read(1) c = read_f.read(1)
if read_f.read(1) != "#": if read_f.read(1) != "#":

View File

@ -1,11 +1,11 @@
'''BridgeTransport implements transport TREZOR Bridge (aka trezord).''' '''BridgeTransport implements transport TREZOR Bridge (aka trezord).'''
import requests import requests
import protobuf_json
import json import json
import mapping from . import protobuf_json
from transport import Transport from . import mapping
import messages_pb2 as proto from . import messages_pb2 as proto
from .transport import Transport
TREZORD_HOST = 'https://localback.net:21324' TREZORD_HOST = 'https://localback.net:21324'
CONFIG_URL = 'https://mytrezor.com/data/plugin/config_signed.bin' CONFIG_URL = 'https://mytrezor.com/data/plugin/config_signed.bin'

View File

@ -2,7 +2,7 @@
# Local serial port loopback: socat PTY,link=COM8 PTY,link=COM9 # Local serial port loopback: socat PTY,link=COM8 PTY,link=COM9
from transport import Transport, NotImplementedException from .transport import Transport, NotImplementedException
class FakeTransport(Transport): class FakeTransport(Transport):
def __init__(self, device, *args, **kwargs): def __init__(self, device, *args, **kwargs):

View File

@ -2,7 +2,7 @@
import hid import hid
import time import time
from transport import Transport, ConnectionError from .transport import Transport, ConnectionError
DEVICE_IDS = [ DEVICE_IDS = [
# (0x10c4, 0xea80), # TREZOR Shield # (0x10c4, 0xea80), # TREZOR Shield

View File

@ -3,7 +3,7 @@ Use this transport for talking with trezor simulator.'''
import os import os
from select import select from select import select
from transport import Transport from .transport import Transport
class PipeTransport(Transport): class PipeTransport(Transport):
def __init__(self, device, is_device, *args, **kwargs): def __init__(self, device, is_device, *args, **kwargs):
@ -16,8 +16,8 @@ class PipeTransport(Transport):
self.filename_read = self.device+'.to' self.filename_read = self.device+'.to'
self.filename_write = self.device+'.from' self.filename_write = self.device+'.from'
os.mkfifo(self.filename_read, 0600) os.mkfifo(self.filename_read, 0o600)
os.mkfifo(self.filename_write, 0600) os.mkfifo(self.filename_write, 0o600)
else: else:
self.filename_read = self.device+'.from' self.filename_read = self.device+'.from'
self.filename_write = self.device+'.to' self.filename_write = self.device+'.to'
@ -47,7 +47,7 @@ class PipeTransport(Transport):
self.write_f.write(msg) self.write_f.write(msg)
self.write_f.flush() self.write_f.flush()
except OSError: except OSError:
print "Error while writing to socket" print("Error while writing to socket")
raise raise
def _read(self): def _read(self):
@ -55,5 +55,5 @@ class PipeTransport(Transport):
(msg_type, datalen) = self._read_headers(self.read_f) (msg_type, datalen) = self._read_headers(self.read_f)
return (msg_type, self.read_f.read(datalen)) return (msg_type, self.read_f.read(datalen))
except IOError: except IOError:
print "Failed to read from device" print("Failed to read from device")
raise raise

View File

@ -4,7 +4,7 @@
import serial import serial
from select import select from select import select
from transport import Transport from .transport import Transport
class SerialTransport(Transport): class SerialTransport(Transport):
def __init__(self, device, *args, **kwargs): def __init__(self, device, *args, **kwargs):
@ -27,7 +27,7 @@ class SerialTransport(Transport):
self.serial.write(msg) self.serial.write(msg)
self.serial.flush() self.serial.flush()
except serial.SerialException: except serial.SerialException:
print "Error while writing to socket" print("Error while writing to socket")
raise raise
def _read(self): def _read(self):
@ -35,5 +35,5 @@ class SerialTransport(Transport):
(msg_type, datalen) = self._read_headers(self.serial) (msg_type, datalen) = self._read_headers(self.serial)
return (msg_type, self.serial.read(datalen)) return (msg_type, self.serial.read(datalen))
except serial.SerialException: except serial.SerialException:
print "Failed to read from device" print("Failed to read from device")
raise raise

View File

@ -2,7 +2,7 @@
import socket import socket
from select import select from select import select
from transport import Transport from .transport import Transport
class SocketTransportClient(Transport): class SocketTransportClient(Transport):
def __init__(self, device, *args, **kwargs): def __init__(self, device, *args, **kwargs):
@ -37,7 +37,7 @@ class SocketTransportClient(Transport):
(msg_type, datalen) = self._read_headers(self.filelike) (msg_type, datalen) = self._read_headers(self.filelike)
return (msg_type, self.filelike.read(datalen)) return (msg_type, self.filelike.read(datalen))
except socket.error: except socket.error:
print "Failed to read from device" print("Failed to read from device")
return None return None
class SocketTransport(Transport): class SocketTransport(Transport):
@ -63,7 +63,7 @@ class SocketTransport(Transport):
self.socket.listen(5) self.socket.listen(5)
def _disconnect_client(self): def _disconnect_client(self):
print "Disconnecting client" print("Disconnecting client")
if self.client != None: if self.client != None:
self.client.close() self.client.close()
self.client = None self.client = None
@ -84,7 +84,7 @@ class SocketTransport(Transport):
rlist, _, _ = select([self.socket], [], [], 0) rlist, _, _ = select([self.socket], [], [], 0)
if len(rlist) > 0: if len(rlist) > 0:
(self.client, ipaddr) = self.socket.accept() (self.client, ipaddr) = self.socket.accept()
print "Connected", ipaddr[0] print("Connected", ipaddr[0])
self.filelike = self.client.makefile() self.filelike = self.client.makefile()
return self.ready_to_read() return self.ready_to_read()
return False return False
@ -97,7 +97,7 @@ class SocketTransport(Transport):
self.filelike.write(msg) self.filelike.write(msg)
self.filelike.flush() self.filelike.flush()
except socket.error: except socket.error:
print "Socket error" print("Socket error")
self._disconnect_client() self._disconnect_client()
def _read(self): def _read(self):
@ -105,6 +105,6 @@ class SocketTransport(Transport):
(msg_type, datalen) = self._read_headers(self.filelike) (msg_type, datalen) = self._read_headers(self.filelike)
return (msg_type, self.filelike.read(datalen)) return (msg_type, self.filelike.read(datalen))
except Exception: except Exception:
print "Failed to read from device" print("Failed to read from device")
self._disconnect_client() self._disconnect_client()
return None return None

View File

@ -3,7 +3,7 @@
import socket import socket
from select import select from select import select
import time import time
from transport import Transport, ConnectionError from .transport import Transport, ConnectionError
class FakeRead(object): class FakeRead(object):
# Let's pretend we have a file-like interface # Let's pretend we have a file-like interface

View File

@ -3,7 +3,7 @@ import urllib2
import json import json
from decimal import Decimal from decimal import Decimal
# from filecache import filecache, DAY # from filecache import filecache, DAY
import types_pb2 as proto_types from . import types_pb2 as proto_types
def insight_tx(url, rawdata=False): def insight_tx(url, rawdata=False):
if not rawdata: if not rawdata: