mirror of
https://github.com/trezor/trezor-firmware.git
synced 2024-11-25 08:58:14 +00:00
266955ba65
This also includes the capability to build Rust protobuf blobs.
802 lines
26 KiB
Python
Executable File
802 lines
26 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# Converts Google's protobuf python definitions of Trezor wire messages
|
|
# to plain-python objects as used in Trezor Core and python-trezor
|
|
|
|
import itertools
|
|
import logging
|
|
import os
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
from typing import List, Optional
|
|
|
|
import attr
|
|
import click
|
|
import construct as c
|
|
import mako
|
|
import mako.template
|
|
|
|
from google.protobuf import descriptor_pb2
|
|
|
|
# Short alias: the TYPE_* and LABEL_* constants used throughout this file
# live on FieldDescriptorProto.
FieldDescriptor = descriptor_pb2.FieldDescriptorProto

# Integer field types supported by the generator.
INT_TYPES = (
    FieldDescriptor.TYPE_UINT64,
    FieldDescriptor.TYPE_UINT32,
    FieldDescriptor.TYPE_SINT64,
    FieldDescriptor.TYPE_SINT32,
)

# protobuf scalar type -> name of the Python type used in generated code.
# Every integer flavour maps to plain "int".
FIELD_TYPES_PYTHON = {int_type: "int" for int_type in INT_TYPES}
FIELD_TYPES_PYTHON.update(
    {
        FieldDescriptor.TYPE_BOOL: "bool",
        FieldDescriptor.TYPE_BYTES: "bytes",
        FieldDescriptor.TYPE_STRING: "str",
    }
)

# protobuf scalar type -> its name as written in .proto files.
TYPE_NAMES = dict(
    [
        (FieldDescriptor.TYPE_UINT64, "uint64"),
        (FieldDescriptor.TYPE_UINT32, "uint32"),
        (FieldDescriptor.TYPE_SINT64, "sint64"),
        (FieldDescriptor.TYPE_SINT32, "sint32"),
        (FieldDescriptor.TYPE_BOOL, "bool"),
        (FieldDescriptor.TYPE_BYTES, "bytes"),
        (FieldDescriptor.TYPE_STRING, "string"),
    ]
)

# protobuf type -> numeric type code stored in the 4-bit "type" member of
# FIELD_STRUCT. Both unsigned widths share code 0, both signed widths code 1.
FIELD_TYPES_RUST_BLOB = {
    # unsigned integers
    FieldDescriptor.TYPE_UINT64: 0,
    FieldDescriptor.TYPE_UINT32: 0,
    # signed integers
    FieldDescriptor.TYPE_SINT64: 1,
    FieldDescriptor.TYPE_SINT32: 1,
    # remaining scalars
    FieldDescriptor.TYPE_BOOL: 2,
    FieldDescriptor.TYPE_BYTES: 3,
    FieldDescriptor.TYPE_STRING: 4,
    # references to other definitions
    FieldDescriptor.TYPE_ENUM: 5,
    FieldDescriptor.TYPE_MESSAGE: 6,
}

# Name of the top-level enum that is searched for wire-type numbers.
MESSAGE_TYPE_ENUM = "MessageType"

# One (key, value) pair of varints, and a greedy sequence of such pairs.
_SIMPLE_VALUE = c.Struct(
    "key" / c.VarInt,
    "value" / c.VarInt,
)
ListOfSimpleValues = c.GreedyRange(_SIMPLE_VALUE)
|
|
|
|
|
|
def parse_protobuf_simple(data):
    """Decode protobuf bytes into a ``{field number: value}`` dict.

    This is a deliberately minimal parser: every field is assumed to be a
    varint (wire type 0). When a field number occurs more than once, the last
    occurrence wins.
    """
    values = {}
    for entry in ListOfSimpleValues.parse(data):
        # drop the three low bits of the key; what remains is the field number
        field_number = entry.key >> 3
        values[field_number] = entry.value
    return values
|
|
|
|
|
|
# Locate the protoc binary on PATH; nothing in this script works without it.
PROTOC = shutil.which("protoc")
if not PROTOC:
    # sys.exit() with a string argument writes the message to stderr and
    # terminates with exit status 1, keeping diagnostics out of stdout.
    sys.exit("protoc command not found")

