#!/usr/bin/env python3
from __future__ import annotations

import copy
import datetime
import io
import json
import os
import pathlib
import re
import shutil
from collections import defaultdict
from typing import Any, Dict, List, TextIO, Tuple

import click
import ed25519
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from coin_info import Coin, CoinBuckets, Coins, coin_info, load_json
from trezorlib import protobuf
from trezorlib.messages import (
    EthereumDefinitionType,
    EthereumNetworkInfo,
    EthereumTokenInfo,
)

FORMAT_VERSION = "trzd1"
FORMAT_VERSION_BYTES = FORMAT_VERSION.encode("utf-8").ljust(8, b"\0")
ACTUAL_TIME = datetime.datetime.now(datetime.timezone.utc)
ACTUAL_TIMESTAMP_STR = ACTUAL_TIME.strftime("%d.%m.%Y %X%z")
DATA_VERSION_BYTES = int(ACTUAL_TIME.timestamp()).to_bytes(4, "big")
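# Concrete values, for orientation (illustration only, not part of the original
# script): FORMAT_VERSION_BYTES == b"trzd1\x00\x00\x00" (8 bytes), and
# DATA_VERSION_BYTES is the current UNIX timestamp as 4 big-endian bytes, e.g.
# int(1700000000).to_bytes(4, "big") == b"\x65\x53\xf1\x00".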
if os.environ.get("DEFS_DIR"):
    DEFS_DIR = pathlib.Path(os.environ.get("DEFS_DIR")).resolve()
else:
    DEFS_DIR = pathlib.Path(__file__).resolve().parent.parent / "defs"

DEFINITIONS_CACHE_FILEPATH = pathlib.Path().absolute() / "definitions-cache.json"


# ====== utils ======
def hash_dict_on_keys(
    d: Dict,
    include_keys: List[str] | None = None,
    exclude_keys: List[str] | None = None,
) -> int:
    """Get the hash of a dict, computed over selected keys only.

    Options `include_keys` and `exclude_keys` are mutually exclusive."""
    if include_keys is not None and exclude_keys is not None:
        raise TypeError("Options `include_keys` and `exclude_keys` are exclusive")

    tmp_dict = dict()
    for k, v in d.items():
        if include_keys is not None and k in include_keys:
            tmp_dict[k] = v
        elif exclude_keys is not None and k not in exclude_keys:
            tmp_dict[k] = v
        elif include_keys is None and exclude_keys is None:
            tmp_dict[k] = v

    return hash(json.dumps(tmp_dict, sort_keys=True))
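# Quick illustration of hash_dict_on_keys (made-up values, not from the source
# data): two dicts that differ only in an excluded key hash the same.
#
#   d1 = {"chain_id": 1, "name": "Ethereum", "metadata": {"deleted": "..."}}
#   d2 = {"chain_id": 1, "name": "Ethereum"}
#   assert hash_dict_on_keys(d1, exclude_keys=["metadata"]) == hash_dict_on_keys(d2)
#   assert hash_dict_on_keys(d1, include_keys=["chain_id"]) == hash_dict_on_keys(
#       d2, include_keys=["chain_id"]
#   )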
class Cache:
    """Generic cache object that caches to json."""

    def __init__(self, cache_filepath: pathlib.Path) -> None:
        self.cache_filepath = cache_filepath
        self.cached_data: Any = dict()

    def is_expired(self) -> bool:
        mtime = (
            self.cache_filepath.stat().st_mtime if self.cache_filepath.is_file() else 0
        )
        return mtime <= (ACTUAL_TIME - datetime.timedelta(hours=1)).timestamp()

    def load(self) -> None:
        self.cached_data = load_json(self.cache_filepath)

    def save(self) -> None:
        with open(self.cache_filepath, "w+") as f:
            json.dump(
                obj=self.cached_data, fp=f, ensure_ascii=False, sort_keys=True, indent=1
            )
            f.write("\n")

    def get(self, key: Any, default: Any = None) -> Any:
        return self.cached_data.get(key, default)

    def set(self, key: Any, data: Any) -> None:
        self.cached_data[key] = copy.deepcopy(data)

    def __contains__(self, key: Any) -> bool:
        return key in self.cached_data
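# Typical lifecycle (a sketch; this mirrors how the downloader below drives it):
#
#   cache = Cache(DEFINITIONS_CACHE_FILEPATH)
#   if not cache.is_expired():
#       cache.load()
#   cache.set("some-url", {"any": "json-serializable data"})
#   cache.save()  # a cache file untouched for an hour counts as expired next run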
class EthereumDefinitionsCachedDownloader:
    """Class that handles all the downloading and caching of Ethereum definitions."""

    def __init__(self, refresh: bool | None = None) -> None:
        force_refresh = refresh is True
        disable_refresh = refresh is False
        self.use_cache = False
        self.cache = Cache(DEFINITIONS_CACHE_FILEPATH)

        if disable_refresh or (not self.cache.is_expired() and not force_refresh):
            print("Loading cached Ethereum definitions data")
            self.cache.load()
            self.use_cache = True
        else:
            self._init_requests_session()

    def save_cache(self) -> None:
        if not self.use_cache:
            self.cache.save()

    def _download_as_json_from_url(
        self, url: str, url_params: dict[str, Any] | None = None
    ) -> Any:
        # convert params to strings
        params = dict()
        if url_params:
            params = {key: str(value).lower() for key, value in url_params.items()}

        key = url + str(params)
        if self.use_cache:
            return self.cache.get(key)

        print(f"Fetching data from {url}")

        r = self.session.get(url, params=params, timeout=60)
        r.raise_for_status()
        data = r.json()
        self.cache.set(key, data)
        return data

    def _init_requests_session(self) -> None:
        self.session = requests.Session()
        retries = Retry(total=5, status_forcelist=[502, 503, 504])
        self.session.mount("https://", HTTPAdapter(max_retries=retries))

    def get_coingecko_asset_platforms(self) -> Any:
        url = "https://api.coingecko.com/api/v3/asset_platforms"
        return self._download_as_json_from_url(url)

    def get_defillama_chains(self) -> Any:
        url = "https://api.llama.fi/chains"
        return self._download_as_json_from_url(url)

    def get_coingecko_tokens_for_network(self, coingecko_network_id: str) -> Any:
        url = f"https://tokens.coingecko.com/{coingecko_network_id}/all.json"
        data = None
        try:
            data = self._download_as_json_from_url(url)
        except requests.exceptions.HTTPError as err:
            # "Forbidden" is raised by Coingecko if no tokens are available under the specified id
            if err.response.status_code != requests.codes.forbidden:
                raise

        return [] if data is None else data.get("tokens", [])

    def get_coingecko_coins_list(self) -> Any:
        url = "https://api.coingecko.com/api/v3/coins/list"
        return self._download_as_json_from_url(url, {"include_platform": "true"})

    def get_coingecko_top100(self) -> Any:
        url = "https://api.coingecko.com/api/v3/coins/markets"
        return self._download_as_json_from_url(
            url,
            {
                "vs_currency": "usd",
                "order": "market_cap_desc",
                "per_page": 100,
                "page": 1,
                "sparkline": "false",
            },
        )
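# Sketch of how the downloader is driven (mirrors prepare_definitions() below;
# "ethereum" is just an example CoinGecko platform id):
#
#   downloader = EthereumDefinitionsCachedDownloader(refresh=None)
#   platforms = downloader.get_coingecko_asset_platforms()
#   tokens = downloader.get_coingecko_tokens_for_network("ethereum")
#   downloader.save_cache()  # persists only when fresh data was actually fetched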
def _load_ethereum_networks_from_repo(repo_dir: pathlib.Path) -> List[Dict]:
    """Load Ethereum networks from the ethereum-lists/chains submodule."""
    chains_path = repo_dir / "_data" / "chains"
    networks = []
    for chain in sorted(
        chains_path.glob("eip155-*.json"),
        key=lambda x: int(x.stem.replace("eip155-", "")),
    ):
        chain_data = load_json(chain)
        shortcut = chain_data["nativeCurrency"]["symbol"]
        name = chain_data["name"]
        title = chain_data.get("title", "")
        is_testnet = "testnet" in name.lower() or "testnet" in title.lower()
        if is_testnet:
            slip44 = 1
        else:
            slip44 = chain_data.get("slip44", 60)

        if is_testnet and not shortcut.lower().startswith("t"):
            shortcut = "t" + shortcut

        rskip60 = shortcut in ("RBTC", "TRBTC")

        # strip noise like "Mainnet" suffixes out of network names
        if "mainnet" in name.lower():
            name = re.sub(r" mainnet.*$", "", name, flags=re.IGNORECASE)

        networks.append(
            dict(
                chain=chain_data["shortName"],
                chain_id=chain_data["chainId"],
                slip44=slip44,
                shortcut=shortcut,
                name=name,
                rskip60=rskip60,
                url=chain_data["infoURL"],
            )
        )

    return networks
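# Example of the input consumed above (abridged from ethereum-lists/chains
# `_data/chains/eip155-1.json`; the exact field set may vary per chain):
#
#   {
#     "name": "Ethereum Mainnet", "chainId": 1, "shortName": "eth",
#     "nativeCurrency": {"name": "Ether", "symbol": "ETH", "decimals": 18},
#     "infoURL": "https://ethereum.org", "slip44": 60
#   }
#
# which becomes: dict(chain="eth", chain_id=1, slip44=60, shortcut="ETH",
#                     name="Ethereum", rskip60=False, url="https://ethereum.org")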
def _create_cropped_token_dict(
    complex_token: dict, chain_id: int, chain: str
) -> dict | None:
    # simple validation
    if complex_token["address"][:2] != "0x" or int(complex_token["decimals"]) < 0:
        return None
    try:
        bytes.fromhex(complex_token["address"][2:])
    except ValueError:
        return None

    return dict(
        chain=chain,
        chain_id=chain_id,
        name=complex_token["name"],
        decimals=complex_token["decimals"],
        address=str(complex_token["address"]).lower(),
        shortcut=complex_token["symbol"],
    )
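# Illustration with a made-up token (not from the source data):
#
#   _create_cropped_token_dict(
#       {"address": "0x" + "11" * 20, "decimals": 6, "name": "Example", "symbol": "EXM"},
#       chain_id=1,
#       chain="eth",
#   )
#
# returns dict(chain="eth", chain_id=1, name="Example", decimals=6,
# address="0x" + "11" * 20, shortcut="EXM"); a non-hex address or negative
# `decimals` yields None instead.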
def _load_erc20_tokens_from_coingecko(
    downloader: EthereumDefinitionsCachedDownloader, networks: List[Dict]
) -> List[Dict]:
    tokens: List[Dict] = []
    for network in networks:
        if network.get("coingecko_id") is not None:
            all_tokens = downloader.get_coingecko_tokens_for_network(
                network["coingecko_id"]
            )

            for token in all_tokens:
                t = _create_cropped_token_dict(
                    token, network["chain_id"], network["chain"]
                )
                if t is not None:
                    tokens.append(t)

    return tokens
def _load_erc20_tokens_from_repo(
    repo_dir: pathlib.Path, networks: List[Dict]
) -> List[Dict]:
    """Load ERC20 tokens from the ethereum-lists/tokens submodule."""
    tokens: List[Dict] = []
    for network in networks:
        chain = network["chain"]

        chain_path = repo_dir / "tokens" / chain
        for file in sorted(chain_path.glob("*.json")):
            token: dict = load_json(file)
            t = _create_cropped_token_dict(token, network["chain_id"], chain)
            if t is not None:
                tokens.append(t)

    return tokens
def _set_definition_metadata(
    definition: Dict,
    old_definition: Dict | None = None,
    keys: List[str] | None = None,
    deleted: bool = False,
) -> None:
    if "metadata" not in definition:
        definition["metadata"] = dict()

    if deleted:
        definition["metadata"]["deleted"] = ACTUAL_TIMESTAMP_STR
    else:
        definition["metadata"].pop("deleted", None)

    if old_definition is not None and keys is not None:
        for key in keys:
            definition["metadata"]["previous_" + key] = old_definition.get(key)

    # if the metadata ended up empty, drop it altogether
    if len(definition["metadata"]) == 0:
        definition.pop("metadata", None)
def print_definition_change(
    name: str,
    status: str,
    old: Dict,
    new: Dict | None = None,
    original: Dict | None = None,
    prompt: bool = False,
    use_default: bool = True,
) -> bool | None:
    """Print changes made between definitions and ask for confirmation if requested.

    Returns the prompt result if prompted, otherwise None."""
    old_deleted_status = get_definition_deleted_status(old)
    old_deleted_status_wrapped = (
        " (" + old_deleted_status + ")" if old_deleted_status else ""
    )

    title = f"{old_deleted_status + ' ' if old_deleted_status else ''}{name} PROBABLY {status}"
    print(f"== {title} ==")
    print(f"OLD{old_deleted_status_wrapped}:")
    print(json.dumps(old, sort_keys=True, indent=None))
    if new is not None:
        print("NEW:")
        print(json.dumps(new, sort_keys=True, indent=None))
    if original is not None:
        original_deleted_status = get_definition_deleted_status(original)
        original_deleted_status_wrapped = (
            " (" + original_deleted_status + ")" if original_deleted_status else ""
        )
        print(f"REPLACING{original_deleted_status_wrapped}:")
        print(json.dumps(original, sort_keys=True, indent=None))

    if prompt:
        answer = click.prompt(
            "Confirm change:",
            type=click.Choice(["y", "n"]),
            show_choices=True,
            default="y" if use_default else None,
            show_default=use_default,
        )
        return answer == "y"
    return None
def print_definitions_collision(
    name: str,
    definitions: List[Dict],
    old_definitions: List[Dict] | None = None,
    prompt: bool = True,
) -> Tuple[int | None, int | None]:
    """Print colliding definitions and ask which one to keep if requested.

    Returns a tuple of the prompt result (None when not prompted) and the
    default index (the definition also found in the old definitions, if any)."""
    if old_definitions:
        old_defs_hash_no_metadata = [
            hash_dict_on_keys(d, exclude_keys=["metadata"]) for d in old_definitions
        ]

    default_index = None
    title = f"COLLISION BETWEEN {name}S"
    print(f"== {title} ==")
    for idx, definition in enumerate(definitions):
        found = ""
        if (
            old_definitions
            and hash_dict_on_keys(definition, exclude_keys=["metadata"])
            in old_defs_hash_no_metadata
        ):
            found = " (found in old definitions)"
            default_index = idx
        print(f"DEFINITION {idx}{found}:")
        print(json.dumps(definition, sort_keys=True, indent=None))

    answer = None
    if prompt:
        answer = int(
            click.prompt(
                "Which definition do you want to keep? Please enter a valid integer",
                type=click.Choice([str(n) for n in range(len(definitions))]),
                show_choices=True,
                default=str(default_index) if default_index is not None else None,
                show_default=default_index is not None,
            )
        )
    return answer, default_index
def get_definition_deleted_status(definition: Dict) -> str:
    return (
        "PREVIOUSLY DELETED"
        if definition.get("metadata", {}).get("deleted") is not None
        else ""
    )
def check_tokens_collisions(tokens: List[Dict], old_tokens: List[Dict] | None) -> None:
    collisions: defaultdict = defaultdict(list)
    for idx, nd in enumerate(tokens):
        collisions[hash_dict_on_keys(nd, ["chain_id", "address"])].append(idx)

    no_of_collisions = sum(1 for v in collisions.values() if len(v) > 1)
    if no_of_collisions > 0:
        print(f"\nNumber of collisions: {no_of_collisions}")

    # resolve collisions
    delete_indexes = []
    for v in collisions.values():
        if len(v) > 1:
            colliding_tokens = [tokens[i] for i in v]
            choice, default = print_definitions_collision(
                "TOKEN", colliding_tokens, old_tokens
            )
            index = choice if choice is not None else default
            print(f"Keeping the definition with index {index}.")
            v.pop(index)
            delete_indexes.extend(v)

    # delete the colliding entries, from the highest index down so the
    # remaining indexes stay valid
    delete_indexes.sort(reverse=True)
    for idx in delete_indexes:
        tokens.pop(idx)
def check_definitions_list(
    old_defs: List[Dict],
    new_defs: List[Dict],
    main_keys: List[str],
    def_name: str,
    interactive: bool,
    force: bool,
) -> None:
    # store already processed definitions
    deleted_definitions: List[Dict] = []
    modified_definitions: List[Tuple] = []
    moved_definitions: List[Tuple] = []
    resurrected_definitions: List[Dict] = []

    # dicts of new definitions
    defs_hash_no_metadata = dict()
    defs_hash_no_main_keys_and_metadata = dict()
    defs_hash_only_main_keys = dict()
    for nd in new_defs:
        defs_hash_no_metadata[hash_dict_on_keys(nd, exclude_keys=["metadata"])] = nd
        defs_hash_no_main_keys_and_metadata[
            hash_dict_on_keys(nd, exclude_keys=main_keys + ["metadata"])
        ] = nd
        defs_hash_only_main_keys[hash_dict_on_keys(nd, main_keys)] = nd

    # dict of old definitions
    old_defs_hash_only_main_keys = {
        hash_dict_on_keys(d, main_keys): d for d in old_defs
    }
    # mark all deleted, moved or changed definitions
    for old_def in old_defs:
        old_def_hash_only_main_keys = hash_dict_on_keys(old_def, main_keys)
        old_def_hash_no_metadata = hash_dict_on_keys(old_def, exclude_keys=["metadata"])
        old_def_hash_no_main_keys_and_metadata = hash_dict_on_keys(
            old_def, exclude_keys=main_keys + ["metadata"]
        )

        was_deleted_status = get_definition_deleted_status(old_def)

        if old_def_hash_no_metadata in defs_hash_no_metadata:
            # same definition found, check if it was not marked as deleted before
            if was_deleted_status:
                resurrected_definitions.append(old_def)
            else:
                # definition is unchanged, copy its metadata
                old_def_metadata = old_def.get("metadata")
                if old_def_metadata:
                    defs_hash_no_metadata[old_def_hash_no_metadata][
                        "metadata"
                    ] = old_def_metadata
        elif (
            old_def_hash_no_main_keys_and_metadata
            in defs_hash_no_main_keys_and_metadata
        ):
            # definition was moved
            # check if there was something before on this "position"
            new_def = defs_hash_no_main_keys_and_metadata[
                old_def_hash_no_main_keys_and_metadata
            ]
            new_def_hash_only_main_keys = hash_dict_on_keys(new_def, main_keys)
            orig_def = old_defs_hash_only_main_keys.get(new_def_hash_only_main_keys)

            # check if the move is valid - "old_def" is not marked as deleted
            # and there was a change in the original definition
            if (
                orig_def is None
                or not was_deleted_status
                or hash_dict_on_keys(orig_def, exclude_keys=["metadata"])
                != hash_dict_on_keys(new_def, exclude_keys=["metadata"])
            ):
                moved_definitions.append((old_def, new_def, orig_def))
            else:
                # invalid move - this was an old move, so we have to check if
                # anything else is coming to the "old_def" position
                if old_def_hash_only_main_keys in defs_hash_only_main_keys:
                    modified_definitions.append(
                        (old_def, defs_hash_only_main_keys[old_def_hash_only_main_keys])
                    )
                else:
                    # no - so just keep the "old_def"
                    new_defs.append(old_def)
        elif old_def_hash_only_main_keys in defs_hash_only_main_keys:
            # definition was modified
            modified_definitions.append(
                (old_def, defs_hash_only_main_keys[old_def_hash_only_main_keys])
            )
        else:
            # definition was not found - was it deleted before or just now?
            if not was_deleted_status:
                # definition was deleted now
                deleted_definitions.append(old_def)
            else:
                # deleted earlier - keep it, no confirmation needed
                new_defs.append(old_def)
    # try to pair moved and modified definitions
    for old_def, new_def, orig_def in moved_definitions:
        # if some definition was "modified" into "new_def", it is because this
        # "old_def" was moved onto the "orig_def" position - drop the duplicate
        if (orig_def, new_def) in modified_definitions:
            modified_definitions.remove((orig_def, new_def))

    no_of_changes = (
        len(moved_definitions)
        + len(modified_definitions)
        + len(deleted_definitions)
        + len(resurrected_definitions)
    )
    if no_of_changes > 0:
        print(f"Number of changes: {no_of_changes}")
    # go through changes and ask for confirmation
    for old_def, new_def, orig_def in moved_definitions:
        accept_change = True
        # if the change renames the symbol, the "--force" parameter must be used
        # for the change to be acceptable
        if (
            orig_def is not None
            and orig_def.get("shortcut") != new_def.get("shortcut")
            and not force
        ):
            print(
                "\nERROR: Symbol change in this definition! To be able to approve this change, re-run with the `--force` argument."
            )
            accept_change = False

        answer = print_definition_change(
            def_name.upper(),
            "MOVED",
            old_def,
            new_def,
            orig_def,
            prompt=interactive and accept_change,
        )
        if answer is False or (answer is None and not accept_change):
            # revert the change - replace "new_def" with "old_def" and "orig_def"
            new_defs.remove(new_def)
            new_defs.append(old_def)
            if orig_def is not None:
                # orig_def may be None when nothing previously occupied the target position
                new_defs.append(orig_def)
        else:
            _set_definition_metadata(new_def, old_def, main_keys)

            # if the position of "old_def" would remain empty, leave a "deleted"
            # mark on its former position
            old_def_remains_empty = True
            for _, nd, _ in moved_definitions:
                if hash_dict_on_keys(old_def, main_keys) == hash_dict_on_keys(
                    nd, main_keys
                ):
                    old_def_remains_empty = False

            if old_def_remains_empty:
                _set_definition_metadata(old_def, deleted=True)
                new_defs.append(old_def)
    for old_def, new_def in modified_definitions:
        accept_change = True
        # if the change renames the symbol, the "--force" parameter must be used
        # for the change to be acceptable
        if old_def.get("shortcut") != new_def.get("shortcut") and not force:
            print(
                "\nERROR: Symbol change in this definition! To be able to approve this change, re-run with the `--force` argument."
            )
            accept_change = False

        answer = print_definition_change(
            def_name.upper(),
            "MODIFIED",
            old_def,
            new_def,
            prompt=interactive and accept_change,
        )
        if answer is False or (answer is None and not accept_change):
            # revert the change - replace "new_def" with "old_def"
            new_defs.remove(new_def)
            new_defs.append(old_def)
    for definition in deleted_definitions:
        if (
            print_definition_change(
                def_name.upper(), "DELETED", definition, prompt=interactive
            )
            is False
        ):
            # revert the change - add back the deleted definition
            new_defs.append(definition)
        else:
            _set_definition_metadata(definition, deleted=True)
            new_defs.append(definition)

    for definition in resurrected_definitions:
        if (
            print_definition_change(
                def_name.upper(), "RESURRECTED", definition, prompt=interactive
            )
            is not False
        ):
            # clear the deleted mark
            _set_definition_metadata(definition)
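# To summarize check_definitions_list: every old definition ends up in exactly
# one bucket (inferred from the code above):
#   unchanged    -> its metadata is copied onto the matching new definition
#   moved        -> same fields under new main keys; tracked via "previous_*" metadata
#   modified     -> same main keys, other fields changed
#   deleted      -> absent from the new set; kept with a "deleted" timestamp
#   resurrected  -> previously deleted but present again; the mark is cleared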
def filter_top100(top100: List[Dict], definitions: Dict) -> Dict[str, List[Dict]]:
    top100_ids = [d["id"] for d in top100]

    return {
        "networks": [
            n for n in definitions["networks"] if n.get("coingecko_id") in top100_ids
        ],
        "tokens": [
            t for t in definitions["tokens"] if t.get("coingecko_id") in top100_ids
        ],
    }
def _load_prepared_definitions():
    # TODO: load the definitions prepared by `prepare_definitions` and set the
    # keys expected by coin_info, roughly:
    #   networks.append(cast(Coin, network))  # with key=f"eth:{shortcut}"
    #   token.update(
    #       chain=chain,
    #       chain_id=network["chain_id"],
    #       address_bytes=bytes.fromhex(token["address"][2:]),
    #       shortcut=token["symbol"],
    #       key=f"erc20:{chain}:{token['symbol']}",
    #   )
    pass
# ====== coindefs generators ======


def eth_info_from_dict(
    coin: Coin, msg_type: type[EthereumNetworkInfo] | type[EthereumTokenInfo]
) -> EthereumNetworkInfo | EthereumTokenInfo:
    attributes: Dict[str, Any] = dict()
    for field in msg_type.FIELDS.values():
        val = coin.get(field.name)

        if field.name in ("chain_id", "slip44"):
            attributes[field.name] = int(val)
        elif msg_type is EthereumTokenInfo and field.name == "address":
            attributes[field.name] = coin.get("address_bytes")
        else:
            attributes[field.name] = val

    return msg_type(**attributes)
def serialize_eth_info(
    info: EthereumNetworkInfo | EthereumTokenInfo, data_type_num: EthereumDefinitionType
) -> bytes:
    ser = FORMAT_VERSION_BYTES
    ser += data_type_num.to_bytes(1, "big")
    ser += DATA_VERSION_BYTES

    buf = io.BytesIO()
    protobuf.dump_message(buf, info)
    ser += buf.getvalue()

    return ser
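# Resulting wire layout (read off the serialization above; offsets in bytes):
#   0..7    FORMAT_VERSION_BYTES  - b"trzd1\x00\x00\x00"
#   8       definition type       - EthereumDefinitionType (NETWORK or TOKEN)
#   9..12   DATA_VERSION_BYTES    - build timestamp, big-endian
#   13..    protobuf-encoded EthereumNetworkInfo / EthereumTokenInfo
# The ed25519 signature produced by sign_data() is appended after this payload
# (see the coindefs command below).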
def sign_data(sign_key: ed25519.SigningKey, data: bytes) -> bytes:
    return sign_key.sign(data)
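# Verification counterpart (a sketch, assuming the python-ed25519 API used above):
#
#   verifying_key = sign_key.get_verifying_key()
#   verifying_key.verify(signature, data)  # raises ed25519.BadSignatureError on mismatch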
# ====== click command handlers ======


@click.group()
def cli() -> None:
    """Script for handling Ethereum definitions (networks and tokens)."""
@cli.command()
@click.option(
    "-r/-R",
    "--refresh/--no-refresh",
    default=None,
    help="Force a data refresh, or forbid it. By default, cached data are used unless expired.",
)
@click.option(
    "-i",
    "--interactive",
    is_flag=True,
    help="Ask about every change. Without this option the script automatically accepts all changes to the definitions "
    "(except symbol changes, see the `--force` option).",
)
@click.option(
    "-f",
    "--force",
    is_flag=True,
    help="Allow changes to symbols in definitions to be accepted.",
)
@click.option(
    "-s",
    "--show-all",
    is_flag=True,
    help="Show the differences for all definitions. By default only changes to the top 100 definitions (by Coingecko market cap ranking) are shown.",
)
@click.option(
    "-d",
    "--deffile",
    type=click.Path(resolve_path=True, dir_okay=False, path_type=pathlib.Path),
    default="./definitions-latest.json",
    help="File where the definitions will be saved in json format. If the file already exists, it is used to check "
    "the changes in definitions.",
)
@click.option(
    "-n",
    "--networks-dir",
    type=click.Path(
        exists=True, file_okay=False, resolve_path=True, path_type=pathlib.Path
    ),
    default=DEFS_DIR / "ethereum" / "chains",
    help="Directory pointing at the cloned networks definition repo (https://github.com/ethereum-lists/chains). "
    "Defaults to `$(DEFS_DIR)/ethereum/chains` if the env variable `DEFS_DIR` is set, otherwise to "
    '`"this script location"/../defs/ethereum/chains`',
)
@click.option(
    "-t",
    "--tokens-dir",
    type=click.Path(
        exists=True, file_okay=False, resolve_path=True, path_type=pathlib.Path
    ),
    default=DEFS_DIR / "ethereum" / "tokens",
    help="Directory pointing at the cloned tokens definition repo (https://github.com/ethereum-lists/tokens). "
    "Defaults to `$(DEFS_DIR)/ethereum/tokens` if the env variable `DEFS_DIR` is set, otherwise to "
    '`"this script location"/../defs/ethereum/tokens`',
)
def prepare_definitions(
    refresh: bool | None,
    interactive: bool,
    force: bool,
    show_all: bool,
    deffile: pathlib.Path,
    networks_dir: pathlib.Path,
    tokens_dir: pathlib.Path,
) -> None:
    """Prepare Ethereum definitions."""
    # init the Ethereum definitions downloader
    downloader = EthereumDefinitionsCachedDownloader(refresh)

    networks = _load_ethereum_networks_from_repo(networks_dir)
    # coingecko API
    cg_platforms = downloader.get_coingecko_asset_platforms()
    cg_platforms_by_chain_id: dict[int, Any] = dict()
    for chain in cg_platforms:
        # We only want information about chains that have both a chain id and a
        # coingecko id, otherwise we could not link local and coingecko networks.
        if chain["chain_identifier"] is not None and chain["id"] is not None:
            cg_platforms_by_chain_id[chain["chain_identifier"]] = chain["id"]

    # defillama API
    dl_chains = downloader.get_defillama_chains()
    dl_chains_by_chain_id: dict[int, Any] = dict()
    for chain in dl_chains:
        # We only want information about chains that have both a chain id and a
        # coingecko id, otherwise we could not link local and coingecko networks.
        if chain["chainId"] is not None and chain["gecko_id"] is not None:
            dl_chains_by_chain_id[chain["chainId"]] = chain["gecko_id"]

    # We try to collect as many "coingecko_id"s as possible so we can use them later
    # to load tokens from coingecko. We don't use coingecko's own network list,
    # because we don't know which of those networks are EVM based.
    coingecko_id_to_chain_id = dict()
    for network in networks:
        if network.get("coingecko_id") is None:
            # first try to assign a coingecko_id to local networks from coingecko via chain_id
            if network["chain_id"] in cg_platforms_by_chain_id:
                network["coingecko_id"] = cg_platforms_by_chain_id[network["chain_id"]]
            # or try to assign a coingecko_id to local networks from defillama via chain_id
            elif network["chain_id"] in dl_chains_by_chain_id:
                network["coingecko_id"] = dl_chains_by_chain_id[network["chain_id"]]

        # if we found a "coingecko_id", add it to the map - used later to pair tokens with coingecko ids
        if network.get("coingecko_id") is not None:
            coingecko_id_to_chain_id[network["coingecko_id"]] = network["chain_id"]
    # get tokens
    cg_tokens = _load_erc20_tokens_from_coingecko(downloader, networks)
    repo_tokens = _load_erc20_tokens_from_repo(tokens_dir, networks)

    # merge tokens; CoinGecko tokens are preferred over repo tokens
    tokens: List[Dict] = []
    tokens_by_chain_id_and_address = defaultdict(list)
    for t in cg_tokens:
        if t not in tokens:
            # add only unique tokens
            tokens.append(t)
            tokens_by_chain_id_and_address[(t["chain_id"], t["address"])].append(t)
    for t in repo_tokens:
        if (
            t not in tokens
            and (t["chain_id"], t["address"]) not in tokens_by_chain_id_and_address
        ):
            # add only unique tokens not already covered by CoinGecko
            tokens.append(t)
            tokens_by_chain_id_and_address[(t["chain_id"], t["address"])].append(t)
    old_defs = None
    if deffile.exists():
        # load old definitions
        old_defs = load_json(deffile)

    check_tokens_collisions(
        tokens, old_defs["tokens"] if old_defs is not None else None
    )

    # map coingecko ids to tokens
    cg_coin_list = downloader.get_coingecko_coins_list()
    for coin in cg_coin_list:
        for platform_name, address in coin.get("platforms", dict()).items():
            key = (coingecko_id_to_chain_id.get(platform_name), address)
            if key in tokens_by_chain_id_and_address:
                for token in tokens_by_chain_id_and_address[key]:
                    token["coingecko_id"] = coin["id"]
    # load the top 100 definitions from CoinGecko
    cg_top100 = downloader.get_coingecko_top100()

    # check changes in definitions
    if old_defs is not None:
        # filter to the top 100 based on Coingecko market cap order
        if not show_all:
            old_defs = filter_top100(cg_top100, old_defs)

        # check networks and tokens
        check_definitions_list(
            old_defs["networks"], networks, ["chain_id"], "network", interactive, force
        )
        check_definitions_list(
            old_defs["tokens"],
            tokens,
            ["chain_id", "address"],
            "token",
            interactive,
            force,
        )

    # sort networks and tokens
    networks.sort(key=lambda x: x["chain_id"])
    tokens.sort(key=lambda x: (x["chain_id"], x["address"]))

    # save the results
    with open(deffile, "w+") as f:
        json.dump(
            obj=dict(networks=networks, tokens=tokens),
            fp=f,
            ensure_ascii=False,
            sort_keys=True,
            indent=1,
        )
        f.write("\n")

    # save the cache
    downloader.save_cache()
# TODO: separate function to generate built-in defs???


@cli.command()
@click.option(
    "-o",
    "--outdir",
    type=click.Path(resolve_path=True, file_okay=False, path_type=pathlib.Path),
    default="./definitions-latest",
)
@click.option(
    "-k",
    "--privatekey",
    type=click.File(mode="r"),
    help="Private key (text, hex formatted) used to sign the data. Can also be loaded from the `PRIVATE_KEY` env variable. A provided file is preferred over the env variable.",
)
def coindefs(outdir: pathlib.Path, privatekey: TextIO) -> None:
    """Generate signed Ethereum definitions for python-trezor and others."""
    hex_key = None
    if privatekey is None:
        # load from the env variable
        hex_key = os.getenv("PRIVATE_KEY", default=None)
    else:
        with privatekey:
            hex_key = privatekey.readline()

    if hex_key is None:
        raise click.ClickException("No private key for signing was provided.")

    sign_key = ed25519.SigningKey(ed25519.from_ascii(hex_key, encoding="hex"))
    def save_definition(directory: pathlib.Path, keys: list[str], data: bytes) -> None:
        complete_filename = "_".join(keys) + ".dat"
        with open(directory / complete_filename, mode="wb+") as f:
            f.write(data)

    def generate_token_defs(tokens: Coins, path: pathlib.Path) -> None:
        for token in tokens:
            if token["address"] is None:
                continue

            # generate the definition of the token
            keys = ["token", token["address"][2:].lower()]
            ser = serialize_eth_info(
                eth_info_from_dict(token, EthereumTokenInfo),
                EthereumDefinitionType.TOKEN,
            )
            save_definition(path, keys, ser + sign_data(sign_key, ser))
    def generate_network_def(net: Coin, tokens: Coins) -> None:
        if net["chain_id"] is None:
            return

        # create the path for networks identified by chain ids
        network_dir = outdir / "by_chain_id" / str(net["chain_id"])
        try:
            network_dir.mkdir(parents=True)
        except FileExistsError:
            raise click.ClickException(
                f"Network with chain ID {net['chain_id']} already exists - attempt to generate defs for network \"{net['name']}\" ({net['shortcut']})."
            )

        # generate the definition of the network
        keys = ["network"]
        ser = serialize_eth_info(
            eth_info_from_dict(net, EthereumNetworkInfo), EthereumDefinitionType.NETWORK
        )
        complete_data = ser + sign_data(sign_key, ser)
        save_definition(network_dir, keys, complete_data)

        # generate the token definitions for the network
        generate_token_defs(tokens, network_dir)

        # create the path for networks identified by slip44 ids
        slip44_dir = outdir / "by_slip44" / str(net["slip44"])
        if not slip44_dir.exists():
            slip44_dir.mkdir(parents=True)
        # TODO: save only the first network??
        save_definition(slip44_dir, keys, complete_data)
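    # The helpers above produce this output tree (derived from save_definition
    # and the keys used; each .dat file is the payload plus its ed25519 signature):
    #   <outdir>/
    #     by_chain_id/<chain_id>/network.dat
    #     by_chain_id/<chain_id>/token_<address_without_0x>.dat
    #     by_slip44/<slip44>/network.dat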
    # clear the defs directory
    if outdir.exists():
        shutil.rmtree(outdir)
    outdir.mkdir(parents=True)

    all_coins = coin_info()

    # group tokens by their chain_id
    token_buckets: CoinBuckets = defaultdict(list)
    for token in all_coins.erc20:
        token_buckets[token["chain_id"]].append(token)

    for network in all_coins.eth:
        generate_network_def(network, token_buckets[network["chain_id"]])
if __name__ == "__main__":
    cli()