mirror of
https://github.com/trezor/trezor-firmware.git
synced 2025-08-04 21:05:29 +00:00
feat(common): check sizes for network and token definitions
This commit is contained in:
parent
e05839bdd0
commit
c02e73d1dc
@ -18,13 +18,7 @@ import requests
|
||||
from requests.adapters import HTTPAdapter
|
||||
from urllib3.util.retry import Retry
|
||||
|
||||
from coin_info import (
|
||||
Coin,
|
||||
Coins,
|
||||
_load_builtin_erc20_tokens,
|
||||
_load_builtin_ethereum_networks,
|
||||
load_json,
|
||||
)
|
||||
from coin_info import Coin, Coins, load_json
|
||||
from merkle_tree import MerkleTree
|
||||
from trezorlib import protobuf
|
||||
from trezorlib.messages import (
|
||||
@ -359,10 +353,9 @@ def print_definitions_collision(
|
||||
name: str,
|
||||
definitions: List[Dict],
|
||||
old_definitions: List[Dict] | None = None,
|
||||
prompt: bool = True,
|
||||
) -> int | None:
|
||||
"""Print colliding definitions and ask which one to keep if requested.
|
||||
Returns a tuple composed from the prompt result if prompted otherwise None and the default value."""
|
||||
Returns an index of selected definition from the prompt result (if prompted) or the default value."""
|
||||
if old_definitions:
|
||||
old_defs_hash_no_metadata = [
|
||||
hash_dict_on_keys(d, exclude_keys=["metadata", "coingecko_id"])
|
||||
@ -370,8 +363,7 @@ def print_definitions_collision(
|
||||
]
|
||||
|
||||
default_index = None
|
||||
title = f"COLLISION BETWEEN {name}S"
|
||||
print(f"== {title} ==")
|
||||
print(f"== COLLISION BETWEEN {name}S ==")
|
||||
for idx, definition in enumerate(definitions):
|
||||
found = ""
|
||||
if (
|
||||
@ -384,18 +376,16 @@ def print_definitions_collision(
|
||||
print(f"DEFINITION {idx}{found}:")
|
||||
print(json.dumps(definition, sort_keys=True, indent=None))
|
||||
|
||||
answer = None
|
||||
if prompt:
|
||||
answer = int(
|
||||
click.prompt(
|
||||
"Which definition do you want to keep? Please enter a valid integer",
|
||||
type=click.Choice([str(n) for n in range(len(definitions))]),
|
||||
show_choices=True,
|
||||
default=str(default_index) if default_index is not None else None,
|
||||
show_default=default_index is not None,
|
||||
)
|
||||
answer = int(
|
||||
click.prompt(
|
||||
"Which definition do you want to keep? Please enter a valid integer",
|
||||
type=click.Choice([str(n) for n in range(len(definitions))]),
|
||||
show_choices=True,
|
||||
default=str(default_index) if default_index is not None else None,
|
||||
show_default=default_index is not None,
|
||||
)
|
||||
return answer, default_index
|
||||
)
|
||||
return answer
|
||||
|
||||
|
||||
def get_definition_deleted_status(definition: Dict) -> str:
|
||||
@ -420,14 +410,11 @@ def check_tokens_collisions(tokens: List[Dict], old_tokens: List[Dict] | None) -
|
||||
print(f"\nNumber of collisions: {no_of_collisions}")
|
||||
|
||||
# solve collisions
|
||||
delete_indexes = []
|
||||
delete_indexes: list[int] = []
|
||||
for _, v in collisions.items():
|
||||
if len(v) > 1:
|
||||
coliding_networks = [tokens[i] for i in v]
|
||||
choice, default = print_definitions_collision(
|
||||
"TOKEN", coliding_networks, old_tokens
|
||||
)
|
||||
index = choice if choice is not None else default
|
||||
index = print_definitions_collision("TOKEN", coliding_networks, old_tokens)
|
||||
print(f"Keeping the definition with index {index}.")
|
||||
v.pop(index)
|
||||
delete_indexes.extend(v)
|
||||
@ -438,6 +425,113 @@ def check_tokens_collisions(tokens: List[Dict], old_tokens: List[Dict] | None) -
|
||||
tokens.pop(idx)
|
||||
|
||||
|
||||
def check_bytes_size(
|
||||
value: bytes, max_size: int, label: str, prompt: bool = True
|
||||
) -> Tuple[bool, bool]:
|
||||
"""Check value (of type bytes) size and return tuple - size check and user response"""
|
||||
encoded_size = len(value)
|
||||
if encoded_size > max_size:
|
||||
title = f"Bytes in {label} definition is too long ({encoded_size} > {max_size})"
|
||||
title += " and will be removed from the results" if not prompt else ""
|
||||
print(f"== {title} ==")
|
||||
|
||||
if prompt:
|
||||
answer = click.prompt(
|
||||
"Do you want to remove this definition? If not, this whole process will stop:",
|
||||
type=click.Choice(["y", "n"]),
|
||||
show_choices=True,
|
||||
default="y",
|
||||
show_default=True,
|
||||
)
|
||||
return False, answer == "y"
|
||||
else:
|
||||
return False, True
|
||||
|
||||
return True, True
|
||||
|
||||
|
||||
def check_string_size(
|
||||
definition: dict, field_name: str, max_size: int, prompt: bool = True
|
||||
) -> bool:
|
||||
"""Check encoded size of a string from \"definition[field_name]\" and return result combined with user response."""
|
||||
encoded_size = len(definition[field_name].encode())
|
||||
if encoded_size > max_size - 1:
|
||||
title = f'Size of encoded string field "{field_name}" is too long ({encoded_size} > {max_size - 1})'
|
||||
title += " and will be shortened to fit in" if not prompt else ""
|
||||
print(f"== {title} ==")
|
||||
print(json.dumps(definition, sort_keys=True, indent=None))
|
||||
|
||||
if prompt:
|
||||
answer = click.prompt(
|
||||
"Do you want to shorten this string? If not, this whole definition will be removed from the results:",
|
||||
type=click.Choice(["y", "n"]),
|
||||
show_choices=True,
|
||||
default="y",
|
||||
show_default=True,
|
||||
)
|
||||
if answer == "n":
|
||||
return False
|
||||
|
||||
definition[field_name] = definition[field_name][: max_size - 1]
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def check_networks_fields_sizes(networks: list[dict], interactive: bool) -> None:
|
||||
"""Check sizes of embeded network fields for Trezor model 1 based on "legacy/firmware/protob/messages-ethereum.options"."""
|
||||
# EthereumNetworkInfo.name max_size:256
|
||||
# EthereumNetworkInfo.shortcut max_size:256
|
||||
to_remove: list[int] = []
|
||||
for idx, network in enumerate(networks):
|
||||
if not check_string_size(
|
||||
network, "name", 256, interactive
|
||||
) or not check_string_size(network, "shortcut", 256, interactive):
|
||||
to_remove.append(idx)
|
||||
|
||||
# delete networks with too big field sizes
|
||||
to_remove.sort(reverse=True)
|
||||
for idx in to_remove:
|
||||
networks.pop(idx)
|
||||
|
||||
|
||||
def check_tokens_fields_sizes(tokens: list[dict], interactive: bool) -> bool:
|
||||
"""Check sizes of embeded token fields for Trezor model 1 based on "legacy/firmware/protob/messages-ethereum.options"."""
|
||||
# EthereumTokenInfo.name max_size:256
|
||||
# EthereumTokenInfo.symbol max_size:256 (here stored under "shortcut")
|
||||
# EthereumTokenInfo.address max_size:20
|
||||
to_remove: list[int] = []
|
||||
invalid_address_size_found = False
|
||||
for idx, token in enumerate(tokens):
|
||||
check, action = check_bytes_size(
|
||||
bytes.fromhex(token["address"][2:]),
|
||||
20,
|
||||
f"token {token['name']} (chain_id={token['chain_id']}, address={token['address']})",
|
||||
interactive,
|
||||
)
|
||||
if not check:
|
||||
if action:
|
||||
to_remove.append(idx)
|
||||
continue
|
||||
else:
|
||||
invalid_address_size_found = True
|
||||
|
||||
if not check_string_size(
|
||||
token, "name", 256, interactive
|
||||
) or not check_string_size(token, "shortcut", 256, interactive):
|
||||
to_remove.append(idx)
|
||||
|
||||
# if we found invalid address size we cannot proceed further
|
||||
if invalid_address_size_found:
|
||||
return False
|
||||
|
||||
# delete tokens with too big field sizes
|
||||
to_remove.sort(reverse=True)
|
||||
for idx in to_remove:
|
||||
tokens.pop(idx)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def check_definitions_list(
|
||||
old_defs: List[Dict],
|
||||
new_defs: List[Dict],
|
||||
@ -776,7 +870,7 @@ def cli() -> None:
|
||||
type=click.Path(resolve_path=True, dir_okay=False, path_type=pathlib.Path),
|
||||
default="./definitions-latest.json",
|
||||
help="File where the definitions will be saved in json format. If file already exists, it is used to check "
|
||||
"the changes in definitions. Default is \"./definitions-latest.json\".",
|
||||
'the changes in definitions. Default is "./definitions-latest.json".',
|
||||
)
|
||||
@click.option(
|
||||
"-n",
|
||||
@ -884,6 +978,11 @@ def prepare_definitions(
|
||||
# load old definitions
|
||||
old_defs = load_json(deffile)
|
||||
|
||||
# check field sizes here - shortened strings can introduce new collisions
|
||||
check_networks_fields_sizes(networks, interactive)
|
||||
if not check_tokens_fields_sizes(tokens, interactive):
|
||||
return
|
||||
|
||||
check_tokens_collisions(
|
||||
tokens, old_defs["tokens"] if old_defs is not None else None
|
||||
)
|
||||
@ -986,11 +1085,8 @@ def sign_definitions(
|
||||
with open(complete_file_path, mode="wb+") as f:
|
||||
f.write(data)
|
||||
|
||||
def generate_token_defs(tokens: Coins):
|
||||
for token in tokens:
|
||||
if token["address"] is None or token["chain_id"] is None:
|
||||
continue
|
||||
|
||||
def generate_token_def(token: Coin):
|
||||
if token["address"] is not None and token["chain_id"] is not None:
|
||||
# save token definition
|
||||
save_definition(
|
||||
outdir / "by_chain_id" / token["chain_id"],
|
||||
@ -1059,10 +1155,26 @@ def sign_definitions(
|
||||
# append signed tree root hash
|
||||
definition["serialized"] += signed_root_hash
|
||||
|
||||
# check serialized size of the definitions and generate a file for it
|
||||
for network in networks:
|
||||
generate_network_def(network)
|
||||
check, _ = check_bytes_size(
|
||||
network["serialized"],
|
||||
1024,
|
||||
f"network {network['name']} (chain_id={network['chain_id']})",
|
||||
False,
|
||||
)
|
||||
if check:
|
||||
generate_network_def(network)
|
||||
|
||||
generate_token_defs(tokens)
|
||||
for token in tokens:
|
||||
check, _ = check_bytes_size(
|
||||
token["serialized"],
|
||||
1024,
|
||||
f"token {token['name']} (chain_id={token['chain_id']}, address={token['address']})",
|
||||
False,
|
||||
)
|
||||
if check:
|
||||
generate_token_def(token)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
Loading…
Reference in New Issue
Block a user