#!/usr/bin/env python3
# Converts Google's protobuf python definitions of TREZOR wire messages
# to plain-python objects as used in TREZOR Core and python-trezor

import argparse
import contextlib
import importlib
import os
import sys

# Numeric protobuf field-type codes handled by this generator, mapped to the
# name of the wrapper type written into the generated source.
_SCALAR_TYPE_NAMES = {
    4: 'p.UVarintType',   # TYPE_UINT64
    13: 'p.UVarintType',  # TYPE_UINT32
    14: 'p.UVarintType',  # TYPE_ENUM
    17: 'p.Sint32Type',   # TYPE_SINT32
    18: 'p.Sint64Type',   # TYPE_SINT64
    9: 'p.UnicodeType',   # TYPE_STRING
    8: 'p.BoolType',      # TYPE_BOOL
    12: 'p.BytesType',    # TYPE_BYTES
}
_TYPE_MESSAGE = 11  # nested message field
_LABEL_REQUIRED = 2
_LABEL_REPEATED = 3


def _strip_prefix(name, prefix):
    """Return *name* without a leading *prefix* (unchanged if it is absent)."""
    if name.startswith(prefix):
        return name[len(prefix):]
    return name


def _load_pb2_module(name):
    """Import and return the submodule *name* of the ``pb2`` package."""
    return importlib.import_module('pb2.%s' % name)


def process_type(t, cls, msg_id, indexfile, is_upy):
    """Generate the source lines of the plain-python class for one message.

    Args:
        t: Name of the message class to generate.
        cls: Object exposing ``DESCRIPTOR.fields_by_name``, a mapping whose
            values describe the fields (``number``, ``name``, ``label``,
            ``type``, ``message_type``, ``has_default_value``,
            ``default_value``).
        msg_id: Wire type id of the message, or None if it has none.
        indexfile: Writable file object that receives a ``name = id`` line
            when *msg_id* is not None, or None to skip the index.
        is_upy: If true, emit the micropython flavour of the source.

    Returns:
        List of source lines: the import lines followed by the class body.

    Raises:
        ValueError: If a field has a type this generator does not know.
    """
    print(" * type %s" % t)

    # A set removes duplicate imports of the same nested message type.
    imports = set()
    out = ["", "", "class %s(p.MessageType):" % t]

    fields = sorted(cls.DESCRIPTOR.fields_by_name.values(),
                    key=lambda field: field.number)

    if fields:
        out.append(" FIELDS = {")
    elif msg_id is None:
        # Nothing else will be written into the class body.
        out.append(" pass")

    for field in fields:
        if field.type == _TYPE_MESSAGE:
            field_type = field.message_type.name
            imports.add("from .%s import %s" % (field_type, field_type))
        else:
            field_type = _SCALAR_TYPE_NAMES.get(field.type)
            if field_type is None:
                raise ValueError("Unknown field type %s for field %s" %
                                 (field.type, field.name))

        if field.label == _LABEL_REQUIRED:
            comment = ' # required'
        elif field.has_default_value:
            comment = ' # default=%s' % repr(field.default_value)
        else:
            comment = ''

        flags = 'p.FLAG_REPEATED' if field.label == _LABEL_REPEATED else '0'

        out.append(" %d: ('%s', %s, %s),%s" %
                   (field.number, field.name, field_type, flags, comment))

    if fields:
        out.append(" }")

    if msg_id is not None:
        out.append(" MESSAGE_WIRE_TYPE = %d" % msg_id)
        if indexfile is not None:
            if is_upy:
                indexfile.write("%s = const(%d)\n" % (t, msg_id))
            else:
                indexfile.write("%s = %d\n" % (t, msg_id))

    if is_upy:
        header = ['import protobuf as p']
    else:
        header = ['from __future__ import absolute_import',
                  'from .. import protobuf as p']

    return header + sorted(imports) + out