# Installation prefix of protoc (two levels above the resolved binary);
# protoc() adds its "include" subdirectory to the include path.
PROTOC_PREFIX = Path(PROTOC).resolve().parent.parent
|
|
|
|
|
|
# Enum entry: one-byte count followed by that many little-endian uint16 values.
ENUM_ENTRY = c.PrefixedArray(c.Byte, c.Int16ul)

# One byte packing three flags, one padding bit and a 4-bit type code
# (see FIELD_TYPES_RUST_BLOB for the codes).
_FIELD_FLAGS_AND_TYPE = c.BitStruct(
    "is_required" / c.Flag,
    "is_repeated" / c.Flag,
    "is_experimental" / c.Flag,
    c.Padding(1),
    "type" / c.BitsInteger(4),
)

# Description of a single message field.
FIELD_STRUCT = c.Struct(
    "tag" / c.Byte,
    "flags_and_type" / _FIELD_FLAGS_AND_TYPE,
    "enum_or_msg_offset" / c.Int16ul,
    "name" / c.Int16ul,
)

# Description of a whole message. The two leading counts are recomputed from
# the "fields" and "defaults" members whenever an entry is built.
MSG_ENTRY = c.Struct(
    "fields_count" / c.Rebuild(c.Byte, c.len_(c.this.fields)),
    "defaults_size" / c.Rebuild(c.Byte, c.len_(c.this.defaults)),
    # highest bit = is_experimental
    # the rest = wire_id, 0x7FFF iff unset
    "flags_and_wire_type" / c.Int16ul,
    "fields" / c.Array(c.this.fields_count, FIELD_STRUCT),
    "defaults" / c.Bytes(c.this.defaults_size),
)

# Default-value records: a one-byte field number followed by either a varint
# or a varint-length-prefixed run of bytes.
DEFAULT_VARINT_ENTRY = c.Sequence(c.Byte, c.VarInt)
_LENGTH_PREFIXED_BYTES = c.Prefixed(c.VarInt, c.GreedyRange(c.Byte))
DEFAULT_LENGTH_ENTRY = c.Sequence(c.Byte, _LENGTH_PREFIXED_BYTES)

# Lookup-table rows: (message name, message offset) and (wire id, message offset).
NAME_ENTRY = c.Sequence(
    "msg_name" / c.Int16ul,
    "msg_offset" / c.Int16ul,
)

WIRETYPE_ENTRY = c.Sequence(
    "wire_id" / c.Int16ul,
    "msg_offset" / c.Int16ul,
)

# Matches lines of the following shape; group 3 is the qstr text:
# QDEF(MP_QSTR_copysign, (const byte*)"\x33\x14\x08" "copysign")
QDEF_RE = re.compile(
    r'^QDEF\(MP_QSTR(\S+), \(const byte\*\)"(\\x..\\x..\\x..)" "(.*)"\)$'
)
|
|
|
|
|
|
@attr.s(auto_attribs=True)
class ProtoField:
    """A single message field together with derived, generator-friendly views.

    Instances are normally created through ``from_field``.
    """

    name: str
    number: int
    type: object
    extensions: dict
    # the field descriptor this object was built from
    orig: object
    # name of type without package path
    type_name: str

    # the Descriptor that owns this field; used to resolve type_object
    descriptor: "Descriptor"

    @property
    def repeated(self):
        """True when the field carries the ``repeated`` label."""
        label = self.orig.label
        return label == FieldDescriptor.LABEL_REPEATED

    @property
    def required(self):
        """True when the field carries the ``required`` label."""
        label = self.orig.label
        return label == FieldDescriptor.LABEL_REQUIRED

    @property
    def optional(self):
        """True when the field is neither required nor repeated."""
        return not (self.required or self.repeated)

    @property
    def experimental(self):
        """True when the ``experimental`` extension is set to a truthy value."""
        flag = self.extensions.get("experimental")
        return bool(flag)

    @property
    def is_message(self):
        """True when the field's type is another message."""
        return self.type == FieldDescriptor.TYPE_MESSAGE

    @property
    def is_enum(self):
        """True when the field's type is an enum."""
        return self.type == FieldDescriptor.TYPE_ENUM

    @property
    def python_type(self):
        """Python type name for scalars, otherwise the bare protobuf type name."""
        return FIELD_TYPES_PYTHON.get(self.type, self.type_name)

    @property
    def default_value(self):
        """The declared default as stored on the descriptor, or None if absent."""
        return self.orig.default_value if self.orig.HasField("default_value") else None

    @property
    def default_value_repr(self):
        """Python source text representing the field's default value."""
        default = self.default_value
        if default is None:
            return "None"

        if self.is_enum:
            member = strip_enum_prefix(self.type_name, default)
            return f"{self.type_name}.{member}"

        if self.type == FieldDescriptor.TYPE_STRING:
            return repr(default)
        if self.type == FieldDescriptor.TYPE_BYTES:
            return "b" + repr(default)
        if self.type == FieldDescriptor.TYPE_BOOL:
            # the descriptor spells booleans in lowercase
            return "True" if default == "true" else "False"

        return str(default)

    @property
    def type_object(self):
        """The enum or message this field refers to; None for scalar fields."""
        if self.is_enum:
            candidates = self.descriptor.enums
        elif self.is_message:
            candidates = self.descriptor.messages
        else:
            return None
        return find_by_name(candidates, self.type_name)

    @classmethod
    def from_field(cls, descriptor, field):
        """Build a ProtoField from a field descriptor owned by ``descriptor``."""
        if field.type_name:
            # keep only the last dotted component of the qualified name
            type_name = field.type_name.split(".")[-1]
        else:
            type_name = TYPE_NAMES[field.type]

        return cls(
            name=field.name,
            number=field.number,
            type=field.type,
            orig=field,
            extensions=descriptor.get_extensions(field),
            type_name=type_name,
            descriptor=descriptor,
        )
