mirror of
https://github.com/trezor/trezor-firmware.git
synced 2024-12-16 11:28:14 +00:00
make msg functions more readable, add pbuf wrappers
This commit is contained in:
parent
362e835807
commit
8f49a104bf
@ -1,7 +1,6 @@
|
||||
import ustruct
|
||||
|
||||
from trezor import loop
|
||||
|
||||
from TrezorMsg import Msg
|
||||
|
||||
_msg = Msg()
|
||||
@ -20,60 +19,77 @@ REPORT_NUM = const(63)
|
||||
HEADER_MAGIC = const(35) # '#'
|
||||
|
||||
|
||||
def read_report():
|
||||
report = yield loop.Select(loop.HID_READ)
|
||||
assert report[0] == REPORT_NUM
|
||||
return report
|
||||
def read():
|
||||
_, rep = yield loop.Select(loop.HID_READ)
|
||||
assert rep[0] == REPORT_NUM
|
||||
return rep
|
||||
|
||||
|
||||
def write_report(report):
|
||||
return send(report) # FIXME
|
||||
def read_wire_msg():
|
||||
rep = yield from read()
|
||||
assert rep[1] == HEADER_MAGIC
|
||||
assert rep[2] == HEADER_MAGIC
|
||||
(mtype, mlen) = ustruct.unpack_from('>HL', rep, 3)
|
||||
|
||||
# TODO: validate mlen for sane values
|
||||
|
||||
def read_message():
|
||||
report = yield from read_report()
|
||||
assert report[1] == HEADER_MAGIC
|
||||
assert report[2] == HEADER_MAGIC
|
||||
(msgtype, msglen) = ustruct.unpack_from('>HL', report, 3)
|
||||
rep = memoryview(rep)
|
||||
data = rep[9:]
|
||||
data = data[:mlen]
|
||||
|
||||
# TODO: validate msglen for sane values
|
||||
|
||||
report = memoryview(report)
|
||||
data = report[9:]
|
||||
data = data[:msglen]
|
||||
|
||||
msgdata = bytearray(data) # TODO: allocate msglen bytes
|
||||
remaining = msglen - len(msgdata)
|
||||
mbuf = bytearray(data) # TODO: allocate mlen bytes
|
||||
remaining = mlen - len(mbuf)
|
||||
|
||||
while remaining > 0:
|
||||
report = yield from read_report()
|
||||
report = memoryview(report)
|
||||
data = report[1:]
|
||||
rep = yield from read()
|
||||
rep = memoryview(rep)
|
||||
data = rep[1:]
|
||||
data = data[:remaining]
|
||||
msgdata.extend(data)
|
||||
mbuf.extend(data)
|
||||
remaining -= len(data)
|
||||
|
||||
return (msgtype, msgdata)
|
||||
return (mtype, mbuf)
|
||||
|
||||
|
||||
def write_message(msgtype, msgdata):
|
||||
report = bytearray(REPORT_LEN)
|
||||
report[0] = REPORT_NUM
|
||||
report[1] = HEADER_MAGIC
|
||||
report[2] = HEADER_MAGIC
|
||||
ustruct.pack_into('>HL', report, 3, msgtype, len(msgdata))
|
||||
def write_wire_msg(mtype, mbuf):
|
||||
rep = bytearray(REPORT_LEN)
|
||||
rep[0] = REPORT_NUM
|
||||
rep[1] = HEADER_MAGIC
|
||||
rep[2] = HEADER_MAGIC
|
||||
ustruct.pack_into('>HL', rep, 3, mtype, len(mbuf))
|
||||
|
||||
msgdata = memoryview(msgdata)
|
||||
report = memoryview(report)
|
||||
data = report[9:]
|
||||
rep = memoryview(rep)
|
||||
mbuf = memoryview(mbuf)
|
||||
data = rep[9:]
|
||||
|
||||
while msgdata:
|
||||
n = min(len(data), len(msgdata))
|
||||
data[:n] = msgdata[:n]
|
||||
while mbuf:
|
||||
n = min(len(data), len(mbuf))
|
||||
data[:n] = mbuf[:n]
|
||||
i = n
|
||||
while i < len(data):
|
||||
data[i] = 0
|
||||
i += 1
|
||||
write_report(report)
|
||||
msgdata = msgdata[n:]
|
||||
data = report[1:]
|
||||
send(rep)
|
||||
mbuf = mbuf[n:]
|
||||
data = rep[1:]
|
||||
|
||||
|
||||
def read_msg(*types):
|
||||
mtype, mbuf = yield from read_wire_msg()
|
||||
for t in types:
|
||||
if t.wire_type == mtype:
|
||||
return t.loads(mbuf)
|
||||
else:
|
||||
raise Exception('Unexpected message')
|
||||
|
||||
|
||||
def write_msg(msg):
|
||||
mbuf = msg.dumps()
|
||||
mtype = msg.message_type.wire_type
|
||||
write_wire_msg(mtype, mbuf)
|
||||
|
||||
|
||||
def call(req, *types):
|
||||
write_msg(req)
|
||||
res = yield from read_msg(*types)
|
||||
return res
|
||||
|
Loading…
Reference in New Issue
Block a user