#!/usr/bin/env python
"""Generate one Python source file per message/enum of a ``pb2.<name>_pb2`` module.

Usage: ./pb2py modulename genpath [indexfile]
"""
import sys
import os

from google.protobuf.internal.enum_type_wrapper import EnumTypeWrapper

# Numeric FieldDescriptor.TYPE_* values mapped to the type name written into
# the generated FIELDS table.
_SCALAR_TYPES = {
    4: 'p.UVarintType',   # TYPE_UINT64
    13: 'p.UVarintType',  # TYPE_UINT32
    14: 'p.UVarintType',  # TYPE_ENUM
    9: 'p.UnicodeType',   # TYPE_STRING
    8: 'p.BoolType',      # TYPE_BOOL
    12: 'p.BytesType',    # TYPE_BYTES
}
_TYPE_MESSAGE = 11    # TYPE_MESSAGE
_LABEL_REQUIRED = 2   # LABEL_REQUIRED
_LABEL_REPEATED = 3   # LABEL_REPEATED


def process_type(t, cls, msg_id, indexfile):
    """Return the source lines of the generated class for message type ``t``.

    ``cls`` must expose ``DESCRIPTOR.fields_by_name``.  ``msg_id`` is the wire
    id of the message or None; when it is not None a ``MESSAGE_WIRE_TYPE``
    line is emitted and, if ``indexfile`` is not None, a ``<t> = const(<id>)``
    line is written to it.

    Raises Exception for a field whose type is not handled.
    """
    print(" * type %s" % t)

    imports = [
        "import protobuf as p",
        "from micropython import const",
    ]
    out = [
        "",
        "class %s(p.MessageType):" % t,
    ]

    fields = cls.DESCRIPTOR.fields_by_name
    if fields:
        out.append(" FIELDS = {")
    elif msg_id is None:
        # No fields and no wire id: the class body would otherwise be empty.
        out.append(" pass")

    for v in sorted(fields.values(), key=lambda x: x.number):
        fieldname = v.name

        if v.type in _SCALAR_TYPES:
            field_type = _SCALAR_TYPES[v.type]
        elif v.type == _TYPE_MESSAGE:
            field_type = v.message_type.name
            import_line = "from .%s import %s" % (field_type, field_type)
            # Several fields may share one message type; import it only once.
            if import_line not in imports:
                imports.append(import_line)
        else:
            raise Exception("Unknown field type %s for field %s" % (v.type, fieldname))

        if v.label == _LABEL_REQUIRED:
            comment = ' # required'
        elif v.has_default_value:
            comment = ' # default=%s' % repr(v.default_value)
        else:
            comment = ''

        if v.label == _LABEL_REPEATED:
            flags = 'p.FLAG_REPEATED'
        else:
            flags = '0'

        out.append(" %d: ('%s', %s, %s),%s" % (v.number, fieldname, field_type, flags, comment))

    if fields:
        out.append(" }")

    if msg_id is not None:
        out.append(" MESSAGE_WIRE_TYPE = %d" % msg_id)
        if indexfile is not None:
            indexfile.write("%s = const(%d)\n" % (t, msg_id))

    return imports + out


def process_enum(t, cls):
    """Return ``NAME = const(VALUE)`` lines for every item of enum ``t``.

    ``cls`` only needs an ``items()`` method yielding (name, value) pairs.
    The enum's own name is stripped from the front of each constant name.
    """
    out = []
    print(" * enum %s" % t)

    full_prefix = "%s_" % t
    # If type ends with *Type, but constants use the type name without *Type,
    # remove that prefix too.
    # For example "ButtonRequestType & ButtonRequest_Other" => "Other"
    short_prefix = None
    if t.endswith("Type"):
        short_prefix = "%s_" % t[:-len("Type")]

    for k, v in cls.items():
        # Remove type name from the beginning of the constant only (a plain
        # str.replace would also drop later occurrences inside the name).
        # For example "PinMatrixRequestType_Current" -> "Current"
        if k.startswith(full_prefix):
            k = k[len(full_prefix):]
        if short_prefix is not None and k.startswith(short_prefix):
            k = k[len(short_prefix):]
        out.append("%s = const(%s)" % (k, v))

    return out


def find_msg_type(msg_types, t):
    """Return the value paired with ``MessageType_<t>`` in ``msg_types``.

    ``msg_types`` is an iterable of (name, value) pairs.  Returns None when
    no entry matches.
    """
    prefix = 'MessageType_'
    for k, v in msg_types:
        msg_name = k[len(prefix):] if k.startswith(prefix) else k
        if msg_name == t:
            return v
    return None


def process_module(mod, genpath, indexfile):
    """Generate files in ``genpath`` for every class and enum found in ``mod``."""
    print("Processing module %s" % mod.__name__)

    types = {name: cls for name, cls in mod.__dict__.items()
             if isinstance(cls, type)}

    msg_types = __import__('pb2', globals(), locals(), [
        'messages_pb2', ]).messages_pb2.MessageType.items()

    for t, cls in types.items():
        # Find message type for given class
        msg_id = find_msg_type(msg_types, t)
        out = process_type(t, cls, msg_id, indexfile)
        write_to_file(genpath, t, out)

    enums = {name: cls for name, cls in mod.__dict__.items()
             if isinstance(cls, EnumTypeWrapper)}

    for t, cls in enums.items():
        out = process_enum(t, cls)
        write_to_file(genpath, t, out)


def write_to_file(genpath, t, out):
    """Write the generated source lines ``out`` to ``<genpath>/<t>.py``."""
    out = ["# Automatically generated by pb2py"] + out
    data = "\n".join(out) + "\n"
    # The context manager closes the file even if the write fails.
    with open(os.path.join(genpath, "%s.py" % t), 'w') as f:
        f.write(data)


def main(argv=None):
    """Command-line entry point: ``pb2py modulename genpath [indexfile]``."""
    argv = sys.argv if argv is None else argv

    # modulename and genpath are both mandatory; indexfile is optional.
    if len(argv) < 3:
        print("Usage: ./pb2py modulename genpath indexfile")
        sys.exit(1)

    modulename = argv[1]
    genpath = argv[2]
    indexfile = open(argv[3], 'a') if len(argv) > 3 else None

    try:
        # Dynamically load module from argv[1]
        tmp = __import__('pb2', globals(), locals(), ['%s_pb2' % modulename])
        mod = getattr(tmp, "%s_pb2" % modulename)
        process_module(mod, genpath, indexfile)
    finally:
        if indexfile is not None:
            indexfile.close()


if __name__ == '__main__':
    main()