|
|
|
|
|
|
@attr.s(auto_attribs=True)
class ProtoMessage:
    """A message definition together with its wire type, extensions and fields."""

    name: str
    wire_type: Optional[int]
    # the message descriptor this object was built from
    orig: object
    extensions: dict

    fields: List[ProtoField]

    @classmethod
    def from_message(cls, descriptor: "Descriptor", message):
        """Build a ProtoMessage from a message descriptor.

        The wire type comes from the ``wire_type`` extension when present,
        otherwise from the number of the same-named entry of the message-type
        enum, otherwise it is None.
        """
        enum_entry = find_by_name(descriptor.message_type_enum.value, message.name)

        # extensions set on the message itself take precedence over those set
        # on the message_type entry (if any)
        extensions = {
            **descriptor.get_extensions(enum_entry),
            **descriptor.get_extensions(message),
        }

        try:
            wire_type = extensions["wire_type"]
        except KeyError:
            wire_type = None if enum_entry is None else enum_entry.number

        fields = []
        for field in descriptor._filter_items(message.field):
            fields.append(ProtoField.from_field(descriptor, field))

        return cls(
            name=message.name,
            wire_type=wire_type,
            orig=message,
            extensions=extensions,
            fields=fields,
        )
|
|
|
|
|
|
def protoc(files):
    """Compile code with protoc and return the data.

    Args:
        files: paths of the .proto files to compile.

    Returns:
        The bytes protoc wrote for ``--descriptor_set_out``.

    Raises:
        subprocess.CalledProcessError: if protoc exits with a non-zero status.
    """
    # Include path candidates in priority order: protoc's own include
    # directory, the optional PROTOC_INCLUDE override, then the directory of
    # every input file.
    candidates = [str(PROTOC_PREFIX / "include")]
    env_include = os.environ.get("PROTOC_INCLUDE")
    if env_include:
        candidates.append(env_include)
    candidates.extend(os.path.dirname(path) or "." for path in files)

    # De-duplicate while keeping insertion order, so the -I flags are the same
    # on every run (iterating a set of strings is not order-stable across
    # processes).
    include_dirs = [path for path in dict.fromkeys(candidates) if path]
    protoc_includes = ["-I" + path for path in include_dirs]

    return subprocess.check_output(
        [PROTOC, "--descriptor_set_out=/dev/stdout"] + protoc_includes + list(files)
    )
|
|
|
|
|
|
def strip_enum_prefix(enum_name, value_name):
    """Return ``value_name`` with any enum-derived prefix removed.

    Enum values appear in three naming styles:

    (1) New-style, no prefix:

        enum SomeEnum {
            First_Value = 1;
            SecondValue = 2;
        }

    (2) Old-style, prefixed with the enum name:

        enum SomeEnum {
            SomeEnum_First_Value = 1;
            SomeEnum_SecondValue = 2;
        }

    (3) Old-style, where the enum name ends in "Type" but the prefix does not:

        enum SomeEnumType {
            SomeEnum_First_Value = 1;
            SomeEnum_SecondValue = 2;
        }

    Given the enum name and a value name in any of these styles, the result
    is the new-style name: "First_Value" and "SecondValue" for the examples
    above. A value that matches no known prefix is returned unchanged.
    """
    # candidate prefixes, tried in order
    prefixes = [enum_name + "_"]
    if enum_name.endswith("Type"):
        prefixes.append(enum_name[: -len("Type")] + "_")

    for prefix in prefixes:
        if value_name.startswith(prefix):
            return value_name[len(prefix) :]

    return value_name
