mirror of
https://github.com/trezor/trezor-firmware.git
synced 2025-01-15 01:40:57 +00:00
Implemented debuglink connection
This commit is contained in:
parent
0c81a7d8ec
commit
37b75d1bc9
@ -1,7 +1,5 @@
|
||||
import os
|
||||
|
||||
import bitkey_pb2 as proto
|
||||
from transport_pipe import PipeTransport
|
||||
|
||||
def show_message(message):
|
||||
print "MESSAGE FROM DEVICE:", message
|
||||
@ -13,46 +11,58 @@ def show_input(input_text, message=None):
|
||||
|
||||
class BitkeyClient(object):
|
||||
|
||||
def __init__(self, path, message_func=show_message, input_func=show_input, debug=False):
|
||||
def __init__(self, transport, debuglink=None, message_func=show_message, input_func=show_input, debug=False):
|
||||
self.master_public_key = None
|
||||
self.path = path
|
||||
self.connected = False
|
||||
self.device = None
|
||||
self.transport = transport
|
||||
self.debuglink = debuglink
|
||||
|
||||
self.message_func = message_func
|
||||
self.input_func = input_func
|
||||
self.debug = debug
|
||||
|
||||
def open(self):
|
||||
self.device = PipeTransport(self.path, False)
|
||||
|
||||
self.features = self.call(proto.Initialize())
|
||||
self.UUID = self.call(proto.GetUUID())
|
||||
|
||||
def close(self):
|
||||
if self.device:
|
||||
self.device.close()
|
||||
self.device = None
|
||||
|
||||
|
||||
def _pprint(self, msg):
|
||||
return "<%s>:\n%s" % (msg.__class__.__name__, msg)
|
||||
|
||||
def call(self, msg, tries=3):
|
||||
def call(self, msg, tries=1, button=None, pin_correct=True, otp_correct=True):
|
||||
if self.debug:
|
||||
print '----------------------'
|
||||
print "Sending", self._pprint(msg)
|
||||
|
||||
self.device.write(msg)
|
||||
resp = self.device.read()
|
||||
|
||||
self.transport.write(msg)
|
||||
|
||||
if self.debuglink and button != None:
|
||||
self.debuglink.press_button(button)
|
||||
|
||||
resp = self.transport.read()
|
||||
|
||||
if isinstance(resp, proto.OtpRequest):
|
||||
otp = self.input_func("OTP required: ", resp.message)
|
||||
self.device.write(proto.OtpAck(otp=otp))
|
||||
resp = self.device.read()
|
||||
if self.debuglink:
|
||||
otp = self.debuglink.read_otp()
|
||||
if otp_correct:
|
||||
self.transport.write(otp)
|
||||
else:
|
||||
self.transport.write(proto.OtpAck(otp='__42__'))
|
||||
else:
|
||||
otp = self.input_func("OTP required: ", resp.message)
|
||||
self.transport.write(proto.OtpAck(otp=otp))
|
||||
|
||||
resp = self.transport.read()
|
||||
|
||||
if isinstance(resp, proto.PinRequest):
|
||||
pin = self.input_func("PIN required: ", resp.message)
|
||||
self.device.write(proto.PinAck(pin=pin))
|
||||
resp = self.device.read()
|
||||
if self.debuglink:
|
||||
pin = self.debuglink.read_pin()
|
||||
if pin_correct:
|
||||
self.transport.write(pin)
|
||||
else:
|
||||
self.transport.write(proto.PinAck(pin='__42__'))
|
||||
else:
|
||||
pin = self.input_func("PIN required: ", resp.message)
|
||||
self.transport.write(proto.PinAck(pin=pin))
|
||||
|
||||
resp = self.transport.read()
|
||||
|
||||
if isinstance(resp, proto.Failure):
|
||||
self.message_func(resp.message)
|
||||
@ -70,7 +80,10 @@ class BitkeyClient(object):
|
||||
raise Exception("PIN is invalid, too many retries")
|
||||
self.message_func("PIN is invalid, let's try again...")
|
||||
|
||||
return self.call(msg, tries-1)
|
||||
return self.call(msg, tries-1,
|
||||
button=button,
|
||||
pin_correct=pin_correct,
|
||||
otp_correct=otp_correct)
|
||||
|
||||
if isinstance(resp, proto.Failure):
|
||||
raise Exception(resp.code, resp.message)
|
||||
|
Loading…
Reference in New Issue
Block a user