mirror of
https://github.com/trezor/trezor-firmware.git
synced 2024-11-27 01:48:17 +00:00
9342b52c6b
`cointool.py coins_json` generated an old-style `coins.json` file with entries on bitcoin-like coins. This is no longe really doing what we need. Instead, the new command `dump` will output _all available data_ in the same format as Python sees it. Much more useful for other kinds of consumers.
666 lines
20 KiB
Python
Executable File
666 lines
20 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import io
|
|
import json
|
|
import logging
|
|
import re
|
|
import sys
|
|
import os
|
|
import glob
|
|
import binascii
|
|
import struct
|
|
import zlib
|
|
from collections import defaultdict
|
|
from hashlib import sha256
|
|
|
|
import click
|
|
|
|
import coin_info
|
|
from coindef import CoinDef
|
|
|
|
try:
|
|
import termcolor
|
|
except ImportError:
|
|
termcolor = None
|
|
|
|
try:
|
|
import mako
|
|
import mako.template
|
|
from munch import Munch
|
|
|
|
CAN_RENDER = True
|
|
except ImportError:
|
|
CAN_RENDER = False
|
|
|
|
try:
|
|
import requests
|
|
except ImportError:
|
|
requests = None
|
|
|
|
try:
|
|
import ed25519
|
|
from PIL import Image
|
|
from trezorlib import protobuf
|
|
|
|
CAN_BUILD_DEFS = True
|
|
except ImportError:
|
|
CAN_BUILD_DEFS = False
|
|
|
|
|
|
# ======= Crayon colors ======
|
|
USE_COLORS = False
|
|
|
|
|
|
def crayon(color, string, bold=False, dim=False):
|
|
if not termcolor or not USE_COLORS:
|
|
return string
|
|
else:
|
|
if bold:
|
|
attrs = ["bold"]
|
|
elif dim:
|
|
attrs = ["dark"]
|
|
else:
|
|
attrs = []
|
|
return termcolor.colored(string, color, attrs=attrs)
|
|
|
|
|
|
def print_log(level, *args, **kwargs):
|
|
prefix = logging.getLevelName(level)
|
|
if level == logging.DEBUG:
|
|
prefix = crayon("blue", prefix, bold=False)
|
|
elif level == logging.INFO:
|
|
prefix = crayon("blue", prefix, bold=True)
|
|
elif level == logging.WARNING:
|
|
prefix = crayon("red", prefix, bold=False)
|
|
elif level == logging.ERROR:
|
|
prefix = crayon("red", prefix, bold=True)
|
|
print(prefix, *args, **kwargs)
|
|
|
|
|
|
# ======= Mako management ======
|
|
|
|
|
|
def c_str_filter(b):
|
|
if b is None:
|
|
return "NULL"
|
|
|
|
def hexescape(c):
|
|
return r"\x{:02x}".format(c)
|
|
|
|
if isinstance(b, bytes):
|
|
return '"' + "".join(map(hexescape, b)) + '"'
|
|
else:
|
|
return json.dumps(b)
|
|
|
|
|
|
def black_repr_filter(val):
|
|
if isinstance(val, str):
|
|
if '"' in val:
|
|
return repr(val)
|
|
else:
|
|
return c_str_filter(val)
|
|
elif isinstance(val, bytes):
|
|
return "b" + c_str_filter(val)
|
|
else:
|
|
return repr(val)
|
|
|
|
|
|
def ascii_filter(s):
|
|
return re.sub("[^ -\x7e]", "_", s)
|
|
|
|
|
|
def make_support_filter(support_info):
|
|
def supported_on(device, coins):
|
|
for coin in coins:
|
|
if support_info[coin.key].get(device):
|
|
yield coin
|
|
|
|
return supported_on
|
|
|
|
|
|
MAKO_FILTERS = {
|
|
"c_str": c_str_filter,
|
|
"ascii": ascii_filter,
|
|
"black_repr": black_repr_filter,
|
|
}
|
|
|
|
|
|
def render_file(src, dst, coins, support_info):
|
|
"""Renders `src` template into `dst`.
|
|
|
|
`src` is a filename, `dst` is an open file object.
|
|
"""
|
|
template = mako.template.Template(filename=src)
|
|
result = template.render(
|
|
support_info=support_info,
|
|
supported_on=make_support_filter(support_info),
|
|
**coins,
|
|
**MAKO_FILTERS,
|
|
)
|
|
dst.write(result)
|
|
|
|
|
|
# ====== validation functions ======
|
|
|
|
|
|
def highlight_key(coin, color):
|
|
"""Return a colorful string where the SYMBOL part is bold."""
|
|
keylist = coin["key"].split(":")
|
|
if keylist[-1].isdigit():
|
|
keylist[-2] = crayon(color, keylist[-2], bold=True)
|
|
else:
|
|
keylist[-1] = crayon(color, keylist[-1], bold=True)
|
|
key = crayon(color, ":".join(keylist))
|
|
name = crayon(None, f"({coin['name']})", dim=True)
|
|
return f"{key} {name}"
|
|
|
|
|
|
def find_address_collisions(coins, field):
|
|
"""Detects collisions in a given field. Returns buckets of colliding coins."""
|
|
collisions = defaultdict(list)
|
|
for coin in coins:
|
|
value = coin[field]
|
|
collisions[value].append(coin)
|
|
return {k: v for k, v in collisions.items() if len(v) > 1}
|
|
|
|
|
|
def check_btc(coins):
|
|
check_passed = True
|
|
support_infos = coin_info.support_info(coins)
|
|
|
|
# validate individual coin data
|
|
for coin in coins:
|
|
errors = coin_info.validate_btc(coin)
|
|
if errors:
|
|
check_passed = False
|
|
print_log(logging.ERROR, "invalid definition for", coin["name"])
|
|
print("\n".join(errors))
|
|
|
|
def collision_str(bucket):
|
|
"""Generate a colorful string out of a bucket of colliding coins."""
|
|
coin_strings = []
|
|
for coin in bucket:
|
|
name = coin["name"]
|
|
prefix = ""
|
|
if name.endswith("Testnet"):
|
|
color = "green"
|
|
elif name == "Bitcoin":
|
|
color = "red"
|
|
elif coin.get("unsupported"):
|
|
color = "grey"
|
|
prefix = crayon("blue", "(X)", bold=True)
|
|
else:
|
|
color = "blue"
|
|
hl = highlight_key(coin, color)
|
|
coin_strings.append(prefix + hl)
|
|
return ", ".join(coin_strings)
|
|
|
|
def print_collision_buckets(buckets, prefix, maxlevel=logging.ERROR):
|
|
"""Intelligently print collision buckets.
|
|
|
|
For each bucket, if there are any collision with a mainnet, print it.
|
|
If the collision is with unsupported networks or testnets, it's just INFO.
|
|
If the collision is with supported mainnets, it's WARNING.
|
|
If the collision with any supported network includes Bitcoin, it's an ERROR.
|
|
"""
|
|
failed = False
|
|
for key, bucket in buckets.items():
|
|
mainnets = [c for c in bucket if not c["name"].endswith("Testnet")]
|
|
|
|
have_bitcoin = False
|
|
for coin in mainnets:
|
|
if coin["name"] == "Bitcoin":
|
|
have_bitcoin = True
|
|
if all(v is False for k, v in support_infos[coin["key"]].items()):
|
|
coin["unsupported"] = True
|
|
|
|
supported_mainnets = [c for c in mainnets if not c.get("unsupported")]
|
|
supported_networks = [c for c in bucket if not c.get("unsupported")]
|
|
|
|
if len(mainnets) > 1:
|
|
if have_bitcoin and len(supported_networks) > 1:
|
|
# ANY collision with Bitcoin is bad
|
|
level = maxlevel
|
|
failed = True
|
|
elif len(supported_mainnets) > 1:
|
|
# collision between supported networks is still pretty bad
|
|
level = logging.WARNING
|
|
else:
|
|
# collision between some unsupported networks is OK
|
|
level = logging.INFO
|
|
print_log(level, f"prefix {key}:", collision_str(bucket))
|
|
|
|
return failed
|
|
|
|
# slip44 collisions
|
|
print("Checking SLIP44 prefix collisions...")
|
|
slip44 = find_address_collisions(coins, "slip44")
|
|
if print_collision_buckets(slip44, "key"):
|
|
check_passed = False
|
|
|
|
# only check address_type on coins that don't use cashaddr
|
|
nocashaddr = [coin for coin in coins if not coin.get("cashaddr_prefix")]
|
|
|
|
print("Checking address_type collisions...")
|
|
address_type = find_address_collisions(nocashaddr, "address_type")
|
|
if print_collision_buckets(address_type, "address type"):
|
|
check_passed = False
|
|
|
|
print("Checking address_type_p2sh collisions...")
|
|
address_type_p2sh = find_address_collisions(nocashaddr, "address_type_p2sh")
|
|
# we ignore failed checks on P2SH, because reasons
|
|
print_collision_buckets(address_type_p2sh, "address type", logging.WARNING)
|
|
|
|
return check_passed
|
|
|
|
|
|
def check_dups(buckets, print_at_level=logging.ERROR):
|
|
"""Analyze and pretty-print results of `coin_info.mark_duplicate_shortcuts`.
|
|
|
|
`print_at_level` can be one of logging levels.
|
|
|
|
The results are buckets of colliding symbols.
|
|
If the collision is only between ERC20 tokens, it's DEBUG.
|
|
If the collision includes one non-token, it's INFO.
|
|
If the collision includes more than one non-token, it's ERROR and printed always.
|
|
"""
|
|
|
|
def coin_str(coin):
|
|
"""Colorize coins. Tokens are cyan, nontokens are red. Coins that are NOT
|
|
marked duplicate get a green asterisk.
|
|
"""
|
|
if coin_info.is_token(coin):
|
|
color = "cyan"
|
|
else:
|
|
color = "red"
|
|
highlighted = highlight_key(coin, color)
|
|
if not coin.get("duplicate"):
|
|
prefix = crayon("green", "*", bold=True)
|
|
else:
|
|
prefix = ""
|
|
return f"{prefix}{highlighted}"
|
|
|
|
check_passed = True
|
|
|
|
for symbol in sorted(buckets.keys()):
|
|
bucket = buckets[symbol]
|
|
if not bucket:
|
|
continue
|
|
|
|
nontokens = [coin for coin in bucket if not coin_info.is_token(coin)]
|
|
|
|
# string generation
|
|
dup_str = ", ".join(coin_str(coin) for coin in bucket)
|
|
if not nontokens:
|
|
level = logging.DEBUG
|
|
elif len(nontokens) == 1:
|
|
level = logging.INFO
|
|
else:
|
|
level = logging.ERROR
|
|
check_passed = False
|
|
|
|
# deciding whether to print
|
|
if level < print_at_level:
|
|
continue
|
|
|
|
if symbol == "_override":
|
|
print_log(level, "force-set duplicates:", dup_str)
|
|
else:
|
|
print_log(level, f"duplicate symbol {symbol}:", dup_str)
|
|
|
|
return check_passed
|
|
|
|
|
|
def check_backends(coins):
|
|
check_passed = True
|
|
for coin in coins:
|
|
genesis_block = coin.get("hash_genesis_block")
|
|
if not genesis_block:
|
|
continue
|
|
backends = coin.get("blockbook", []) + coin.get("bitcore", [])
|
|
for backend in backends:
|
|
print("checking", backend, "... ", end="", flush=True)
|
|
try:
|
|
j = requests.get(backend + "/api/block-index/0").json()
|
|
if j["blockHash"] != genesis_block:
|
|
raise RuntimeError("genesis block mismatch")
|
|
except Exception as e:
|
|
print(e)
|
|
check_passed = False
|
|
else:
|
|
print("OK")
|
|
return check_passed
|
|
|
|
|
|
def check_icons(coins):
|
|
check_passed = True
|
|
for coin in coins:
|
|
key = coin["key"]
|
|
icon_file = coin.get("icon")
|
|
if not icon_file:
|
|
print(key, ": missing icon")
|
|
check_passed = False
|
|
continue
|
|
|
|
try:
|
|
icon = Image.open(icon_file)
|
|
except Exception:
|
|
print(key, ": failed to open icon file", icon_file)
|
|
check_passed = False
|
|
continue
|
|
|
|
if icon.size != (96, 96) or icon.mode != "RGBA":
|
|
print(key, ": bad icon format (must be RGBA 96x96)")
|
|
check_passed = False
|
|
return check_passed
|
|
|
|
|
|
IGNORE_NONUNIFORM_KEYS = frozenset(("unsupported", "duplicate", "notes"))
|
|
|
|
|
|
def check_key_uniformity(coins):
|
|
keysets = defaultdict(list)
|
|
for coin in coins:
|
|
keyset = frozenset(coin.keys()) | IGNORE_NONUNIFORM_KEYS
|
|
keysets[keyset].append(coin)
|
|
|
|
if len(keysets) <= 1:
|
|
return True
|
|
|
|
buckets = list(keysets.values())
|
|
buckets.sort(key=lambda x: len(x))
|
|
majority = buckets[-1]
|
|
rest = sum(buckets[:-1], [])
|
|
reference_keyset = set(majority[0].keys())
|
|
|
|
for coin in rest:
|
|
key = coin["key"]
|
|
keyset = set(coin.keys())
|
|
missing = ", ".join(reference_keyset - keyset)
|
|
if missing:
|
|
print_log(logging.ERROR, f"coin {key} has missing keys: {missing}")
|
|
additional = ", ".join(keyset - reference_keyset)
|
|
if additional:
|
|
print_log(logging.ERROR, f"coin {key} has superfluous keys: {additional}")
|
|
|
|
return False
|
|
|
|
|
|
# ====== coindefs generators ======
|
|
|
|
|
|
def convert_icon(icon):
|
|
"""Convert PIL icon to TOIF format"""
|
|
# TODO: move this to python-trezor at some point
|
|
DIM = 32
|
|
icon = icon.resize((DIM, DIM), Image.LANCZOS)
|
|
# remove alpha channel, replace with black
|
|
bg = Image.new("RGBA", icon.size, (0, 0, 0, 255))
|
|
icon = Image.alpha_composite(bg, icon)
|
|
# process pixels
|
|
pix = icon.load()
|
|
data = bytes()
|
|
for y in range(DIM):
|
|
for x in range(DIM):
|
|
r, g, b, _ = pix[x, y]
|
|
c = ((r & 0xF8) << 8) | ((g & 0xFC) << 3) | ((b & 0xF8) >> 3)
|
|
data += struct.pack(">H", c)
|
|
z = zlib.compressobj(level=9, wbits=10)
|
|
zdata = z.compress(data) + z.flush()
|
|
zdata = zdata[2:-4] # strip header and checksum
|
|
return zdata
|
|
|
|
|
|
def coindef_from_dict(coin):
|
|
proto = CoinDef()
|
|
for fname, _, fflags in CoinDef.FIELDS.values():
|
|
val = coin.get(fname)
|
|
if val is None and fflags & protobuf.FLAG_REPEATED:
|
|
val = []
|
|
elif fname == "signed_message_header":
|
|
val = val.encode("utf-8")
|
|
elif fname == "hash_genesis_block":
|
|
val = binascii.unhexlify(val)
|
|
setattr(proto, fname, val)
|
|
|
|
return proto
|
|
|
|
|
|
def serialize_coindef(proto, icon):
|
|
proto.icon = icon
|
|
buf = io.BytesIO()
|
|
protobuf.dump_message(buf, proto)
|
|
return buf.getvalue()
|
|
|
|
|
|
def sign(data):
|
|
h = sha256(data).digest()
|
|
sign_key = ed25519.SigningKey(b"A" * 32)
|
|
return sign_key.sign(h)
|
|
|
|
|
|
# ====== click command handlers ======
|
|
|
|
|
|
@click.group()
|
|
@click.option(
|
|
"--colors/--no-colors",
|
|
"-c/-C",
|
|
default=sys.stdout.isatty(),
|
|
help="Force colored output on/off",
|
|
)
|
|
def cli(colors):
|
|
global USE_COLORS
|
|
USE_COLORS = colors
|
|
|
|
|
|
@cli.command()
|
|
# fmt: off
|
|
@click.option("--backend/--no-backend", "-b", default=False, help="Check blockbook/bitcore responses")
|
|
@click.option("--icons/--no-icons", default=True, help="Check icon files")
|
|
@click.option("-d", "--show-duplicates", type=click.Choice(("all", "nontoken", "errors")),
|
|
default="errors", help="How much information about duplicate shortcuts should be shown.")
|
|
# fmt: on
|
|
def check(backend, icons, show_duplicates):
|
|
"""Validate coin definitions.
|
|
|
|
Checks that every btc-like coin is properly filled out, reports duplicate symbols,
|
|
missing or invalid icons, backend responses, and uniform key information --
|
|
i.e., that all coins of the same type have the same fields in their JSON data.
|
|
|
|
Uniformity check ignores NEM mosaics and ERC20 tokens, where non-uniformity is
|
|
expected.
|
|
|
|
The `--show-duplicates` option can be set to:
|
|
|
|
- all: all shortcut collisions are shown, including colliding ERC20 tokens
|
|
|
|
- nontoken: only collisions that affect non-ERC20 coins are shown
|
|
|
|
- errors: only collisions between non-ERC20 tokens are shown. This is the default,
|
|
as a collision between two or more non-ERC20 tokens is an error.
|
|
|
|
In the output, duplicate ERC tokens will be shown in cyan; duplicate non-tokens
|
|
in red. An asterisk (*) next to symbol name means that even though it was detected
|
|
as duplicate, it is still included in results.
|
|
|
|
The collision detection checks that SLIP44 numbers don't collide between different
|
|
mainnets (testnet collisions are allowed), that `address_prefix` doesn't collide
|
|
with Bitcoin (other collisions are reported as warnings). `address_prefix_p2sh`
|
|
is also checked but we have a bunch of collisions there and can't do much
|
|
about them, so it's not an error.
|
|
|
|
In the collision checks, Bitcoin is shown in red, other mainnets in blue,
|
|
testnets in green and unsupported networks in gray, marked with `(X)` for
|
|
non-colored output.
|
|
"""
|
|
if backend and requests is None:
|
|
raise click.ClickException("You must install requests for backend check")
|
|
|
|
if icons and not CAN_BUILD_DEFS:
|
|
raise click.ClickException("Missing requirements for icon check")
|
|
|
|
defs, buckets = coin_info.coin_info_with_duplicates()
|
|
all_checks_passed = True
|
|
|
|
print("Checking BTC-like coins...")
|
|
if not check_btc(defs.bitcoin):
|
|
all_checks_passed = False
|
|
|
|
if show_duplicates == "all":
|
|
dup_level = logging.DEBUG
|
|
elif show_duplicates == "nontoken":
|
|
dup_level = logging.INFO
|
|
else:
|
|
dup_level = logging.ERROR
|
|
print("Checking unexpected duplicates...")
|
|
if not check_dups(buckets, dup_level):
|
|
all_checks_passed = False
|
|
|
|
if icons:
|
|
print("Checking icon files...")
|
|
if not check_icons(defs.bitcoin):
|
|
all_checks_passed = False
|
|
|
|
if backend:
|
|
print("Checking backend responses...")
|
|
if not check_backends(defs.bitcoin):
|
|
all_checks_passed = False
|
|
|
|
print("Checking key uniformity...")
|
|
for cointype, coinlist in defs.items():
|
|
if cointype in ("erc20", "nem"):
|
|
continue
|
|
if not check_key_uniformity(coinlist):
|
|
all_checks_passed = False
|
|
|
|
if not all_checks_passed:
|
|
print("Some checks failed.")
|
|
sys.exit(1)
|
|
else:
|
|
print("Everything is OK.")
|
|
|
|
|
|
@cli.command()
|
|
@click.option("-o", "--outfile", type=click.File(mode="w"), default="./coins.json")
|
|
def dump(outfile):
|
|
"""Dump all coin data in a single JSON file.
|
|
|
|
This file is structured the same as the internal data. That is, top-level object
|
|
is a dict with keys: 'bitcoin', 'eth', 'erc20', 'nem' and 'misc'. Value for each
|
|
key is a list of dicts, each describing a known coin.
|
|
|
|
\b
|
|
Fields are category-specific, except for four common ones:
|
|
- 'name' - human-readable name
|
|
- 'shortcut' - currency symbol
|
|
- 'key' - unique identifier, e.g., 'bitcoin:BTC'
|
|
- 'support' - a dict with entries per known device
|
|
"""
|
|
coins = coin_info.coin_info()
|
|
support_info = coin_info.support_info(coins.as_list())
|
|
|
|
for category in coins.values():
|
|
for coin in category:
|
|
coin["support"] = support_info[coin["key"]]
|
|
|
|
# get rid of address_bytes which are bytes which can't be JSON encoded
|
|
for coin in coins.erc20:
|
|
coin.pop("address_bytes", None)
|
|
|
|
with outfile:
|
|
json.dump(coins, outfile, indent=4, sort_keys=True)
|
|
outfile.write("\n")
|
|
|
|
|
|
@cli.command()
|
|
@click.option("-o", "--outfile", type=click.File(mode="w"), default="./coindefs.json")
|
|
def coindefs(outfile):
|
|
"""Generate signed coin definitions for python-trezor and others
|
|
|
|
This is currently unused but should enable us to add new coins without having to
|
|
update firmware.
|
|
"""
|
|
coins = coin_info.coin_info().bitcoin
|
|
coindefs = {}
|
|
for coin in coins:
|
|
key = coin["key"]
|
|
icon = Image.open(coin["icon"])
|
|
ser = serialize_coindef(coindef_from_dict(coin), convert_icon(icon))
|
|
sig = sign(ser)
|
|
definition = binascii.hexlify(sig + ser).decode("ascii")
|
|
coindefs[key] = definition
|
|
|
|
with outfile:
|
|
json.dump(coindefs, outfile, indent=4, sort_keys=True)
|
|
outfile.write("\n")
|
|
|
|
|
|
@cli.command()
|
|
# fmt: off
|
|
@click.argument("paths", metavar="[path]...", nargs=-1)
|
|
@click.option("-o", "--outfile", type=click.File("w"), help="Alternate output file")
|
|
@click.option("-v", "--verbose", is_flag=True, help="Print rendered file names")
|
|
# fmt: on
|
|
def render(paths, outfile, verbose):
|
|
"""Generate source code from Mako templates.
|
|
|
|
For every "foo.bar.mako" filename passed, runs the template and
|
|
saves the result as "foo.bar". For every directory name passed,
|
|
processes all ".mako" files found in that directory.
|
|
|
|
If `-o` is specified, renders a single file into the specified outfile.
|
|
|
|
If no arguments are given, processes the current directory.
|
|
"""
|
|
if not CAN_RENDER:
|
|
raise click.ClickException("Please install 'mako' and 'munch'")
|
|
|
|
if outfile and (len(paths) != 1 or not os.path.isfile(paths[0])):
|
|
raise click.ClickException("Option -o can only be used with single input file")
|
|
|
|
# prepare defs
|
|
defs = coin_info.coin_info()
|
|
support_info = coin_info.support_info(defs)
|
|
|
|
# munch dicts - make them attribute-accessible
|
|
for key, value in defs.items():
|
|
defs[key] = [Munch(coin) for coin in value]
|
|
for key, value in support_info.items():
|
|
support_info[key] = Munch(value)
|
|
|
|
def do_render(src, dst):
|
|
if verbose:
|
|
click.echo("Rendering {} => {}".format(src, dst))
|
|
render_file(src, dst, defs, support_info)
|
|
|
|
# single in-out case
|
|
if outfile:
|
|
do_render(paths[0], outfile)
|
|
return
|
|
|
|
# find files in directories
|
|
if not paths:
|
|
paths = ["."]
|
|
|
|
files = []
|
|
for path in paths:
|
|
if not os.path.exists(path):
|
|
click.echo("Path {} does not exist".format(path))
|
|
elif os.path.isdir(path):
|
|
files += glob.glob(os.path.join(path, "*.mako"))
|
|
else:
|
|
files.append(path)
|
|
|
|
# render each file
|
|
for file in files:
|
|
if not file.endswith(".mako"):
|
|
click.echo("File {} does not end with .mako".format(file))
|
|
else:
|
|
target = file[: -len(".mako")]
|
|
with open(target, "w") as dst:
|
|
do_render(file, dst)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
cli()
|