|
|
|
|
|
|
def find_by_name(haystack, name, default=None):
    """Return the first item of ``haystack`` whose ``.name`` equals ``name``.

    ``default`` is returned when no item matches.
    """
    for candidate in haystack:
        if candidate.name == name:
            return candidate
    return default
|
|
|
|
|
|
class Descriptor:
    """Parsed view of a protoc ``FileDescriptorSet``.

    Collects the (optionally filtered) messages and enums of every file in the
    set, strips prefixes from enum value names, and orders the messages so
    that each one comes after the messages its fields refer to.
    """

    def __init__(self, data, include_deprecated: bool, bitcoin_only: bool):
        """Parse ``data`` and collect messages and enums.

        Args:
            data: serialized ``FileDescriptorSet`` bytes.
            include_deprecated: keep items whose options mark them deprecated.
            bitcoin_only: when truthy, drop top-level wire messages that are
                not marked with the ``bitcoin_only`` extension.

        Raises:
            RuntimeError: if neither messages nor enums were found.
        """
        self.descriptor = descriptor_pb2.FileDescriptorSet()
        self.descriptor.ParseFromString(data)

        self.include_deprecated = include_deprecated
        self.bitcoin_only = bitcoin_only
        self.files = self.descriptor.file

        logging.debug(f"found {len(self.files)} files")

        # collect extensions across all files
        # this is required for self._get_extension() to work
        self.extensions = {
            ext.name: ext.number for file in self.files for ext in file.extension
        }

        # find message_type enum
        top_level_enums = itertools.chain.from_iterable(f.enum_type for f in self.files)
        # When the input has no MessageType enum, fall back to an empty enum
        # descriptor: it has an (empty) ``.value`` list, so the code here and
        # in ProtoMessage.from_message keeps working and every message simply
        # ends up without an enum-derived wire type.
        self.message_type_enum = find_by_name(
            top_level_enums, MESSAGE_TYPE_ENUM, descriptor_pb2.EnumDescriptorProto()
        )
        self.convert_enum_value_names(self.message_type_enum)

        # top-level message inclusion filter that takes bitcoin_only into account
        def should_include_message(message: ProtoMessage):
            return (
                # include all messages when not in bitcoin_only mode
                not self.bitcoin_only
                # include all non-wire messages
                or message.wire_type is None
                # include messages that are marked bitcoin_only
                or message.extensions.get("bitcoin_only")
            )

        # find messages and enums
        self.messages = []
        self.enums = []

        for file in self.files:
            messages = (
                ProtoMessage.from_message(self, m)
                for m in self._filter_items(file.message_type)
            )
            # use exclusion list on top-level messages
            messages = [m for m in messages if should_include_message(m)]
            self.messages += messages
            self.enums += self._filter_items(file.enum_type)

            for message in messages:
                # recursively search for nested types in newly added messages
                self._nested_types_from_message(message.orig)

        if not self.messages and not self.enums:
            raise RuntimeError("No messages and no enums found.")

        for enum in self.enums:
            self.convert_enum_value_names(enum)

        self.depsort_messages()

    def depsort_messages(self):
        """Reorder ``self.messages`` in place so dependencies come first.

        A message depends on every message type used by one of its fields.

        Raises:
            KeyError: if a field refers to a message that is not in
                ``self.messages`` (for example because it was filtered out).
        """
        if not self.messages:
            # enum-only input: nothing to sort, and popitem() below would
            # raise KeyError on the empty dict
            return

        # sort messages according to dependencies
        messages_dict = {m.name: m for m in reversed(self.messages)}
        messages_sorted = []
        seen_messages = set()

        # pop first message
        _, message = messages_dict.popitem()

        while True:
            dependent_type = next(
                (
                    f.type_name
                    for f in message.fields
                    if f.is_message
                    and f.type_name not in seen_messages
                    # a field of the message's own type can never be emitted
                    # "before" the message; treating it as unmet would make
                    # this loop spin forever
                    and f.type_name != message.name
                ),
                None,
            )
            if dependent_type:
                # return current message to unprocessed
                messages_dict[message.name] = message
                # pop the dependency
                # NOTE(review): mutually recursive messages (A -> B -> A) are
                # still not detected here and would not terminate.
                message = messages_dict.pop(dependent_type)
            else:
                # see message
                seen_messages.add(message.name)
                messages_sorted.append(message)
                if not messages_dict:
                    break
                else:
                    _, message = messages_dict.popitem()

        assert len(messages_sorted) == len(self.messages)
        self.messages[:] = messages_sorted

    def _filter_items(self, iter):
        """Return the items of ``iter`` as a list, minus deprecated ones.

        Deprecated items are kept when ``include_deprecated`` is set.
        """
        return [
            item
            for item in iter
            # exclude deprecated items unless specified
            if (self.include_deprecated or not item.options.deprecated)
        ]

    def _get_extension(self, something, extension_name, default=None):
        """Return the value of one extension set on ``something``'s options.

        ``default`` is returned when ``something`` is None, when the extension
        name is unknown, or when the extension is not set.
        """
        if something is None:
            return default
        if extension_name not in self.extensions:
            return default

        # There doesn't seem to be a sane way to access extensions on a descriptor
        # via the google.protobuf API.
        # We do have access to descriptors of the extensions...
        extension_num = self.extensions[extension_name]
        # ...and the "options" descriptor _does_ include the extension data. But while
        # the API provides access to unknown fields, it hides the extensions.
        # What we do is re-encode the options descriptor...
        options_bytes = something.options.SerializeToString()
        # ...and re-parse it as a dict of uvarints...
        simple_values = parse_protobuf_simple(options_bytes)
        # ...and extract the value corresponding to the extension we care about.
        return simple_values.get(extension_num, default)

    def get_extensions(self, something):
        """Return ``{extension name: value}`` for every extension set on ``something``."""
        found = {}
        for extension in self.extensions:
            # look each extension up once; every lookup re-serializes the options
            value = self._get_extension(something, extension)
            if value is not None:
                found[extension] = value
        return found

    def _nested_types_from_message(self, message):
        """Recursively add the nested messages and enums of ``message``."""
        nested_messages = [
            ProtoMessage.from_message(self, m)
            for m in self._filter_items(message.nested_type)
        ]
        self.messages += nested_messages
        self.enums += self._filter_items(message.enum_type)
        for nested in nested_messages:
            self._nested_types_from_message(nested.orig)

    def convert_enum_value_names(self, enum):
        """Rename the values of ``enum`` in place to their prefix-free form."""
        for value in enum.value:
            value.name = strip_enum_prefix(enum.name, value.name)
