#!/usr/bin/env python3
from __future__ import annotations
import copy
import datetime
import io
import json
import logging
import os
import pathlib
import re
import sys
import zipfile
from binascii import hexlify
from collections import defaultdict
from typing import Any, TextIO, cast
from urllib.parse import urlencode
import click
import ed25519
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from coin_info import Coin, Coins, load_json
from trezorlib import ethereum, protobuf
from trezorlib.merkle_tree import MerkleTree
from trezorlib.messages import (
EthereumDefinitionType,
EthereumNetworkInfo,
EthereumTokenInfo,
)
FORMAT_VERSION_BYTES = b"trzd1"
CURRENT_TIME = datetime.datetime.now(datetime.timezone.utc)
TIMESTAMP_FORMAT = "%d.%m.%Y %X%z"
CURRENT_TIMESTAMP_STR = CURRENT_TIME.strftime(TIMESTAMP_FORMAT)
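# Illustrative example (assuming the C locale, where "%X" renders as HH:MM:SS):
# a CURRENT_TIME of 2022-12-07 12:19:09 UTC serializes as "07.12.2022 12:19:09+0000".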
ETHEREUM_TESTNETS_REGEXES = (".*testnet.*", ".*devnet.*")
if os.environ.get("DEFS_DIR"):
    DEFS_DIR = pathlib.Path(os.environ["DEFS_DIR"]).resolve()
else:
DEFS_DIR = pathlib.Path(__file__).resolve().parent.parent / "defs"
LATEST_DEFINITIONS_TIMESTAMP_FILEPATH = (
DEFS_DIR / "ethereum" / "latest_definitions_timestamp.txt"
)
DEFINITIONS_CACHE_FILEPATH = pathlib.Path("definitions-cache.json")
# ====== utils ======
def setup_logging(verbose: bool):
log_level = logging.DEBUG if verbose else logging.WARNING
root = logging.getLogger()
root.setLevel(log_level)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(log_level)
root.addHandler(handler)
def hash_dict_on_keys(
d: dict,
include_keys: list[str] | None = None,
exclude_keys: list[str] | None = None,
) -> int:
"""Get the hash of a dict on selected keys.
Options `include_keys` and `exclude_keys` are exclusive."""
if include_keys is not None and exclude_keys is not None:
raise TypeError("Options `include_keys` and `exclude_keys` are exclusive")
tmp_dict = {}
for k, v in d.items():
if include_keys is not None and k in include_keys:
tmp_dict[k] = v
elif exclude_keys is not None and k not in exclude_keys:
tmp_dict[k] = v
elif include_keys is None and exclude_keys is None:
tmp_dict[k] = v
return hash(json.dumps(tmp_dict, sort_keys=True))
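# Illustrative usage: excluding volatile keys lets two otherwise identical
# definitions hash equal, e.g. (hypothetical dicts):
#   a = {"chain_id": 1, "shortcut": "ETH", "metadata": {"deleted": "..."}}
#   b = {"chain_id": 1, "shortcut": "ETH"}
#   assert hash_dict_on_keys(a, exclude_keys=["metadata"]) == hash_dict_on_keys(
#       b, exclude_keys=["metadata"]
#   )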
class Cache:
"""Generic cache object that caches to json."""
def __init__(self, cache_filepath: pathlib.Path) -> None:
if cache_filepath.exists() and not cache_filepath.is_file():
raise ValueError(
f'Path for storing cache "{cache_filepath}" exists and is not a file.'
)
self.cache_filepath = cache_filepath
self.cached_data: Any = {}
def is_expired(self) -> bool:
mtime = (
self.cache_filepath.stat().st_mtime if self.cache_filepath.exists() else 0
)
return (
mtime
<= (
datetime.datetime.now(datetime.timezone.utc)
- datetime.timedelta(hours=1)
).timestamp()
)
def load(self) -> None:
self.cached_data = load_json(self.cache_filepath)
def save(self) -> None:
with open(self.cache_filepath, "w+") as f:
json.dump(
obj=self.cached_data, fp=f, ensure_ascii=False, sort_keys=True, indent=1
)
f.write("\n")
def get(self, key: Any, default: Any = None) -> Any:
return self.cached_data.get(key, default)
def set(self, key: Any, data: Any) -> None:
self.cached_data[key] = copy.deepcopy(data)
def __contains__(self, key):
return key in self.cached_data
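# Illustrative usage sketch (hypothetical key and data): entries persist as JSON,
# and the cache counts as expired once the backing file is more than an hour old.
#   cache = Cache(DEFINITIONS_CACHE_FILEPATH)
#   if not cache.is_expired():
#       cache.load()
#   cache.set("https://api.example.invalid/data", {"tokens": []})
#   cache.save()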
class EthereumDefinitionsCachedDownloader:
"""Class that handles all the downloading and caching of Ethereum definitions."""
def __init__(self, refresh: bool | None = None) -> None:
force_refresh = refresh is True
disable_refresh = refresh is False
self.running_from_cache = False
self.cache = Cache(DEFINITIONS_CACHE_FILEPATH)
if disable_refresh or (not self.cache.is_expired() and not force_refresh):
logging.info("Loading cached Ethereum definitions data")
self.cache.load()
self.running_from_cache = True
else:
self._init_requests_session()
def save_cache(self):
if not self.running_from_cache:
self.cache.save()
def _download_json(self, url: str, **url_params: Any) -> Any:
params = None
encoded_params = None
key = url
# convert params to lower-case strings (especially for boolean values
# because for CoinGecko API "True" != "true")
if url_params:
params = {key: str(value).lower() for key, value in url_params.items()}
encoded_params = urlencode(sorted(params.items()))
key += "?" + encoded_params
if self.running_from_cache:
return self.cache.get(key)
logging.info(f"Fetching data from {url}")
r = self.session.get(url, params=encoded_params, timeout=60)
r.raise_for_status()
data = r.json()
self.cache.set(key, data)
return data
    def _init_requests_session(self) -> None:
self.session = requests.Session()
retries = Retry(total=5, status_forcelist=[502, 503, 504])
self.session.mount("https://", HTTPAdapter(max_retries=retries))
def get_coingecko_asset_platforms(self) -> Any:
url = "https://api.coingecko.com/api/v3/asset_platforms"
return self._download_json(url)
def get_defillama_chains(self) -> Any:
url = "https://api.llama.fi/chains"
return self._download_json(url)
def get_coingecko_tokens_for_network(self, coingecko_network_id: str) -> Any:
url = f"https://tokens.coingecko.com/{coingecko_network_id}/all.json"
data = None
try:
data = self._download_json(url)
        except requests.exceptions.HTTPError as err:
            # "Forbidden" is returned by CoinGecko when no tokens are available under the specified id
            if err.response.status_code != requests.codes.forbidden:
                raise
return [] if data is None else data.get("tokens", [])
def get_coingecko_coins_list(self) -> Any:
url = "https://api.coingecko.com/api/v3/coins/list"
return self._download_json(url, include_platform=True)
def get_coingecko_top100(self) -> Any:
url = "https://api.coingecko.com/api/v3/coins/markets"
return self._download_json(
url,
vs_currency="usd",
order="market_cap_desc",
per_page=100,
page=1,
sparkline=False,
)
def get_testnet_status(*strings: str | None) -> bool:
    # `strings` itself can never be None (it is a varargs tuple), but individual
    # entries may be, so skip those; re.IGNORECASE already handles casing.
    for r in ETHEREUM_TESTNETS_REGEXES:
        for s in strings:
            if s is not None and re.search(r, s, re.IGNORECASE):
                return True
    return False
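# Illustrative examples:
#   get_testnet_status("Ethereum Testnet Goerli")  # True (matches ".*testnet.*")
#   get_testnet_status("Ethereum", None)           # False (None entries are skipped)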
def _load_ethereum_networks_from_repo(repo_dir: pathlib.Path) -> list[dict]:
"""Load ethereum networks from submodule."""
chains_path = repo_dir / "_data" / "chains"
networks = []
for chain in sorted(
chains_path.glob("eip155-*.json"),
key=lambda x: int(x.stem.replace("eip155-", "")),
):
chain_data = load_json(chain)
shortcut = chain_data["nativeCurrency"]["symbol"]
name = chain_data["name"]
title = chain_data.get("title", "")
is_testnet = get_testnet_status(name, title)
if is_testnet:
slip44 = 1
else:
slip44 = chain_data.get("slip44", 60)
if is_testnet and not shortcut.lower().startswith("t"):
shortcut = "t" + shortcut
# strip out bullcrap in network naming
if "mainnet" in name.lower():
name = re.sub(r" mainnet.*$", "", name, flags=re.IGNORECASE)
networks.append(
dict(
chain=chain_data["shortName"],
chain_id=chain_data["chainId"],
is_testnet=is_testnet,
slip44=slip44,
shortcut=shortcut,
name=name,
)
)
return networks
def _create_cropped_token_dict(
complex_token: dict, chain_id: int, chain: str
) -> dict | None:
# simple validation
if complex_token["address"][:2] != "0x" or int(complex_token["decimals"]) < 0:
return None
try:
bytes.fromhex(complex_token["address"][2:])
except ValueError:
return None
return dict(
chain=chain,
chain_id=chain_id,
name=complex_token["name"],
decimals=complex_token["decimals"],
address=str(complex_token["address"]).lower(),
shortcut=complex_token["symbol"],
)
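# Illustrative example (mainnet USDT): a valid "0x"-prefixed address is
# lower-cased and only the fields needed later are kept:
#   _create_cropped_token_dict(
#       {"address": "0xdAC17F958D2ee523a2206206994597C13D831ec7",
#        "decimals": 6, "name": "Tether USD", "symbol": "USDT"},
#       chain_id=1, chain="eth",
#   )
#   -> {"chain": "eth", "chain_id": 1, "name": "Tether USD", "decimals": 6,
#       "address": "0xdac17f958d2ee523a2206206994597c13d831ec7", "shortcut": "USDT"}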
def _load_erc20_tokens_from_coingecko(
downloader: EthereumDefinitionsCachedDownloader, networks: list[dict]
) -> list[dict]:
tokens: list[dict] = []
for network in networks:
if (coingecko_id := network.get("coingecko_id")) is not None:
all_tokens = downloader.get_coingecko_tokens_for_network(coingecko_id)
for token in all_tokens:
t = _create_cropped_token_dict(
token, network["chain_id"], network["chain"]
)
if t is not None:
tokens.append(t)
return tokens
def _load_erc20_tokens_from_repo(
repo_dir: pathlib.Path, networks: list[dict]
) -> list[dict]:
"""Load ERC20 tokens from submodule."""
tokens: list[dict] = []
for network in networks:
chain = network["chain"]
chain_path = repo_dir / "tokens" / chain
for file in sorted(chain_path.glob("*.json")):
token: dict = load_json(file)
t = _create_cropped_token_dict(token, network["chain_id"], network["chain"])
if t is not None:
tokens.append(t)
return tokens
def _set_definition_metadata(
definition: dict,
old_definition: dict | None = None,
    keys: list[str] | None = None,
deleted: bool = False,
) -> None:
if "metadata" not in definition:
definition["metadata"] = {}
if deleted:
definition["metadata"]["deleted"] = CURRENT_TIMESTAMP_STR
else:
definition["metadata"].pop("deleted", None)
if old_definition is not None and keys is not None:
for key in keys:
definition["metadata"]["previous_" + key] = old_definition.get(key)
# if metadata are empty, delete them
if len(definition["metadata"]) == 0:
definition.pop("metadata", None)
def print_definition_change(
name: str,
status: str,
old: dict,
new: dict | None = None,
original: dict | None = None,
prompt: bool = False,
use_default: bool = True,
) -> bool | None:
"""Print changes made between definitions and ask for prompt if requested. Returns the prompt result if prompted otherwise None."""
old_deleted_status = get_definition_deleted_status(old)
old_deleted_status_wrapped = (
" (" + old_deleted_status + ")" if old_deleted_status else ""
)
title = f"{old_deleted_status + ' ' if old_deleted_status else ''}{name} PROBABLY {status}"
print(f"== {title} ==")
print(f"OLD{old_deleted_status_wrapped}:")
print(json.dumps(old, sort_keys=True, indent=None))
if new is not None:
print("NEW:")
print(json.dumps(new, sort_keys=True, indent=None))
if original is not None:
original_deleted_status_wrapped = (
" (" + get_definition_deleted_status(original) + ")"
if get_definition_deleted_status(original)
else ""
)
print(f"REPLACING{original_deleted_status_wrapped}:")
print(json.dumps(original, sort_keys=True, indent=None))
if prompt:
answer = click.prompt(
"Confirm change:",
type=click.Choice(["y", "n"]),
show_choices=True,
default="y" if use_default else None,
show_default=use_default,
)
        return answer == "y"
return None
def print_definitions_collision(
name: str,
definitions: list[dict],
old_definitions: list[dict] | None = None,
) -> int | None:
"""Print colliding definitions and ask which one to keep if requested.
Returns an index of selected definition from the prompt result (if prompted) or the default value."""
if old_definitions:
old_defs_hash_no_metadata = [
hash_dict_on_keys(d, exclude_keys=["metadata", "coingecko_id"])
for d in old_definitions
]
default_index = None
print(f"== COLLISION BETWEEN {name}S ==")
for idx, definition in enumerate(definitions):
found = ""
if (
old_definitions
and hash_dict_on_keys(definition, exclude_keys=["metadata", "coingecko_id"])
in old_defs_hash_no_metadata
):
found = " (found in old definitions)"
default_index = idx
print(f"DEFINITION {idx}{found}:")
print(json.dumps(definition, sort_keys=True, indent=None))
answer = int(
click.prompt(
"Which definition do you want to keep? Please enter a valid integer",
type=click.Choice([str(n) for n in range(len(definitions))]),
show_choices=True,
default=str(default_index) if default_index is not None else None,
show_default=default_index is not None,
)
)
return answer
def get_definition_deleted_status(definition: dict) -> str:
return (
"PREVIOUSLY DELETED"
if definition.get("metadata", {}).get("deleted") is not None
else ""
)
def check_tokens_collisions(tokens: list[dict], old_tokens: list[dict] | None) -> None:
collisions: defaultdict = defaultdict(list)
for idx, nd in enumerate(tokens):
collisions[hash_dict_on_keys(nd, ["chain_id", "address"])].append(idx)
no_of_collisions = 0
for _, v in collisions.items():
if len(v) > 1:
no_of_collisions += 1
if no_of_collisions > 0:
logging.info(f"\nNumber of collisions: {no_of_collisions}")
# solve collisions
delete_indexes: list[int] = []
for _, v in collisions.items():
if len(v) > 1:
            colliding_tokens = [tokens[i] for i in v]
            index = print_definitions_collision("TOKEN", colliding_tokens, old_tokens)
logging.info(f"Keeping the definition with index {index}.")
v.pop(index)
delete_indexes.extend(v)
# delete collisions
delete_indexes.sort(reverse=True)
for idx in delete_indexes:
tokens.pop(idx)
def check_bytes_size(
actual_size: int, max_size: int, label: str, prompt: bool = True
) -> tuple[bool, bool]:
"""Check the actual size and return tuple - size check result and user response"""
if actual_size > max_size:
title = f"Bytes in {label} definition is too long ({actual_size} > {max_size})"
title += " and will be removed from the results" if not prompt else ""
print(f"== {title} ==")
if prompt:
answer = click.prompt(
"Do you want to remove this definition? If not, this whole process will stop:",
type=click.Choice(["y", "n"]),
show_choices=True,
default="y",
show_default=True,
)
return False, answer == "y"
else:
return False, True
return True, True
def check_string_size(
definition: dict, field_name: str, max_size: int, prompt: bool = True
) -> bool:
"""Check encoded size of a string from \"definition[field_name]\" and return result combined with user response."""
encoded_size = len(definition[field_name].encode())
if encoded_size > max_size - 1:
title = f'Size of encoded string field "{field_name}" is too long ({encoded_size} > {max_size - 1})'
title += " and will be shortened to fit in" if not prompt else ""
print(f"== {title} ==")
print(json.dumps(definition, sort_keys=True, indent=None))
if prompt:
answer = click.prompt(
"Do you want to shorten this string? If not, this whole definition will be removed from the results:",
type=click.Choice(["y", "n"]),
show_choices=True,
default="y",
show_default=True,
)
if answer == "n":
return False
definition[field_name] = definition[field_name][: max_size - 1]
return True
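# Note: the "max_size - 1" comparison presumably leaves room for the terminating
# NUL byte that nanopb-style "max_size" string options account for, so a field
# declared with max_size:256 holds at most 255 encoded bytes.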
def check_networks_fields_sizes(networks: list[dict], interactive: bool) -> None:
"""Check sizes of embeded network fields for Trezor model 1 based on "legacy/firmware/protob/messages-ethereum.options"."""
# EthereumNetworkInfo.name max_size:256
# EthereumNetworkInfo.shortcut max_size:256
to_remove: list[int] = []
for idx, network in enumerate(networks):
if not check_string_size(
network, "name", 256, interactive
) or not check_string_size(network, "shortcut", 256, interactive):
to_remove.append(idx)
# delete networks with too big field sizes
to_remove.sort(reverse=True)
for idx in to_remove:
networks.pop(idx)
def check_tokens_fields_sizes(tokens: list[dict], interactive: bool) -> bool:
"""Check sizes of embeded token fields for Trezor model 1 based on "legacy/firmware/protob/messages-ethereum.options"."""
# EthereumTokenInfo.name max_size:256
# EthereumTokenInfo.symbol max_size:256 (here stored under "shortcut")
# EthereumTokenInfo.address max_size:20
to_remove: list[int] = []
invalid_address_size_found = False
for idx, token in enumerate(tokens):
check, action = check_bytes_size(
len(bytes.fromhex(token["address"][2:])),
20,
f"token {token['name']} (chain_id={token['chain_id']}, address={token['address']})",
interactive,
)
if not check:
if action:
to_remove.append(idx)
continue
else:
invalid_address_size_found = True
if not check_string_size(
token, "name", 256, interactive
) or not check_string_size(token, "shortcut", 256, interactive):
to_remove.append(idx)
    # if we found an invalid address size we cannot proceed further
if invalid_address_size_found:
return False
# delete tokens with too big field sizes
to_remove.sort(reverse=True)
for idx in to_remove:
tokens.pop(idx)
return True
def check_definitions_list(
old_defs: list[dict],
new_defs: list[dict],
main_keys: list[str],
def_name: str,
interactive: bool,
force: bool,
top100_coingecko_ids: list[str] | None = None,
) -> bool:
check_ok = True
# store already processed definitions
deleted_definitions: list[dict] = []
modified_definitions: list[dict] = []
moved_definitions: list[tuple] = []
resurrected_definitions: list[tuple] = []
# dicts of new definitions
defs_hash_no_metadata = {}
defs_hash_no_main_keys_and_metadata = {}
defs_hash_only_main_keys = {}
for nd in new_defs:
defs_hash_no_metadata[hash_dict_on_keys(nd, exclude_keys=["metadata"])] = nd
defs_hash_no_main_keys_and_metadata[
hash_dict_on_keys(nd, exclude_keys=main_keys + ["metadata"])
] = nd
defs_hash_only_main_keys[hash_dict_on_keys(nd, main_keys)] = nd
# dict of old definitions
old_defs_hash_only_main_keys = {
hash_dict_on_keys(d, main_keys): d for d in old_defs
}
# mark all resurrected, moved, modified or deleted definitions
for old_def in old_defs:
old_def_hash_only_main_keys = hash_dict_on_keys(old_def, main_keys)
old_def_hash_no_metadata = hash_dict_on_keys(old_def, exclude_keys=["metadata"])
old_def_hash_no_main_keys_and_metadata = hash_dict_on_keys(
old_def, exclude_keys=main_keys + ["metadata"]
)
was_deleted_status = get_definition_deleted_status(old_def)
if old_def_hash_no_metadata in defs_hash_no_metadata:
# same definition found, check if it was not marked as deleted before
if was_deleted_status:
resurrected_definitions.append(old_def)
else:
# definition is unchanged, copy its metadata
old_def_metadata = old_def.get("metadata")
if old_def_metadata:
defs_hash_no_metadata[old_def_hash_no_metadata][
"metadata"
] = old_def_metadata
elif (
old_def_hash_no_main_keys_and_metadata
in defs_hash_no_main_keys_and_metadata
):
# definition was moved
# check if there was something before on this "position"
new_def = defs_hash_no_main_keys_and_metadata[
old_def_hash_no_main_keys_and_metadata
]
new_def_hash_only_main_keys = hash_dict_on_keys(new_def, main_keys)
orig_def = old_defs_hash_only_main_keys.get(new_def_hash_only_main_keys)
# check if the move is valid - "old_def" is not marked as deleted
# and there was a change in the original definition
if (
orig_def is None
or not was_deleted_status
or hash_dict_on_keys(orig_def, exclude_keys=["metadata"])
!= hash_dict_on_keys(new_def, exclude_keys=["metadata"])
):
moved_definitions.append((old_def, new_def, orig_def))
else:
# invalid move - this was an old move so we have to check if anything else
# is coming to "old_def" position
if old_def_hash_only_main_keys in defs_hash_only_main_keys:
modified_definitions.append(
(old_def, defs_hash_only_main_keys[old_def_hash_only_main_keys])
)
else:
# no - so just maintain the "old_def"
new_defs.append(old_def)
elif old_def_hash_only_main_keys in defs_hash_only_main_keys:
# definition was modified
modified_definitions.append(
(old_def, defs_hash_only_main_keys[old_def_hash_only_main_keys])
)
else:
# definition was not found - was it deleted before or just now?
if not was_deleted_status:
# definition was deleted now
deleted_definitions.append(old_def)
else:
# no confirmation needed
new_defs.append(old_def)
# try to pair moved and modified definitions
for old_def, new_def, orig_def in moved_definitions:
        # check if there is a modified definition that was modified into "new_def" -
        # if so, it is because this "old_def" was moved to the "orig_def" position
if (orig_def, new_def) in modified_definitions:
modified_definitions.remove((orig_def, new_def))
    def any_in_top_100(*definitions) -> bool:
        # `definitions` is a varargs tuple and can never be None; skip None entries
        if top100_coingecko_ids is None:
            return True
        return any(
            d is not None and d.get("coingecko_id") in top100_coingecko_ids
            for d in definitions
        )
# go through changes and ask for confirmation
for old_def, new_def, orig_def in moved_definitions:
accept_change = True
print_change = any_in_top_100(old_def, new_def, orig_def)
        # if the change includes a symbol change, the "--force" parameter must be used to accept it
if (
orig_def is not None
and orig_def.get("shortcut") != new_def.get("shortcut")
and not force
):
            logging.error(
                "\nERROR: Symbol change in this definition! To approve this change, re-run with the `--force` argument."
            )
accept_change = check_ok = False
print_change = True
answer = (
print_definition_change(
def_name.upper(),
"MOVED",
old_def,
new_def,
orig_def,
prompt=interactive and accept_change,
)
if print_change
else None
)
        if answer is False or (answer is None and not accept_change):
# revert change - replace "new_def" with "old_def" and "orig_def"
new_defs.remove(new_def)
new_defs.append(old_def)
new_defs.append(orig_def)
else:
_set_definition_metadata(new_def, old_def, main_keys)
            # if the position of "old_def" will remain empty, leave a mark at its
            # former position that it has been deleted
old_def_remains_empty = True
for _, nd, _ in moved_definitions:
if hash_dict_on_keys(old_def, main_keys) == hash_dict_on_keys(
nd, main_keys
):
old_def_remains_empty = False
if old_def_remains_empty:
_set_definition_metadata(old_def, deleted=True)
new_defs.append(old_def)
for old_def, new_def in modified_definitions:
accept_change = True
print_change = any_in_top_100(old_def, new_def)
        # if the change includes a symbol change, the "--force" parameter must be used to accept it
if old_def.get("shortcut") != new_def.get("shortcut") and not force:
            logging.error(
                "\nERROR: Symbol change in this definition! To approve this change, re-run with the `--force` argument."
            )
accept_change = check_ok = False
print_change = True
answer = (
print_definition_change(
def_name.upper(),
"MODIFIED",
old_def,
new_def,
prompt=interactive and accept_change,
)
if print_change
else None
)
        if answer is False or (answer is None and not accept_change):
# revert change - replace "new_def" with "old_def"
new_defs.remove(new_def)
new_defs.append(old_def)
for definition in deleted_definitions:
if (
any_in_top_100(definition)
and print_definition_change(
def_name.upper(), "DELETED", definition, prompt=interactive
)
is False
):
# revert change - add back the deleted definition
new_defs.append(definition)
else:
_set_definition_metadata(definition, deleted=True)
new_defs.append(definition)
for definition in resurrected_definitions:
if (
any_in_top_100(definition)
and print_definition_change(
def_name.upper(), "RESURRECTED", definition, prompt=interactive
)
is not False
):
# clear deleted mark
_set_definition_metadata(definition)
return check_ok
def _load_prepared_definitions(
definitions_file: pathlib.Path,
) -> tuple[datetime.datetime, list[dict], list[dict]]:
    if not definitions_file.is_file():
        raise click.ClickException(
            f"File {definitions_file} with prepared definitions does not exist or is not a file."
        )
prepared_definitions_data = load_json(definitions_file)
try:
timestamp = datetime.datetime.strptime(
prepared_definitions_data["timestamp"], TIMESTAMP_FORMAT
)
networks_data = prepared_definitions_data["networks"]
tokens_data = prepared_definitions_data["tokens"]
    except KeyError:
        raise click.ClickException(
            'File with prepared definitions is not complete. The "networks" and/or "tokens" sections are missing.'
        )
networks: Coins = []
for network_data in networks_data:
network_data.update(
chain_id=network_data["chain_id"],
key=f"eth:{network_data['shortcut']}",
)
networks.append(cast(Coin, network_data))
tokens: Coins = []
for token in tokens_data:
token.update(
chain_id=token["chain_id"],
address=token["address"].lower(),
address_bytes=bytes.fromhex(token["address"][2:]),
symbol=token["shortcut"],
key=f"erc20:{token['chain']}:{token['shortcut']}",
)
tokens.append(cast(Coin, token))
return timestamp, networks, tokens
def load_raw_builtin_ethereum_networks() -> list[dict]:
"""Load ethereum networks from `ethereum/networks.json`"""
return load_json("ethereum", "networks.json")
def load_raw_builtin_erc20_tokens() -> list[dict]:
"""Load ERC20 tokens from `ethereum/tokens.json`."""
tokens_data = load_json("ethereum", "tokens.json")
all_tokens: list[dict] = []
for chain_id_and_chain, tokens in tokens_data.items():
chain_id, chain = chain_id_and_chain.split(";", maxsplit=1)
for token in tokens:
token.update(
chain=chain,
chain_id=int(chain_id),
)
all_tokens.append(token)
return all_tokens
def check_builtin_defs(networks: list[dict], tokens: list[dict]) -> bool:
check_ok = True
builtin_networks_hashes_to_dict = {
hash_dict_on_keys(
cast(dict, network), exclude_keys=["metadata", "coingecko_id"]
): cast(dict, network)
for network in load_raw_builtin_ethereum_networks()
}
builtin_tokens_hashes_to_dict = {
hash_dict_on_keys(
cast(dict, token), exclude_keys=["metadata", "coingecko_id"]
): cast(dict, token)
for token in load_raw_builtin_erc20_tokens()
}
networks_hashes = [
hash_dict_on_keys(network, exclude_keys=["metadata", "coingecko_id"])
for network in networks
]
tokens_hashes = [
hash_dict_on_keys(token, exclude_keys=["metadata", "coingecko_id"])
for token in tokens
]
    for def_hash, definition in builtin_networks_hashes_to_dict.items():
        if def_hash not in networks_hashes:
check_ok = False
print("== BUILT-IN NETWORK DEFINITION OUTDATED ==")
print(json.dumps(definition, sort_keys=True, indent=None))
    for def_hash, definition in builtin_tokens_hashes_to_dict.items():
        if def_hash not in tokens_hashes:
check_ok = False
print("== BUILT-IN TOKEN DEFINITION OUTDATED ==")
print(json.dumps(definition, sort_keys=True, indent=None))
return check_ok
# ====== definitions tools ======
def eth_info_from_dict(
    coin: Coin, msg_type: type[EthereumNetworkInfo] | type[EthereumTokenInfo]
) -> EthereumNetworkInfo | EthereumTokenInfo:
attributes: dict[str, Any] = {}
for field in msg_type.FIELDS.values():
val = coin.get(field.name)
if field.name in ("chain_id", "slip44"):
attributes[field.name] = int(val)
elif msg_type == EthereumTokenInfo and field.name == "address":
attributes[field.name] = coin.get("address_bytes")
else:
attributes[field.name] = val
proto = msg_type(**attributes)
return proto
def serialize_eth_info(
info: EthereumNetworkInfo | EthereumTokenInfo,
data_type_num: EthereumDefinitionType,
timestamp: datetime.datetime,
) -> bytes:
ser = FORMAT_VERSION_BYTES
ser += data_type_num.to_bytes(1, "big")
ser += int(timestamp.timestamp()).to_bytes(4, "big")
buf = io.BytesIO()
protobuf.dump_message(buf, info)
msg = buf.getvalue()
# write the length of encoded protobuf message
ser += len(msg).to_bytes(2, "big")
ser += msg
return ser
def get_timestamp_from_definition(definition: bytes) -> int:
return int.from_bytes(definition[6:10], "big")
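# Serialized definition layout (as produced by serialize_eth_info above):
#   bytes 0-4    magic "trzd1" (FORMAT_VERSION_BYTES)
#   byte  5      definition type (EthereumDefinitionType)
#   bytes 6-9    big-endian unix timestamp (the slice read back above)
#   bytes 10-11  big-endian length of the encoded protobuf payload
#   bytes 12+    encoded EthereumNetworkInfo/EthereumTokenInfo message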
# ====== click command handlers ======
@click.group()
def cli() -> None:
"""Script for handling Ethereum definitions (networks and tokens)."""
@cli.command()
@click.option(
"-r/-R",
"--refresh/--no-refresh",
default=None,
help="Force refresh or no-refresh data. By default tries to load cached data.",
)
@click.option(
"-i",
"--interactive",
is_flag=True,
help="Ask about every change. Without this option script will automatically accept all changes to the definitions "
"(except those in symbols, see `--force` option).",
)
@click.option(
"-f",
"--force",
is_flag=True,
help="Changes to symbols in definitions could be accepted.",
)
@click.option(
"-s",
"--show-all",
is_flag=True,
help="Show the differences of all definitions. By default only changes to top 100 definitions (by Coingecko market cap ranking) are shown.",
)
@click.option(
"-d",
"--deffile",
type=click.Path(resolve_path=True, dir_okay=False, path_type=pathlib.Path),
default="./definitions-latest.json",
help="File where the definitions will be saved in json format. If file already exists, it is used to check "
'the changes in definitions. Default is "./definitions-latest.json".',
)
@click.option(
"-n",
"--networks-dir",
type=click.Path(
exists=True, file_okay=False, resolve_path=True, path_type=pathlib.Path
),
default=DEFS_DIR / "ethereum" / "chains",
help="Directory pointing at cloned networks definition repo (https://github.com/ethereum-lists/chains). "
"Defaults to `$(DEFS_DIR)/ethereum/chains` if env variable `DEFS_DIR` is set, otherwise to "
'`"this script location"/../defs/ethereum/chains`',
)
@click.option(
"-t",
"--tokens-dir",
type=click.Path(
exists=True, file_okay=False, resolve_path=True, path_type=pathlib.Path
),
default=DEFS_DIR / "ethereum" / "tokens",
help="Directory pointing at cloned networks definition repo (https://github.com/ethereum-lists/tokens). "
"Defaults to `$(DEFS_DIR)/ethereum/tokens` if env variable `DEFS_DIR` is set, otherwise to "
'`"this script location"/../defs/ethereum/tokens`',
)
@click.option("-v", "--verbose", is_flag=True, help="Display more info")
def prepare_definitions(
refresh: bool | None,
interactive: bool,
force: bool,
show_all: bool,
deffile: pathlib.Path,
networks_dir: pathlib.Path,
tokens_dir: pathlib.Path,
verbose: bool,
) -> None:
"""Prepare Ethereum definitions."""
setup_logging(verbose)
# init Ethereum definitions downloader
downloader = EthereumDefinitionsCachedDownloader(refresh)
networks = _load_ethereum_networks_from_repo(networks_dir)
# coingecko API
cg_platforms = downloader.get_coingecko_asset_platforms()
cg_platforms_by_chain_id: dict[int, Any] = {}
for chain in cg_platforms:
        # We only want information about chains that have both a chain id and a
        # coingecko id; otherwise we could not link local and coingecko networks.
if chain["chain_identifier"] is not None and chain["id"] is not None:
cg_platforms_by_chain_id[chain["chain_identifier"]] = chain["id"]
# defillama API
dl_chains = downloader.get_defillama_chains()
dl_chains_by_chain_id: dict[int, Any] = {}
for chain in dl_chains:
        # We only want information about chains that have both a chain id and a
        # coingecko id; otherwise we could not link local and coingecko networks.
if chain["chainId"] is not None and chain["gecko_id"] is not None:
dl_chains_by_chain_id[chain["chainId"]] = chain["gecko_id"]
# We will try to get as many "coingecko_id"s as possible to be able to use them afterwards
# to load tokens from coingecko. We won't use coingecko networks, because we don't know which
# ones are EVM based.
coingecko_id_to_chain_id = {}
for network in networks:
if network.get("coingecko_id") is None:
# first try to assign coingecko_id to local networks from coingecko via chain_id
if network["chain_id"] in cg_platforms_by_chain_id:
network["coingecko_id"] = cg_platforms_by_chain_id[network["chain_id"]]
# or try to assign coingecko_id to local networks from defillama via chain_id
elif network["chain_id"] in dl_chains_by_chain_id:
network["coingecko_id"] = dl_chains_by_chain_id[network["chain_id"]]
# if we found "coingecko_id" add it to the map - used later to map tokens with coingecko ids
if network.get("coingecko_id") is not None:
coingecko_id_to_chain_id[network["coingecko_id"]] = network["chain_id"]
# get tokens
cg_tokens = _load_erc20_tokens_from_coingecko(downloader, networks)
repo_tokens = _load_erc20_tokens_from_repo(tokens_dir, networks)
    # fetch data used in further processing now, so that the cache can be saved
    # before token collision resolution and the other steps below
# get CoinGecko coin list
cg_coin_list = downloader.get_coingecko_coins_list()
# get top 100 coins
cg_top100 = downloader.get_coingecko_top100()
# save cache
downloader.save_cache()
# merge tokens
tokens: list[dict] = []
cg_tokens_chain_id_and_address = []
for t in cg_tokens:
if t not in tokens:
# add only unique tokens
tokens.append(t)
cg_tokens_chain_id_and_address.append((t["chain_id"], t["address"]))
for t in repo_tokens:
if (
t not in tokens
and (t["chain_id"], t["address"]) not in cg_tokens_chain_id_and_address
):
# add only unique tokens and prefer CoinGecko in case of collision of chain id and token address
tokens.append(t)
old_defs = None
if deffile.exists():
# load old definitions
old_defs = load_json(deffile)
# check field sizes here - shortened strings can introduce new collisions
check_networks_fields_sizes(networks, interactive)
if not check_tokens_fields_sizes(tokens, interactive):
return
check_tokens_collisions(
tokens, old_defs["tokens"] if old_defs is not None else None
)
# map coingecko ids to tokens
tokens_by_chain_id_and_address = {(t["chain_id"], t["address"]): t for t in tokens}
for coin in cg_coin_list:
for platform_name, address in coin.get("platforms", {}).items():
key = (coingecko_id_to_chain_id.get(platform_name), address)
if key in tokens_by_chain_id_and_address:
tokens_by_chain_id_and_address[key]["coingecko_id"] = coin["id"]
# get top 100 ids
cg_top100_ids = [d["id"] for d in cg_top100]
# check changes in definitions
save_results = True
if old_defs is not None:
# check networks and tokens
save_results &= check_definitions_list(
old_defs["networks"],
networks,
["chain_id"],
"network",
interactive,
force,
cg_top100_ids if not show_all else None,
)
save_results &= check_definitions_list(
old_defs["tokens"],
tokens,
["chain_id", "address"],
"token",
interactive,
force,
cg_top100_ids if not show_all else None,
)
if save_results:
# check built-in definitions against generated ones
if not check_builtin_defs(networks, tokens):
logging.warning("Built-in definitions differ from the generated ones.")
# sort networks and tokens
networks.sort(key=lambda x: x["chain_id"])
tokens.sort(key=lambda x: (x["chain_id"], x["address"]))
# save results
with open(deffile, "w") as f:
json.dump(
obj=dict(
timestamp=CURRENT_TIMESTAMP_STR, networks=networks, tokens=tokens
),
fp=f,
ensure_ascii=False,
sort_keys=True,
indent=1,
)
f.write("\n")
else:
logging.error("Error occured - results not saved.")
@cli.command()
@click.option(
"-d",
"--deffile",
type=click.Path(resolve_path=True, dir_okay=False, path_type=pathlib.Path),
default="./definitions-latest.json",
help='File where the prepared definitions are saved in json format. Defaults to "./definitions-latest.json".',
)
@click.option(
"-o",
"--outfile",
type=click.Path(
resolve_path=True, dir_okay=False, writable=True, path_type=pathlib.Path
),
default="./definitions-latest.zip",
help='File where the generated definitions will be saved in zip format. Any existing file will be overwritten! Defaults to "./definitions-latest.zip".',
)
@click.option(
"-k",
"--publickey",
type=click.File(mode="r"),
help="File with public key (text, hex formated) used to check the signed Merkle tree root hash. Must be used with `--signedroot` option.",
)
@click.option(
"-s",
"--signedroot",
help="Signed Merkle tree root hash to be added (text, hex formated).",
)
@click.option("-v", "--verbose", is_flag=True, help="Display more info.")
@click.option(
"-p",
"--include-proof",
is_flag=True,
help="Include Merkle tree proofs into binary blobs.",
)
def sign_definitions(
deffile: pathlib.Path,
outfile: pathlib.Path,
publickey: TextIO,
signedroot: str,
verbose: bool,
include_proof: bool,
) -> None:
"""Generate signed Ethereum definitions for python-trezor and others.
If ran without `--publickey` and/or `--signedroot` it prints the computed Merkle tree root hash.
If ran with `--publickey` and `--signedroot` it checks the signed root with generated one and saves the definitions.
"""
setup_logging(verbose)
if (publickey is None) != (signedroot is None):
raise click.ClickException(
"Options `--publickey` and `--signedroot` must be used together."
)
# load prepared definitions
timestamp, networks, tokens = _load_prepared_definitions(deffile)
# serialize definitions
for network in networks:
ser = serialize_eth_info(
eth_info_from_dict(network, EthereumNetworkInfo),
EthereumDefinitionType.NETWORK,
timestamp,
)
network["serialized"] = ser
for token in tokens:
ser = serialize_eth_info(
eth_info_from_dict(token, EthereumTokenInfo),
EthereumDefinitionType.TOKEN,
timestamp,
)
token["serialized"] = ser
# sort encoded definitions
sorted_defs = [network["serialized"] for network in networks] + [
token["serialized"] for token in tokens
]
sorted_defs.sort()
# build Merkle tree
mt = MerkleTree(sorted_defs)
# print or check tree root hash
if publickey is None:
print(f"Merkle tree root hash: {hexlify(mt.get_root_hash())}")
return
verify_key = ed25519.VerifyingKey(
ed25519.from_ascii(publickey.readline(), encoding="hex")
)
try:
verify_key.verify(signedroot, mt.get_root_hash(), encoding="hex")
except ed25519.BadSignatureError:
        raise click.ClickException(
            f"Provided `--signedroot` value is not valid for the computed Merkle tree root hash ({hexlify(mt.get_root_hash()).decode()})."
        )
def save_definition(
path: pathlib.PurePath, keys: list[str], data: bytes, zip_file: zipfile.ZipFile
):
complete_path = path / ("_".join(keys) + ".dat")
try:
if zip_file.getinfo(str(complete_path)):
                logging.warning(
                    f'Definition "{complete_path}" already generated - attempting to generate a duplicate definition.'
                )
except KeyError:
pass
zip_file.writestr(str(complete_path), data)
def generate_token_def(
token: Coin, base_path: pathlib.PurePath, zip_file: zipfile.ZipFile
):
if token["address"] is not None and token["chain_id"] is not None:
# save token definition
save_definition(
base_path / "by_chain_id" / str(token["chain_id"]),
["token", token["address"][2:].lower()],
token["serialized"],
zip_file,
)
def generate_network_def(
network: Coin, base_path: pathlib.PurePath, zip_file: zipfile.ZipFile
):
if network["chain_id"] is None:
return
# create path for networks identified by chain and slip44 ids
network_dir = base_path / "by_chain_id" / str(network["chain_id"])
slip44_dir = base_path / "by_slip44" / str(network["slip44"])
# save network definition
save_definition(network_dir, ["network"], network["serialized"], zip_file)
# for slip44 == 60 save only Ethereum and for slip44 == 1 save only Goerli
if network["slip44"] not in (60, 1) or network["chain_id"] in (1, 420):
save_definition(slip44_dir, ["network"], network["serialized"], zip_file)
def add_proof_to_def(definition: dict) -> None:
proof = proofs_dict[definition["serialized"]]
# append number of hashes in proof
definition["serialized"] += len(proof).to_bytes(1, "big")
# append proof itself
for p in proof:
definition["serialized"] += p
# add proofs (if requested) and signed tree root hash, check serialized size of the definitions and add it to a zip
signed_root_bytes = bytes.fromhex(signedroot)
# update definitions
proofs_dict = mt.get_proofs()
base_path = pathlib.PurePath("definitions-latest")
with zipfile.ZipFile(outfile, mode="w") as zip_file:
for network in networks:
if include_proof:
add_proof_to_def(network)
# append signed tree root hash
network["serialized"] += signed_root_bytes
network_serialized_length = len(network["serialized"])
if not include_proof:
                # account for the size of the proofs that the user will append later, before sending to the device
network_serialized_length += mt.get_tree_height() * 32
check, _ = check_bytes_size(
network_serialized_length,
1024,
f"network {network['name']} (chain_id={network['chain_id']})",
False,
)
if check:
generate_network_def(network, base_path, zip_file)
for token in tokens:
if include_proof:
add_proof_to_def(token)
# append signed tree root hash
token["serialized"] += signed_root_bytes
token_serialized_length = len(token["serialized"])
if not include_proof:
                # account for the size of the proofs that the user will append later, before sending to the device
token_serialized_length += mt.get_tree_height() * 32
check, _ = check_bytes_size(
token_serialized_length,
1024,
f"token {token['name']} (chain_id={token['chain_id']}, address={token['address']})",
False,
)
if check:
generate_token_def(token, base_path, zip_file)
@cli.command()
@click.option(
"-t",
"--timestamp",
type=int,
help="Unix timestamp to use.",
)
@click.option("-v", "--verbose", is_flag=True, help="Display more info.")
def update_timestamp(
timestamp: int,
verbose: bool,
) -> None:
"""Updates the latest definitions timestamp stored in `DEFS_DIR/ethereum/latest_definitions_timestamp.txt`
to the entered one or to the one, that can be obtained from parsing an online available definitions.
This timestamp is then injected via "mako" files into FW code.
"""
setup_logging(verbose)
if timestamp is None:
zip_bytes = ethereum.download_from_url(
ethereum.DEFS_BASE_URL + ethereum.DEFS_ZIP_FILENAME
)
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
timestamp = get_timestamp_from_definition(zf.read(zf.namelist().pop()))
with open(LATEST_DEFINITIONS_TIMESTAMP_FILEPATH, "w") as f:
logging.info(
f"Setting the timestamp to '{timestamp}' ('{datetime.datetime.fromtimestamp(timestamp)}')."
)
f.write(str(timestamp) + "\n")
if __name__ == "__main__":
cli()