Major refactoring

Added commandline tool cmd.py
pull/25/head
slush 12 years ago
parent 37b75d1bc9
commit 8f84e356ad

@ -1,5 +1,6 @@
import os
import bitkey_pb2 as proto
import random
def show_message(message):
print "MESSAGE FROM DEVICE:", message
@ -11,22 +12,44 @@ def show_input(input_text, message=None):
class BitkeyClient(object):
def __init__(self, transport, debuglink=None, message_func=show_message, input_func=show_input, debug=False):
self.master_public_key = None
self.transport = transport
def __init__(self, transport, debuglink=None,
algo=proto.BIP32,
message_func=show_message, input_func=show_input, debug=False):
self.transport = transport
self.debuglink = debuglink
self.algo = algo
self.message_func = message_func
self.input_func = input_func
self.debug = debug
self.features = self.call(proto.Initialize())
self.setup_debuglink()
self.init_device()
def init_device(self):
self.master_public_key = None
self.session_id = ''.join([ chr(random.randrange(0, 255, 1)) for _ in xrange(0, 16) ])
self.features = self.call(proto.Initialize(session_id=self.session_id))
self.UUID = self.call(proto.GetUUID())
def get_master_public_key(self):
if self.master_public_key:
return self.master_public_key
self.master_public_key = self.call(proto.GetMasterPublicKey(algo=self.algo))
return self.master_public_key
def get_entropy(self, size):
return self.call(proto.GetEntropy(size=size)).entropy
def _pprint(self, msg):
return "<%s>:\n%s" % (msg.__class__.__name__, msg)
def call(self, msg, tries=1, button=None, pin_correct=True, otp_correct=True):
def setup_debuglink(self, pin_correct=False, otp_correct=False):
self.debug_pin = pin_correct
self.debug_otp = otp_correct
def call(self, msg, button=None, tries=1):
if self.debug:
print '----------------------'
print "Sending", self._pprint(msg)
@ -41,7 +64,7 @@ class BitkeyClient(object):
if isinstance(resp, proto.OtpRequest):
if self.debuglink:
otp = self.debuglink.read_otp()
if otp_correct:
if self.debug_otp:
self.transport.write(otp)
else:
self.transport.write(proto.OtpAck(otp='__42__'))
@ -54,7 +77,7 @@ class BitkeyClient(object):
if isinstance(resp, proto.PinRequest):
if self.debuglink:
pin = self.debuglink.read_pin()
if pin_correct:
if self.debug_pin:
self.transport.write(pin)
else:
self.transport.write(proto.PinAck(pin='__42__'))
@ -80,11 +103,7 @@ class BitkeyClient(object):
raise Exception("PIN is invalid, too many retries")
self.message_func("PIN is invalid, let's try again...")
return self.call(msg, tries-1,
button=button,
pin_correct=pin_correct,
otp_correct=otp_correct)
return self.call(msg, button, tries-1)
if isinstance(resp, proto.Failure):
raise Exception(resp.code, resp.message)
@ -93,39 +112,28 @@ class BitkeyClient(object):
return resp
def sign_tx(self, algo, inputs, outputs, fee):
def get_uuid(self):
return self.call(proto.GetUUID())
def sign_tx(self, inputs, outputs):
'''
inputs: list of TxInput
outputs: list of TxOutput
'''
tx = proto.SignTx()
tx.algo = algo # Choose BIP32 or ELECTRUM way for deterministic keys
tx.algo = self.algo # Choose BIP32 or ELECTRUM way for deterministic keys
tx.random = os.urandom(256) # Provide additional entropy to the device
for addr, amount in outputs:
if addr in self.addresses:
addr_n = self.addresses.index(addr)
else:
addr_n = None
tx.outputs.extend(outputs)
fee -= amount
output = tx.outputs.add()
output.address=addr
output.address_n.append(addr_n)
output.amount=amount
output.script_type=proto.PAYTOADDRESS
print "FEE", fee
#print inputs2, outputs2
tx.fee = fee
print "PBDATA", tx.SerializeToString().encode('hex')
return self.call(tx)
#print "PBDATA", tx.SerializeToString().encode('hex')
#################
#################
#################
'''
signatures = [('add550d6ba9ab7e01d37e17658f98b6e901208d241f24b08197b5e20dfa7f29f095ae01acbfa5c4281704a64053dcb80e9b089ecbe09f5871d67725803e36edd', '3045022100dced96eeb43836bc95676879eac303eabf39802e513f4379a517475c259da12502201fd36c90ecd91a32b2ca8fed2e1755a7f2a89c2d520eb0da10147802bc7ca217')]
s_inputs = []
@ -136,7 +144,7 @@ class BitkeyClient(object):
s_inputs.append((addr, v, p_hash, p_pos, p_scriptPubKey, pubkey, sig))
return s_inputs
'''
s_inputs = []
for i in range(len(inputs)):
addr, v, p_hash, p_pos, p_scriptPubKey, _, _ = inputs[i]
@ -149,3 +157,7 @@ class BitkeyClient(object):
s_inputs.append( (addr, v, p_hash, p_pos, p_scriptPubKey, pubkey, sig) )
return s_inputs
'''
def load_device(self, seed, otp, pin, spv, button=None):
self.call(proto.LoadDevice(seed=seed, otp=otp, pin=pin, spv=spv), button=button)
self.init_device()

@ -0,0 +1,44 @@
import bitkey_pb2 as proto
def otp_info(otp):
print "Device asks for OTP %s" % otp
def pin_info(pin):
print "Device asks for PIN %s" % pin
def button_press(yes_no):
print "User pressed", '"y"' if yes_no else '"n"'
class DebugLink(object):
def __init__(self, transport, otp_func=otp_info, pin_func=pin_info, button_func=button_press):
self.transport = transport
self.otp_func = otp_func
self.pin_func = pin_func
self.button_func = button_func
def read_otp(self):
obj = self.transport.read()
if not isinstance(obj, proto.OtpAck):
raise Exception("Expected OtpAck object, got %s" % obj)
self.otp_func(obj)
return obj
def read_pin(self):
obj = self.transport.read()
if not isinstance(obj, proto.PinAck):
raise Exception("Expected PinAck object, got %s" % obj)
self.pin_func(obj)
return obj
def press_button(self, yes_no):
self.button_func(yes_no)
self.transport.write(proto.DebugLinkDecision(yes_no=yes_no))
#obj = self.transport.read()
#if not isinstance(obj, proto.Success):
# raise Exception("Expected Success object, got %s" % obj)
def press_yes(self):
self.press_button(True)
def press_no(self):
self.press_button(False)

@ -17,17 +17,19 @@ map_type_to_class = {
13: proto.LoadDevice,
14: proto.ResetDevice,
15: proto.SignTx,
16: proto.SignedTx,
# 16: proto.SignedTx,
17: proto.Features,
18: proto.PinRequest,
19: proto.PinAck,
20: proto.PinCancel,
21: proto.SignInput,
22: proto.SignedInput,
21: proto.InputRequest,
22: proto.OutputRequest,
23: proto.TxInput,
24: proto.TxOutput,
25: proto.SetMaxFeeKb,
100: proto.DebugLinkDecision,
101: proto.DebugLinkGetState,
102: proto.DebugLinkState,
}
map_class_to_type = {}

@ -0,0 +1,24 @@
'''FakeTransport implements dummy interface for Transport.'''
# Local serial port loopback: socat PTY,link=COM8 PTY,link=COM9
from transport import Transport
class FakeTransport(Transport):
def __init__(self, device, *args, **kwargs):
super(FakeTransport, self).__init__(device, *args, **kwargs)
def _open(self):
pass
def _close(self):
pass
def ready_to_read(self):
return False
def _write(self, msg):
pass
def _read(self):
raise NotImplemented

108
cmd.py

@ -0,0 +1,108 @@
#!/usr/bin/python
import binascii
import argparse
import json
import bitkeylib.bitkey_pb2 as proto
from bitkeylib.client import BitkeyClient
def parse_args(commands):
parser = argparse.ArgumentParser(description='Commandline tool for Bitkey devices.')
parser.add_argument('-a', '--algorithm', dest='algorithm', choices=['bip32', 'electrum'], default='bip32', help='Key derivation algorithm')
parser.add_argument('-t', '--transport', dest='transport', choices=['usb', 'serial', 'pipe', 'socket'], default='serial', help="Transport used for talking with the device")
parser.add_argument('-p', '--path', dest='path', default='/dev/ttyAMA0', help="Path used by the transport (usually serial port)")
parser.add_argument('-dt', '--debuglink-transport', dest='debuglink_transport', choices=['usb', 'serial', 'pipe', 'socket'], default='socket', help="Debuglink transport")
parser.add_argument('-dp', '--debuglink-path', dest='debuglink_path', default='0.0.0.0:8001', help="Path used by the transport (usually serial port)")
parser.add_argument('-j', '--json', dest='json', action='store_true', help="Prints result as json object")
# parser.add_argument('-v', '--verbose', dest='verbose', action='store_true', help='Enable low-level debugging messages')
cmdparser = parser.add_subparsers(title='Available commands')
for cmd in commands._list_commands():
func = object.__getattribute__(commands, cmd)
try:
help = func.help
except AttributeError:
help = ''
try:
arguments = func.arguments
except AttributeError:
arguments = ((('params',), {'nargs': '*'}),)
item = cmdparser.add_parser(cmd, help=func.help)
for arg in arguments:
item.add_argument(*arg[0], **arg[1])
item.set_defaults(func=func)
return parser.parse_args()
def get_transport(transport_string, path):
if transport_string == 'usb':
raise NotImplemented("USB HID transport not implemented yet")
if transport_string == 'serial':
from bitkeylib.transport_serial import SerialTransport
return SerialTransport(path)
if transport_string == 'pipe':
from bitkeylib.transport_pipe import PipeTransport
return PipeTransport(path, is_device=False)
if transport_string == 'socket':
from bitkeylib.transport_socket import SocketTransport
return SocketTransport(path, listen=False)
if transport_string == 'fake':
from bitkeylib.transport_fake import FakeTransport
return FakeTransport(path)
raise NotImplemented("Unknown transport")
class Commands(object):
def __init__(self, client):
self.client = client
@classmethod
def _list_commands(cls):
return [ x for x in dir(cls) if not x.startswith('_') ]
def get_master_public_key(self, args):
return 'ahoj'
def get_entropy(self, args):
return binascii.hexlify(self.client.get_entropy(args.size))
get_entropy.help = 'Get example entropy'
get_master_public_key.help = 'Get master public key'
get_entropy.arguments = (
(('size',), {'type': int}),
)
def main():
args = parse_args(Commands)
transport = get_transport(args.transport, args.path)
debuglink_transport = get_transport(args.debuglink_transport, args.debuglink_path)
if args.algorithm == 'electrum':
algo = proto.ELECTRUM
elif args.algorithm == 'bip32':
algo = proto.BIP32
else:
raise Exception("Unknown algorithm")
client = BitkeyClient(transport, debuglink_transport, algo=algo)
cmds = Commands(client)
res = args.func(cmds, args)
if args.json:
print json.dumps(res)
else:
print res
if __name__ == '__main__':
main()

@ -1,159 +1,235 @@
/*
This file describes Protocol buffers messages for bitcoin hardware wallet devices.
Author: slush <info@bitcoin.cz>
*/
// Specifies algorithm used for generating private/public keys from the seed.
enum Algorithm {
BIP32 = 0;
ELECTRUM = 1;
}
// Specifies which script will be used for given transaction output.
enum ScriptType {
PAYTOADDRESS = 0;
PAYTOSCRIPTHASH = 1;
}
// Response: None or Features
// Reset device's internal state
//
// Response: Features
message Initialize {
required bytes session_id = 1; // Any value identifying current connection, will be echoed back in Features message
}
// Response object for Initialize. Contains list of available features on the device.
message Features {
optional string version = 1;
optional bool otp = 2;
optional bool pin = 3;
optional bool spv = 4;
optional uint64 maxfee_kb = 5;
repeated Algorithm algo = 6;
optional bool debug_link = 7;
}
// Description: Test if another side is still alive.
required bytes session_id = 1; // Echoed back from Initialize message
optional string vendor = 2; // Name of the manufacturer, e.g. "bitkey"
optional uint32 major_version = 3; // Major version of the device, e.g. 1
optional uint32 minor_version = 4; // Minor version of the device, e.g. 0
optional bool otp = 5; // True when device will send OtpRequest on important action
optional bool pin = 6; // True when device will send PinRequest on important action
optional bool spv = 7; // True when device requires SPV verification of transaction inputs
optional uint64 maxfee_kb = 8; // Maximum accepted fee per kilobyte of signed transaction
repeated Algorithm algo = 9; // List of key generation algorithms supported by the device
optional bool debug_link = 10; // Indicates support for DebugLink connection
}
// Test if device is live, device will send back the message on success
//
// Response: None or Success
message Ping {
optional string message = 1;
optional string message = 1; // Message will be sent back in Success message
}
// Description: Response message for previous request with given id.
message Success {
optional string message = 1;
// Virtually "press" the button on the device.
// Message is available only on debugging connection and device must support "debug_link" feature.
//
// Response: Success
message DebugLinkDecision {
required bool yes_no = 1; // True for "confirm", False for "cancel"
}
// Description: Response message for previous request with given id.
message Failure {
optional int32 code = 1;
optional string message = 2;
// When sent over debug link connection, computer asks for some internal information of the device.
//
// Response: DebugLinkState
message DebugLinkGetState {
optional bool layout = 1; // Request raw buffer of display
optional bool otp = 2; // Request current OTP
optional bool pin = 3; // Request current PIN
optional bool seed = 4; // Request current seed
// optional bool state = 5;
}
message DebugLinkDecision {
required bool yes_no = 1;
// Response object reflecting device's current state. It can be received only over debug link connection.
message DebugLinkState {
optional bytes layout = 1; // Raw buffer of display
optional OtpAck otp = 2; // Current OTP, blank if device is not waiting to OTP
optional PinAck pin = 3; // Current PIN, blank if PIN is not set/enabled
optional string seed = 4; // Current seed (in mnemonic format)
// optional string state = 5;
}
// Response object defining success of the previous request
message Success {
optional string message = 1; // May contain human readable description of the action or request-specific payload
}
// Response object defining failure of the previous request
message Failure {
optional int32 code = 1; // May contain computer-readable definition of the error state
optional string message = 2; // May contain human-readable message of the error state
}
// Response: UUID or Failure
// Ask device for unique identifier.
//
// Response: UUID
message GetUUID {
}
// Identifier of the device. This identifier must be composed from CPU serial number
// or other persistent source and must be the same for consecutive requests.
message UUID {
required bytes UUID = 1;
}
// Message can be sent by the *device* as a response to any request.
// Message asks computer to send back OtpAck with the password printed on the device's display.
//
// Response: OtpAck, OtpCancel
message OtpRequest {
optional string message = 1;
optional string message = 1; // Human readable message
}
// Message is sent by the computer as a response to OtpRequest previously sent by the device.
message OtpAck {
required string otp = 1;
required string otp = 1; // User must be asked for the otp, which is displayed on the device's display
}
// Message is sent as a response to OtpRequest by the computer, asking the device to cancel
// pending action and reset to the default state.
message OtpCancel {
}
// Message can be sent by the *device* as a response to any request.
// Message asks computer to send back PinAck with the password associated with the device.
//
// Response: PinAck, PinCancel
message PinRequest {
optional string message = 1;
optional string message = 1; // Human readable message
}
// Message is sent by the computer as a response to PinRequest previously sent by the device.
message PinAck {
required string pin = 1;
required string pin = 1; // User must write down the password for accessing the device.
}
// Message is sent as a response to PinRequest by the computer, asking the device to cancel
// pending action and reset to the default state.
message PinCancel {
}
// Response: OtpRequest, Entropy, Failure
// Request a sample of random data generated by hardware RNG. May be used
// for tests of internal RNG.
//
// Response: OtpRequest, PinRequest, Entropy, Failure
message GetEntropy {
required uint32 size = 1;
required uint32 size = 1; // Size of randomly generated buffer
}
// Response to GetEntropy request contains random data generated by internal HRNG.
message Entropy {
required bytes entropy = 1;
required bytes entropy = 1; // Stream of generated bytes
}
// Set maximum allowed fee per kB of transaction. This is used by internal sanity checking
// in SignTx method. Transaction won't be signed if requested transaction fees are above
// current value.
//
// Response: Success, OtpRequest, PinRequest, Failure
message SetMaxFeeKb {
required uint64 maxfee_kb= 1;
required uint64 maxfee_kb= 1; // Maximum allowed transaction fee in satoshis per kB
}
// Ask device for it's current master public key. This may be used for generating
// public keys on the computer independently to the device. API doesn't provide
// any other way how to get bitcoin addresses from the device.
//
// Response: MasterPublicKey, Failure
message GetMasterPublicKey {
required Algorithm algo = 1 [default=BIP32];
required Algorithm algo = 1 [default=BIP32]; // Used algorithm for generating master public key
}
// Contains master public key derived from device's seed.
message MasterPublicKey {
required bytes key = 1;
required bytes key = 1; // master public key of requested algorithm in binary format
}
// Response: Success, OtpRequest, Failure
// Load seed and related internal settings from computer to the device. Existing seed is overwritten.
//
// Response: Success, OtpRequest, PinRequest, Failure
message LoadDevice {
required string seed = 1;
optional bool otp = 2 [default=true];
optional string pin = 3;
optional bool spv = 4 [default=true];
required string seed = 1; // Seed encoded as a mnemonic (12 english words)
optional bool otp = 2 [default=true]; // Enable OTP for important actions?
optional string pin = 3; // Set PIN protection for important actions
optional bool spv = 4 [default=true]; // Enable SPV verification for transaction inputs (if available on device)
}
// Request device to do full-reset, to generate new seed
// and ask user for new settings (OTP, PIN, SPV).
//
// Response: Success, OtpRequest, PinRequest, Failure
message ResetDevice {
optional bytes random = 7; // Provide additional entropy for seed generation function.
// Recommended to provide 256 bytes of random data.
}
message TxOutput {
required string address = 1;
repeated uint32 address_n = 2;
required uint64 amount = 3;
required ScriptType script_type = 4;
repeated bytes script_args = 5;
}
// Response: Success, SignedInput, Failure
message TxInput {
repeated uint32 address_n = 1;
required uint64 amount = 2;
required bytes prev_hash = 3;
required uint32 prev_index = 4;
optional bytes script_sig = 5;
}
// Response: SignedTx, Success, OtpRequest, PinRequest, Failure
// Request the device to sign the transaction
//
// Response: InputRequest, OutputRequest, OtpRequest, PinRequest, Failure
message SignTx {
required Algorithm algo = 1 [default=BIP32];
optional bool stream = 2; // enable streaming
required uint64 fee = 3;
repeated TxOutput outputs = 4;
repeated TxInput inputs = 5;
optional uint32 inputs_count = 6; // for streaming
optional bytes random = 7;
required Algorithm algo = 1 [default=BIP32]; // Algorithm using for key generation algorithm
repeated uint32 outputs_count = 3; // Count of outputs of the transaction
repeated uint32 inputs_count = 5; // Count of inputs of the transaction
optional bytes random = 6; // Provide additional entropy for signing function.
// Recommended to provide 256 bytes of random data.
}
message SignedTx {
repeated bytes signature = 1;
// Sent by the device as a response for SignTx.
// If request_index is set, device asks for TxInput message with details of index's input.
// If signed_index is set, 'signature' contains signed input of signed_index's input.
message InputRequest {
optional uint32 request_index = 1; // If presented, device expects TxInput message from the computer
optional uint32 signed_index = 2; // If presented, 'signature' contains signed input of this input
optional bytes signature = 3; // If presented, represent signature of the signed_index input
}
/*
inputs = [] # list of TxInput
for i in inputs:
for x in inputs:
send(x)
signature = send(SignInput(i))
*/
// Sent by the device as a response for SignTx or TxInput.
// Device asks for Tx
message OutputRequest {
required uint32 request_index = 1; // Device expects TxOutput message from the computer
}
// Response: SignedInput, Failure
message SignInput {
required TxInput input = 1;
// Transaction onput for SignTx workflow. It is response to InputRequest message sent by device.
//
// Response: InputRequest, OutputRequest, Failure
message TxInput {
required uint32 index = 1; // Position of input in proposed transaction
repeated uint32 address_n = 2; // Parameter for address generation algorithm to derive the address from the master public key
required uint64 amount = 3; // Amount to spend in satoshis. The rest will be used for transaction fees
required bytes prev_hash = 4; // Hash of previous transaction spent by this input
required uint32 prev_index = 5; // Index of previous spent output
optional bytes script_sig = 6; // Script signature
}
message SignedInput {
required bytes signature = 1;
// Transaction output for SignTx workflow. It is response to OutputRequest message sent by the device.
message TxOutput {
required uint32 index = 1; // Position of output in proposed transaction
required string address = 2; // Target bitcoin address in base58 encoding
repeated uint32 address_n = 3; // Has higher priority than "address". If the output is to myself, specify parameter for address generation algorithm.
required uint64 amount = 4; // Amount to send in satoshis
required ScriptType script_type = 5;// Select output script type
repeated bytes script_args = 6; // Provide additional parameters for the script (its script-depended)
}

@ -4,7 +4,7 @@ sys.path = ['../',] + sys.path
from bitkeylib.transport_pipe import PipeTransport
TRANSPORT = PipeTransport
TRANSPORT_ARGS = ('../../bitkey-python/device.socket', False)
TRANSPORT_ARGS = ('../../bitkey-python/pipe', False)
DEBUG_TRANSPORT = PipeTransport
DEBUG_TRANSPORT_ARGS = ('../../bitkey-python/device.socket.debug', False)
DEBUG_TRANSPORT_ARGS = ('../../bitkey-python/pipe.debug', False)

@ -2,18 +2,55 @@ import unittest
import config
from bitkeylib.client import BitkeyClient
from bitkeylib.debuglink import DebugLink
from bitkeylib import proto
'''
TODO:
* Features reflects all variations of LoadDevice
* Maxfee settings
* Client requires OTP
* Client requires PIN
'''
class TestBasic(unittest.TestCase):
def setUp(self):
self.debuglink = config.DEBUG_TRANSPORT(*config.DEBUG_TRANSPORT_ARGS)
self.debug_transport = config.DEBUG_TRANSPORT(*config.DEBUG_TRANSPORT_ARGS)
self.transport = config.TRANSPORT(*config.TRANSPORT_ARGS)
self.bitkey = BitkeyClient(self.transport, self.debuglink)
self.bitkey = BitkeyClient(self.transport, DebugLink(self.debug_transport), algo=proto.ELECTRUM)
self.bitkey.setup_debuglink(pin_correct=True, otp_correct=True)
self.bitkey.load_device(seed='beyond neighbor scratch swirl embarrass doll cause also stick softly physical nice',
otp=True, pin='1234', spv=True, button=True)
print "Setup finished"
print "--------------"
def tearDown(self):
self.debuglink.close()
self.debug_transport.close()
self.transport.close()
def test_basic(self):
self.assertEqual(self.bitkey.call(proto.Ping(message='ahoj!')), proto.Success(message='ahoj!'))
def test_features(self):
features = self.bitkey.call(proto.Initialize(session_id=self.bitkey.session_id))
# Result is the same as reported by BitkeyClient class
self.assertEqual(features, self.bitkey.features)
def test_ping(self):
ping = self.bitkey.call(proto.Ping(message='ahoj!'))
# Ping results in Success(message='Ahoj!')
self.assertEqual(ping, proto.Success(message='ahoj!'))
def test_uuid(self):
uuid1 = self.bitkey.get_uuid()
uuid2 = self.bitkey.get_uuid()
# UUID must be longer than 10 characters
self.assertGreater(len(uuid1.UUID), 10)
# Every resulf of UUID must be the same
self.assertEqual(uuid1, uuid2)

@ -0,0 +1,28 @@
import unittest
import config
from bitkeylib.client import BitkeyClient
from bitkeylib.debuglink import DebugLink
from bitkeylib import proto
class TestSignTx(unittest.TestCase):
def setUp(self):
self.debug_transport = config.DEBUG_TRANSPORT(*config.DEBUG_TRANSPORT_ARGS)
self.transport = config.TRANSPORT(*config.TRANSPORT_ARGS)
self.bitkey = BitkeyClient(self.transport, DebugLink(self.debug_transport), algo=proto.ELECTRUM)
self.bitkey.setup_debuglink(pin_correct=True, otp_correct=True)
self.bitkey.load_device(seed='beyond neighbor scratch swirl embarrass doll cause also stick softly physical nice',
otp=True, pin='1234', spv=True, button=True)
print "Setup finished"
print "--------------"
def tearDown(self):
self.debug_transport.close()
self.transport.close()
def test_signtx(self):
print self.bitkey.sign_tx([], [])
Loading…
Cancel
Save