|
|
|
|
|
|
class PythonRenderer:
    """Renders messages and enums of a Descriptor through Mako templates."""

    def __init__(self, descriptor: Descriptor, out_dir="", python_extension="py"):
        self.descriptor = descriptor
        # directory that holds the templates and receives the generated files
        self.out_dir = Path(out_dir)
        self.python_extension = python_extension

    def process_message(self, template, message):
        """Render ``template`` for a single message and return the text."""
        logging.debug(f"Processing message {message.name}")
        return template.render(message=message)

    def process_enum(self, template, enum):
        """Render ``template`` for a single enum and return the text.

        The template receives the enum plus its values split into
        ``values_always`` and ``values_altcoin``. The split is only made when
        the enum has the ``has_bitcoin_only_values`` extension set; otherwise
        every value goes to ``values_always``.
        """
        logging.debug(f"Processing enum {enum.name}")

        all_values = self.descriptor._filter_items(enum.value)

        # default: no split
        values_always = all_values
        values_altcoin = []

        if self.descriptor._get_extension(enum, "has_bitcoin_only_values"):
            get_extension = self.descriptor._get_extension
            values_always = [v for v in all_values if get_extension(v, "bitcoin_only")]
            values_altcoin = [v for v in all_values if v not in values_always]

        return template.render(
            enum=enum,
            values_always=values_always,
            values_altcoin=values_altcoin,
        )

    def write_to_file(self, item_name, content):
        """Write ``content`` to ``<out_dir>/<item_name>.<python_extension>``."""
        file_name = item_name + "." + self.python_extension
        (self.out_dir / file_name).write_text(content)

    def generate_messages(self, template_src):
        """Write one file per message, rendered from ``template_src``."""
        template = mako.template.Template(filename=str(template_src))
        for message in self.descriptor.messages:
            rendered = self.process_message(template, message)
            self.write_to_file(message.name, rendered)

    def generate_enums(self, template_src):
        """Write one file per enum, rendered from ``template_src``."""
        template = mako.template.Template(filename=str(template_src))
        for enum in self.descriptor.enums:
            rendered = self.process_enum(template, enum)
            self.write_to_file(enum.name, rendered)

    def render_singlefile(self, template_src):
        """Render ``template_src`` once with all messages and enums."""
        descriptor = self.descriptor
        template = mako.template.Template(filename=str(template_src))
        return template.render(messages=descriptor.messages, enums=descriptor.enums)

    def generate_python(self):
        """Run every generator whose template exists in ``out_dir``."""
        per_item_generators = (
            ("_proto_enum_class.mako", self.generate_enums),
            ("_proto_message_class.mako", self.generate_messages),
        )
        for template_name, generate in per_item_generators:
            template_path = self.out_dir / template_name
            if template_path.exists():
                generate(template_path)

        init_template = self.out_dir / "_proto_init.mako"
        if init_template.exists():
            self.write_to_file("__init__", self.render_singlefile(init_template))
