From c02e73d1dc0764809da4dcd3018cdd42e7168dcb Mon Sep 17 00:00:00 2001
From: Martin Novak
Date: Mon, 10 Oct 2022 09:50:08 +0200
Subject: [PATCH] feat(common): check sizes for network and token definitions

---
 common/tools/ethereum_definitions.py | 182 +++++++++++++++++++++------
 1 file changed, 147 insertions(+), 35 deletions(-)

diff --git a/common/tools/ethereum_definitions.py b/common/tools/ethereum_definitions.py
index c3103831f1..2052187616 100755
--- a/common/tools/ethereum_definitions.py
+++ b/common/tools/ethereum_definitions.py
@@ -18,13 +18,7 @@
 import requests
 from requests.adapters import HTTPAdapter
 from urllib3.util.retry import Retry
-from coin_info import (
-    Coin,
-    Coins,
-    _load_builtin_erc20_tokens,
-    _load_builtin_ethereum_networks,
-    load_json,
-)
+from coin_info import Coin, Coins, load_json
 from merkle_tree import MerkleTree
 from trezorlib import protobuf
 from trezorlib.messages import (
@@ -359,10 +353,9 @@ def print_definitions_collision(
     name: str,
     definitions: List[Dict],
     old_definitions: List[Dict] | None = None,
-    prompt: bool = True,
 ) -> int | None:
     """Print colliding definitions and ask which one to keep if requested.
-    Returns a tuple composed from the prompt result if prompted otherwise None and the default value."""
+    Return the index of the definition selected at the prompt or the default value."""
     if old_definitions:
         old_defs_hash_no_metadata = [
             hash_dict_on_keys(d, exclude_keys=["metadata", "coingecko_id"])
@@ -370,8 +363,7 @@ def print_definitions_collision(
         ]
 
     default_index = None
-    title = f"COLLISION BETWEEN {name}S"
-    print(f"== {title} ==")
+    print(f"== COLLISION BETWEEN {name}S ==")
     for idx, definition in enumerate(definitions):
         found = ""
         if (
@@ -384,18 +376,16 @@ def print_definitions_collision(
         print(f"DEFINITION {idx}{found}:")
         print(json.dumps(definition, sort_keys=True, indent=None))
 
-    answer = None
-    if prompt:
-        answer = int(
-            click.prompt(
-                "Which definition do you want to keep? Please enter a valid integer",
-                type=click.Choice([str(n) for n in range(len(definitions))]),
-                show_choices=True,
-                default=str(default_index) if default_index is not None else None,
-                show_default=default_index is not None,
-            )
+    answer = int(
+        click.prompt(
+            "Which definition do you want to keep? Please enter a valid integer",
+            type=click.Choice([str(n) for n in range(len(definitions))]),
+            show_choices=True,
+            default=str(default_index) if default_index is not None else None,
+            show_default=default_index is not None,
         )
-    return answer, default_index
+    )
+    return answer
 
 
 def get_definition_deleted_status(definition: Dict) -> str:
@@ -420,14 +410,11 @@ def check_tokens_collisions(tokens: List[Dict], old_tokens: List[Dict] | None) -
     print(f"\nNumber of collisions: {no_of_collisions}")
 
     # solve collisions
-    delete_indexes = []
+    delete_indexes: list[int] = []
     for _, v in collisions.items():
         if len(v) > 1:
             coliding_networks = [tokens[i] for i in v]
-            choice, default = print_definitions_collision(
-                "TOKEN", coliding_networks, old_tokens
-            )
-            index = choice if choice is not None else default
+            index = print_definitions_collision("TOKEN", coliding_networks, old_tokens)
             print(f"Keeping the definition with index {index}.")
             v.pop(index)
             delete_indexes.extend(v)
@@ -438,6 +425,113 @@ def check_tokens_collisions(tokens: List[Dict], old_tokens: List[Dict] | None) -
         tokens.pop(idx)
 
 
+def check_bytes_size(
+    value: bytes, max_size: int, label: str, prompt: bool = True
+) -> Tuple[bool, bool]:
+    """Check the size of a bytes value and return a tuple of the size check result and the user's response."""
+    encoded_size = len(value)
+    if encoded_size > max_size:
+        title = f"Bytes in {label} definition are too long ({encoded_size} > {max_size})"
+        title += " and will be removed from the results" if not prompt else ""
+        print(f"== {title} ==")
+
+        if prompt:
+            answer = click.prompt(
+                "Do you want to remove this definition? If not, this whole process will stop:",
+                type=click.Choice(["y", "n"]),
+                show_choices=True,
+                default="y",
+                show_default=True,
+            )
+            return False, answer == "y"
+        else:
+            return False, True
+
+    return True, True
+
+
+def check_string_size(
+    definition: dict, field_name: str, max_size: int, prompt: bool = True
+) -> bool:
+    """Check the encoded size of the string in "definition[field_name]" and return the result combined with the user's response."""
+    encoded_size = len(definition[field_name].encode())
+    if encoded_size > max_size - 1:
+        title = f'Encoded string field "{field_name}" is too long ({encoded_size} > {max_size - 1})'
+        title += " and will be shortened to fit" if not prompt else ""
+        print(f"== {title} ==")
+        print(json.dumps(definition, sort_keys=True, indent=None))
+
+        if prompt:
+            answer = click.prompt(
+                "Do you want to shorten this string? If not, this whole definition will be removed from the results:",
+                type=click.Choice(["y", "n"]),
+                show_choices=True,
+                default="y",
+                show_default=True,
+            )
+            if answer == "n":
+                return False
+
+        definition[field_name] = definition[field_name][: max_size - 1]
+
+    return True
+
+
+def check_networks_fields_sizes(networks: list[dict], interactive: bool) -> None:
+    """Check sizes of embedded network fields for Trezor model 1 based on "legacy/firmware/protob/messages-ethereum.options"."""
+    # EthereumNetworkInfo.name max_size:256
+    # EthereumNetworkInfo.shortcut max_size:256
+    to_remove: list[int] = []
+    for idx, network in enumerate(networks):
+        if not check_string_size(
+            network, "name", 256, interactive
+        ) or not check_string_size(network, "shortcut", 256, interactive):
+            to_remove.append(idx)
+
+    # delete networks with oversized fields
+    to_remove.sort(reverse=True)
+    for idx in to_remove:
+        networks.pop(idx)
+
+
+def check_tokens_fields_sizes(tokens: list[dict], interactive: bool) -> bool:
+    """Check sizes of embedded token fields for Trezor model 1 based on "legacy/firmware/protob/messages-ethereum.options"."""
+    # EthereumTokenInfo.name max_size:256
+    # EthereumTokenInfo.symbol max_size:256 (here stored under "shortcut")
+    # EthereumTokenInfo.address max_size:20
+    to_remove: list[int] = []
+    invalid_address_size_found = False
+    for idx, token in enumerate(tokens):
+        check, action = check_bytes_size(
+            bytes.fromhex(token["address"][2:]),
+            20,
+            f"token {token['name']} (chain_id={token['chain_id']}, address={token['address']})",
+            interactive,
+        )
+        if not check:
+            if action:
+                to_remove.append(idx)
+                continue
+            else:
+                invalid_address_size_found = True
+
+        if not check_string_size(
+            token, "name", 256, interactive
+        ) or not check_string_size(token, "shortcut", 256, interactive):
+            to_remove.append(idx)
+
+    # if we found an invalid address size we cannot proceed further
+    if invalid_address_size_found:
+        return False
+
+    # delete tokens with oversized fields
+    to_remove.sort(reverse=True)
+    for idx in to_remove:
+        tokens.pop(idx)
+
+    return True
+
+
 def check_definitions_list(
     old_defs: List[Dict],
     new_defs: List[Dict],
@@ -776,7 +870,7 @@ def cli() -> None:
     type=click.Path(resolve_path=True, dir_okay=False, path_type=pathlib.Path),
     default="./definitions-latest.json",
     help="File where the definitions will be saved in json format. If file already exists, it is used to check "
-    "the changes in definitions. Default is \"./definitions-latest.json\".",
+    'the changes in definitions. Default is "./definitions-latest.json".',
 )
 @click.option(
     "-n",
@@ -884,6 +978,11 @@ def prepare_definitions(
     # load old definitions
     old_defs = load_json(deffile)
 
+    # check field sizes first - shortened strings can introduce new collisions
+    check_networks_fields_sizes(networks, interactive)
+    if not check_tokens_fields_sizes(tokens, interactive):
+        return
+
     check_tokens_collisions(
         tokens, old_defs["tokens"] if old_defs is not None else None
     )
@@ -986,11 +1085,8 @@ def sign_definitions(
         with open(complete_file_path, mode="wb+") as f:
             f.write(data)
 
-    def generate_token_defs(tokens: Coins):
-        for token in tokens:
-            if token["address"] is None or token["chain_id"] is None:
-                continue
-
+    def generate_token_def(token: Coin):
+        if token["address"] is not None and token["chain_id"] is not None:
             # save token definition
             save_definition(
                 outdir / "by_chain_id" / token["chain_id"],
@@ -1059,10 +1155,26 @@ def sign_definitions(
         # append signed tree root hash
         definition["serialized"] += signed_root_hash
 
+    # check the serialized size of each definition and generate a file for those that fit
     for network in networks:
-        generate_network_def(network)
+        check, _ = check_bytes_size(
+            network["serialized"],
+            1024,
+            f"network {network['name']} (chain_id={network['chain_id']})",
+            False,
+        )
+        if check:
+            generate_network_def(network)
 
-    generate_token_defs(tokens)
+    for token in tokens:
+        check, _ = check_bytes_size(
+            token["serialized"],
+            1024,
+            f"token {token['name']} (chain_id={token['chain_id']}, address={token['address']})",
+            False,
+        )
+        if check:
+            generate_token_def(token)
 
 
 if __name__ == "__main__":
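
A note on the string limits above: "legacy/firmware/protob/messages-ethereum.options" is a nanopb options file, and for nanopb string fields max_size includes the terminating NUL byte, which is why check_string_size compares against max_size - 1. The patch shortens over-long strings by character count ([: max_size - 1]), which is exact for ASCII but can still overshoot for multi-byte UTF-8 names; a byte-exact variant could look like the sketch below (illustrative only, not part of the patch, and the helper name is made up):

    def shorten_to_encoded_size(value: str, max_size: int) -> str:
        # keep at most max_size - 1 encoded bytes, reserving one byte for the
        # NUL terminator that nanopb counts into max_size
        truncated = value.encode()[: max_size - 1]
        # errors="ignore" drops a UTF-8 sequence that was cut in half
        return truncated.decode(errors="ignore")

    assert len(shorten_to_encoded_size("a" * 300, 256).encode()) == 255
    assert len(shorten_to_encoded_size("é" * 200, 256).encode()) <= 255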
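The non-interactive gates at signing time work the same way: a token address must decode to at most 20 bytes (an Ethereum address is exactly 20 bytes once the "0x" prefix is stripped), and a definition is only written out when its full serialized form - including the signed tree root hash appended in the last hunk - fits the 1024-byte limit used above. A self-contained illustration (the DAI address is just a familiar example value):

    # a hex address with the "0x" prefix stripped decodes to raw bytes
    address = bytes.fromhex("0x6B175474E89094C44Da98b954EedeAC495271d0F"[2:])
    assert len(address) == 20

    # the serialized definition must fit the 1024-byte limit to be generated
    serialized = b"\x00" * 600  # stand-in for a real serialized definition
    assert len(serialized) <= 1024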