|
|
|
@ -6,7 +6,7 @@ bytes, string, embedded message and repeated fields.
|
|
|
|
|
from micropython import const
|
|
|
|
|
|
|
|
|
|
if False:
|
|
|
|
|
from typing import Any, Dict, Iterable, List, Optional, Type, TypeVar, Union
|
|
|
|
|
from typing import Any, Dict, Iterable, List, Tuple, Type, TypeVar, Union
|
|
|
|
|
from typing_extensions import Protocol
|
|
|
|
|
|
|
|
|
|
class Reader(Protocol):
|
|
|
|
@ -150,7 +150,7 @@ class MessageType:
|
|
|
|
|
MESSAGE_WIRE_TYPE = -1
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def get_fields(cls) -> Dict:
|
|
|
|
|
def get_fields(cls) -> "FieldDict":
|
|
|
|
|
return {}
|
|
|
|
|
|
|
|
|
|
def __init__(self, **kwargs: Any) -> None:
|
|
|
|
@ -181,11 +181,25 @@ class LimitedReader:
|
|
|
|
|
FLAG_REPEATED = const(1)
|
|
|
|
|
|
|
|
|
|
if False:
|
|
|
|
|
MessageTypeDef = Union[
|
|
|
|
|
Type[UVarintType],
|
|
|
|
|
Type[SVarintType],
|
|
|
|
|
Type[BoolType],
|
|
|
|
|
EnumType,
|
|
|
|
|
Type[BytesType],
|
|
|
|
|
Type[UnicodeType],
|
|
|
|
|
Type[MessageType],
|
|
|
|
|
]
|
|
|
|
|
FieldDef = Tuple[str, MessageTypeDef, int]
|
|
|
|
|
FieldDict = Dict[int, FieldDef]
|
|
|
|
|
|
|
|
|
|
FieldCache = Dict[Type[MessageType], FieldDict]
|
|
|
|
|
|
|
|
|
|
LoadedMessageType = TypeVar("LoadedMessageType", bound=MessageType)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def load_message(
|
|
|
|
|
reader: Reader, msg_type: Type[LoadedMessageType], field_cache: Dict = None
|
|
|
|
|
reader: Reader, msg_type: Type[LoadedMessageType], field_cache: FieldCache = None
|
|
|
|
|
) -> LoadedMessageType:
|
|
|
|
|
|
|
|
|
|
if field_cache is None:
|
|
|
|
@ -264,7 +278,9 @@ def load_message(
|
|
|
|
|
return msg
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def dump_message(writer: Writer, msg: MessageType, field_cache: Dict = None) -> None:
|
|
|
|
|
def dump_message(
|
|
|
|
|
writer: Writer, msg: MessageType, field_cache: FieldCache = None
|
|
|
|
|
) -> None:
|
|
|
|
|
repvalue = [0]
|
|
|
|
|
|
|
|
|
|
if field_cache is None:
|
|
|
|
@ -328,7 +344,7 @@ def dump_message(writer: Writer, msg: MessageType, field_cache: Dict = None) ->
|
|
|
|
|
raise TypeError
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def count_message(msg: MessageType, field_cache: Dict = None) -> int:
|
|
|
|
|
def count_message(msg: MessageType, field_cache: FieldCache = None) -> int:
|
|
|
|
|
nbytes = 0
|
|
|
|
|
repvalue = [0]
|
|
|
|
|
|
|
|
|
@ -337,7 +353,7 @@ def count_message(msg: MessageType, field_cache: Dict = None) -> int:
|
|
|
|
|
fields = field_cache.get(type(msg))
|
|
|
|
|
if fields is None:
|
|
|
|
|
fields = msg.get_fields()
|
|
|
|
|
field_cache[msg] = fields
|
|
|
|
|
field_cache[type(msg)] = fields
|
|
|
|
|
|
|
|
|
|
for ftag in fields:
|
|
|
|
|
fname, ftype, fflags = fields[ftag]
|
|
|
|
@ -387,12 +403,10 @@ def count_message(msg: MessageType, field_cache: Dict = None) -> int:
|
|
|
|
|
nbytes += svalue
|
|
|
|
|
|
|
|
|
|
elif issubclass(ftype, MessageType):
|
|
|
|
|
ffields = ftype.get_fields()
|
|
|
|
|
for svalue in fvalue:
|
|
|
|
|
fsize = count_message(svalue, ffields)
|
|
|
|
|
fsize = count_message(svalue, field_cache)
|
|
|
|
|
nbytes += count_uvarint(fsize)
|
|
|
|
|
nbytes += fsize
|
|
|
|
|
del ffields
|
|
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
raise TypeError
|
|
|
|
|