|
|
|
|
|
|
class RustBlobRenderer:
    """Builds the binary blobs (and the qstr list) describing the messages.

    Three lookup tables are filled while building:
      * ``qstr_map``: string -> sequential index of its QDEF line
      * ``enum_map``: enum name -> byte offset in the enum blob
      * ``msg_map``: message name -> byte offset in the message blob
    """

    def __init__(self, descriptor: Descriptor, qstr_defs: Optional[str] = None):
        """Create a renderer.

        Args:
            descriptor: the parsed protobuf definitions.
            qstr_defs: optional path of the collected Qstr definitions; when
                given, ``qstr_map`` is populated from it immediately.
        """
        self.descriptor = descriptor

        self.qstr_map = {}
        self.enum_map = {}
        self.msg_map = {}

        if qstr_defs:
            self.build_qstr_map(qstr_defs)

    def write_qstrs(self, qstr_path):
        """Write a sorted ``Q(name)`` line for every message and field name."""
        logging.debug(f"Writing qstrings to {qstr_path}")
        message_names = {m.name for m in self.descriptor.messages}
        field_names = {
            f.name for message in self.descriptor.messages for f in message.fields
        }
        with open(qstr_path, "w") as f:
            for name in sorted(message_names | field_names):
                f.write(f"Q({name})\n")

    def write_blobs(self, blob_dir):
        """Build all four blobs and write them into ``blob_dir``.

        The build order matters: enum and message offsets must be known
        before field entries can refer to them.
        """
        logging.debug(f"Writing blobs to {blob_dir}")
        blob_dir = Path(blob_dir)

        enum_blob = self.build_enums_with_offsets()

        # build msg entries and fill out map
        msg_entries = self.build_message_entries()
        # fill message offsets
        self.fill_enum_or_msg_offsets(msg_entries)
        # encode blob
        msg_blob = self.build_message_blob(msg_entries)

        name_blob = self.build_blob_names()
        wire_blob = self.build_blob_wire()

        (blob_dir / "proto_enums.data").write_bytes(enum_blob)
        (blob_dir / "proto_msgs.data").write_bytes(msg_blob)
        (blob_dir / "proto_names.data").write_bytes(name_blob)
        (blob_dir / "proto_wire.data").write_bytes(wire_blob)

    def build_qstr_map(self, qstr_defs):
        """Populate ``qstr_map`` from the QDEF lines of the file ``qstr_defs``."""
        # QSTR defs are rolled out into an enum in py/qstr.h, the numeric
        # value is simply an incremented integer.
        qstr_counter = 0
        with open(qstr_defs, "r") as f:
            for line in f:
                match = QDEF_RE.match(line)
                if not match:
                    continue
                # group 3 is the quoted qstr text
                self.qstr_map[match.group(3)] = qstr_counter
                qstr_counter += 1
        logging.debug(f"Found {qstr_counter} Qstr defs")

    def build_enums_with_offsets(self):
        """Return the enum blob and record each enum's offset in ``enum_map``.

        Enums are laid out sorted by name; each stores its sorted value numbers.
        """
        enums = []
        cursor = 0
        for enum in sorted(self.descriptor.enums, key=lambda e: e.name):
            self.enum_map[enum.name] = cursor
            enum_blob = ENUM_ENTRY.build(sorted(v.number for v in enum.value))
            enums.append(enum_blob)
            cursor += len(enum_blob)

        return b"".join(enums)

    def encode_flags_and_wire_type(self, message):
        """Pack the message's wire type and ``unstable`` flag into 16 bits.

        Raises:
            ValueError: if the wire type does not fit into the low 15 bits.
        """
        wire_type = message.wire_type
        if wire_type is None:
            # 0x7FFF is the "no wire type" marker
            wire_type = 0x7FFF
        if wire_type > 0x7FFF:
            raise ValueError("Unsupported wire type")

        flags_and_wire_type = wire_type
        if message.extensions.get("unstable"):
            flags_and_wire_type |= 0x8000

        return flags_and_wire_type

    def encode_field(self, field):
        """Return the FIELD_STRUCT build dict for ``field``.

        The offset is left at 0 and filled in later by
        ``fill_enum_or_msg_offsets``, which finds the field again through the
        extra ``orig_field`` key.
        """
        return dict(
            tag=field.number,
            flags_and_type=dict(
                is_required=field.required,
                is_repeated=field.repeated,
                is_experimental=field.experimental,
                type=FIELD_TYPES_RUST_BLOB[field.type],
            ),
            enum_or_msg_offset=0,
            name=self.qstr_map[field.name],
            orig_field=field,
        )

    def fill_enum_or_msg_offsets(self, msg_entries):
        """Set the offset of every enum- or message-typed field in ``msg_entries``."""
        for msg_dict in msg_entries:
            for field_dict in msg_dict["fields"]:
                field = field_dict["orig_field"]
                if field.is_enum:
                    field_dict["enum_or_msg_offset"] = self.enum_map[field.type_name]
                elif field.is_message:
                    field_dict["enum_or_msg_offset"] = self.msg_map[field.type_name]

    def build_message_entries(self):
        """Return MSG_ENTRY build dicts and record offsets in ``msg_map``.

        Messages are laid out sorted by name, fields sorted by field number.
        """
        messages = []
        cursor = 0

        for message in sorted(self.descriptor.messages, key=lambda m: m.name):
            self.msg_map[message.name] = cursor
            fields = sorted(message.fields, key=lambda f: f.number)

            defaults = b"".join(self.encode_field_default(f) for f in fields)
            flags_and_wire_type = self.encode_flags_and_wire_type(message)
            entry = dict(
                flags_and_wire_type=flags_and_wire_type,
                fields=[self.encode_field(f) for f in fields],
                defaults=defaults,
            )

            messages.append(entry)
            # the encoded size does not depend on the offsets filled in later,
            # so the cursor can already advance here
            cursor += len(MSG_ENTRY.build(entry))

        return messages

    def build_message_blob(self, msg_entries):
        """Encode all message entries into one blob."""
        return b"".join(MSG_ENTRY.build(entry) for entry in msg_entries)

    def encode_field_default(self, field):
        """Encode the default value of ``field``; empty bytes if it has none.

        Raises:
            ValueError: if the field number exceeds one byte, a bytes field
                has a non-empty default, an enum default names no known value,
                or the field type cannot carry a default.
        """
        if field.number > 0xFF:
            raise ValueError("Invalid field number")

        default = field.default_value

        if default is None:
            return b""

        elif field.type in INT_TYPES:
            return DEFAULT_VARINT_ENTRY.build((field.number, int(default)))

        elif field.type == FieldDescriptor.TYPE_BOOL:
            # The descriptor stores boolean defaults as lowercase "true" /
            # "false"; comparing against "True" would encode every default as 0.
            return DEFAULT_VARINT_ENTRY.build((field.number, int(default == "true")))

        elif field.type == FieldDescriptor.TYPE_BYTES:
            if default != "":
                raise ValueError(
                    "Bytes fields can only have empty bytes for default value"
                )
            return DEFAULT_LENGTH_ENTRY.build((field.number, b""))

        elif field.type == FieldDescriptor.TYPE_STRING:
            return DEFAULT_LENGTH_ENTRY.build((field.number, default.encode()))

        elif field.is_enum:
            # find the right value
            value = find_by_name(field.type_object.value, default)
            if value is None:
                raise ValueError(f"Default not found for field {field.name}")
            return DEFAULT_VARINT_ENTRY.build((field.number, value.number))

        else:
            raise ValueError(f"Cannot encode default value for field {field.name}")

    def build_blob_names(self):
        """Return the (qstr, message offset) table, sorted by qstr value."""
        # sorting by Qstr value of the message name
        messages = sorted(self.descriptor.messages, key=lambda m: self.qstr_map[m.name])
        return b"".join(
            NAME_ENTRY.build((self.qstr_map[m.name], self.msg_map[m.name]))
            for m in messages
        )

    def build_blob_wire(self):
        """Return the (wire type, message offset) table, sorted by wire type."""
        # create wire-type -> message mapping
        wire_messages = [m for m in self.descriptor.messages if m.wire_type is not None]
        # sorting by wire-type
        wire_messages.sort(key=lambda m: m.wire_type)
        return b"".join(
            WIRETYPE_ENTRY.build((m.wire_type, self.msg_map[m.name]))
            for m in wire_messages
        )