def process_enum(t, cls, is_upy):
    """Generate the source lines defining the constants of one enum.

    Args:
        t: Name of the enum type.
        cls: Object whose ``items()`` yields ``(constant_name, value)`` pairs.
        is_upy: If true, wrap every value in micropython's ``const()``.

    Returns:
        List of source lines, one assignment per enum constant, preceded by
        the ``const`` import when *is_upy* is true.
    """
    out = []

    if is_upy:
        out += ("from micropython import const", "")

    print(" * enum %s" % t)

    # Remove type name from the beginning of the constant
    # For example "PinMatrixRequestType_Current" -> "Current"
    full_prefix = "%s_" % t

    # If type ends with *Type, but constant use type name without *Type,
    # remove it too.
    # For example "ButtonRequestType & ButtonRequest_Other" => "Other"
    if t.endswith("Type"):
        short_prefix = "%s_" % t[:-len("Type")]
    else:
        short_prefix = None

    for k, v in cls.items():
        # Only a *leading* prefix is removed; later occurrences of the same
        # text inside the constant name are part of the name and are kept.
        k = _strip_prefix(k, full_prefix)
        if short_prefix is not None:
            k = _strip_prefix(k, short_prefix)

        if is_upy:
            out.append("%s = const(%s)" % (k, v))
        else:
            out.append("%s = %s" % (k, v))

    return out


def find_msg_type(msg_types, t):
    """Look up the wire type id of message *t*.

    Args:
        msg_types: Iterable of ``(name, id)`` pairs where names carry a
            ``MessageType_`` prefix.
        t: Message name without the prefix.

    Returns:
        The id paired with the first matching name, or None if no name
        matches.
    """
    for k, v in msg_types:
        if _strip_prefix(k, 'MessageType_') == t:
            return v
    return None


def process_module(mod, genpath, indexfile, modlist, is_upy):
    """Generate one source file per message class and enum found in *mod*.

    Args:
        mod: Module whose ``__dict__`` is scanned for classes and enums.
        genpath: Directory the generated files are written to.
        indexfile: Writable file object for the wire type index, or None.
        modlist: Writable file object that receives one import line per
            generated file, or a false value to skip it.
        is_upy: If true, emit the micropython flavour of the source.
    """
    # Imported here, by its only user, so that the other functions in this
    # file can be imported without protobuf being installed.
    from google.protobuf.internal.enum_type_wrapper import EnumTypeWrapper

    print("Processing module %s" % mod.__name__)

    members = list(mod.__dict__.items())
    types = {name: cls for name, cls in members if isinstance(cls, type)}

    # Materialized once because find_msg_type() walks it for every class.
    msg_types = list(_load_pb2_module('messages_pb2').MessageType.items())

    for t, cls in sorted(types.items()):
        # Find message type for given class
        msg_id = find_msg_type(msg_types, t)

        out = process_type(t, cls, msg_id, indexfile, is_upy)

        write_to_file(genpath, t, out)
        if modlist:
            modlist.write("from .%s import *\n" % t)

    enums = {name: cls for name, cls in members
             if isinstance(cls, EnumTypeWrapper)}

    for t, cls in enums.items():
        out = process_enum(t, cls, is_upy)
        write_to_file(genpath, t, out)
        if modlist:
            modlist.write("from . import %s\n" % t)


def write_to_file(genpath, t, out):
    """Write generated source lines to ``<genpath>/<t>.py``.

    Args:
        genpath: Directory to write into.
        t: Base name of the file (without the ``.py`` extension).
        out: List of source lines; a header line is put in front of them.
    """
    lines = ["# Automatically generated by pb2py"] + out
    data = "\n".join(lines) + "\n"

    # The context manager closes the file even if the write fails.
    with open(os.path.join(genpath, "%s.py" % t), 'w') as f:
        f.write(data)


def main():
    """Parse the command line and generate sources for the chosen module."""
    parser = argparse.ArgumentParser()
    parser.add_argument('modulename', type=str, help="Name of module to generate")
    parser.add_argument('genpath', type=str, help="Directory for generated source code")
    parser.add_argument('-i', '--indexfile', type=str, help="[optional] Generate index file of wire types")
    parser.add_argument('-l', '--modlist', type=str, help="[optional] Generate list of modules")
    parser.add_argument('-p', '--protopath', type=str, help="[optional] Path to search for pregenerated Google's python sources")
    parser.add_argument('-m', '--micropython', action='store_true', help="Use micropython-favoured source code")
    args = parser.parse_args()

    # The stack closes whichever of the optional output files were opened,
    # also when generation fails part-way through.
    with contextlib.ExitStack() as stack:
        if args.indexfile:
            indexfile = stack.enter_context(open(args.indexfile, 'a'))
        else:
            indexfile = None

        if args.modlist:
            modlist = stack.enter_context(open(args.modlist, 'a'))
        else:
            modlist = None

        if args.protopath:
            sys.path.append(args.protopath)

        # Dynamically load the module named on the command line
        mod = _load_pb2_module('%s_pb2' % args.modulename)

        process_module(mod, args.genpath, indexfile, modlist, args.micropython)


if __name__ == '__main__':
    main()