|
|
|
|
|
|
# click parameter types shared by the command-line options of main()
ReadableFile = click.Path(exists=True, readable=True, dir_okay=False)
WritableFile = click.Path(writable=True, dir_okay=False)
WritableDirectory = click.Path(exists=True, writable=True, file_okay=False)
|
|
|
|
|
|
# NOTE: main() intentionally has no docstring; click would display it as the
# command's help text.
@click.command()
# fmt: off
@click.argument("proto", nargs=-1, type=ReadableFile, required=True)
@click.option("--python-outdir", type=WritableDirectory, help="Output directory for Python classes (contents will be deleted)")
@click.option("--python-extension", default="py", help="Use .pyi to generate type stubs")
@click.option("--outfile", type=WritableFile, help="Output file for single-file generated definitions")
@click.option("--template", type=ReadableFile, help="Template for single-file entry")
@click.option("--blob-outdir", type=WritableDirectory, help="Output directory for protobuf blob files")
@click.option("--qstr-defs", type=ReadableFile, help="Collected Qstr definitions")
@click.option("--qstr-out", type=WritableFile, help="Output Qstr header")
@click.option("-v", "--verbose", is_flag=True)
@click.option("-d", "--include-deprecated", is_flag=True, help="Include deprecated fields, messages and enums")
@click.option("-b", "--bitcoin-only", type=int, default=0, help="Exclude fields, messages and enums that do not belong to bitcoin_only builds")
# fmt: on
def main(
    proto,
    python_outdir,
    python_extension,
    outfile,
    template,
    blob_outdir,
    qstr_defs,
    qstr_out,
    verbose,
    include_deprecated,
    bitcoin_only,
):
    if verbose:
        logging.basicConfig(level=logging.DEBUG)

    # compile the .proto files and parse the resulting descriptor set
    descriptor = Descriptor(
        protoc(proto),
        include_deprecated=include_deprecated,
        bitcoin_only=bitcoin_only,
    )

    # --- per-class Python files -------------------------------------------
    if python_outdir:
        target_dir = Path(python_outdir)
        init_name = "__init__." + python_extension
        with tempfile.TemporaryDirectory() as scratch:
            scratch_dir = Path(scratch)
            # the templates live in the output directory; render from a copy
            for template_file in target_dir.glob("_proto*.mako"):
                shutil.copy(template_file, scratch)

            PythonRenderer(descriptor, scratch_dir, python_extension).generate_python()

            # remove previously generated files, but leave __init__ in place
            for stale in target_dir.glob("*." + python_extension):
                if stale.name != init_name:
                    stale.unlink()

            # move everything from the scratch directory into place
            for generated in scratch_dir.iterdir():
                shutil.copy(generated, target_dir)

    # --- single-file output -------------------------------------------------
    if outfile:
        if not template:
            raise click.ClickException("Please specify --template")

        single_renderer = PythonRenderer(descriptor)
        with open(outfile, "w") as f:
            f.write(single_renderer.render_singlefile(template))

    # --- qstr list ----------------------------------------------------------
    if qstr_out:
        RustBlobRenderer(descriptor).write_qstrs(qstr_out)

    # --- binary blobs -------------------------------------------------------
    if blob_outdir:
        if not qstr_defs:
            raise click.ClickException("Qstr defs not provided")

        RustBlobRenderer(descriptor, qstr_defs).write_blobs(blob_outdir)


if __name__ == "__main__":
    main()
|