diff --git a/python/.changelog.d/15.added b/python/.changelog.d/15.added new file mode 100644 index 000000000..70ce443ce --- /dev/null +++ b/python/.changelog.d/15.added @@ -0,0 +1 @@ +Signed Ethereum network and token definitions from host diff --git a/python/docs/OPTIONS.rst b/python/docs/OPTIONS.rst index 88de938fe..9fcbe6afc 100644 --- a/python/docs/OPTIONS.rst +++ b/python/docs/OPTIONS.rst @@ -251,8 +251,26 @@ Ethereum commands. Ethereum commands. + Most Ethereum commands now require the host to specify definition of a network and possibly an + ERC-20 token. These definitions can be automatically fetched using the `-a` option. + + You can also specify a custom definition source using the `-d` option. Allowable values are: + + - HTTP or HTTPS URL + - path to local directory + - path to local tar archive +  + + For debugging purposes, it is possible to force use a specific network and token definition by + using the `--network` and `--token` options. These options accept either a path to a file with a + binary blob, or a hex-encoded string. + Options: - --help Show this message and exit. + -d, --definitions TEXT Source for Ethereum definition blobs. + -a, --auto-definitions Automatically download required definitions from trezor.io + --network TEXT Network definition blob. + --token TEXT Token definition blob. + --help Show this message and exit. Commands: get-address Get Ethereum address in hex encoding. diff --git a/python/src/trezorlib/cli/ethereum.py b/python/src/trezorlib/cli/ethereum.py index 9ddbd4021..13b36a8dd 100644 --- a/python/src/trezorlib/cli/ethereum.py +++ b/python/src/trezorlib/cli/ethereum.py @@ -17,7 +17,9 @@ import json import re import sys +import tarfile from decimal import Decimal +from pathlib import Path from typing import ( TYPE_CHECKING, Any, @@ -32,7 +34,8 @@ from typing import ( import click -from .. import ethereum, tools +from .. import definitions, ethereum, tools +from ..messages import EthereumDefinitions from . import with_client if TYPE_CHECKING: @@ -164,14 +167,106 @@ def _format_access_list( ] +def _hex_or_file(data: str) -> bytes: + path = Path(data) + if path.is_file(): + return path.read_bytes() + + if data.startswith("0x"): + data = data[2:] + try: + return bytes.fromhex(data) + except ValueError as e: + raise click.ClickException(f"Invalid hex or file path: {data}") from e + + +class CliSource(definitions.Source): + network: Optional[bytes] = None + token: Optional[bytes] = None + delegate: definitions.Source = definitions.NullSource() + + def get_network(self, chain_id: int) -> Optional[bytes]: + if self.network is not None: + return self.network + return self.delegate.get_network(chain_id) + + def get_network_by_slip44(self, slip44: int) -> Optional[bytes]: + if self.network is not None: + return self.network + return self.delegate.get_network_by_slip44(slip44) + + def get_token(self, chain_id: int, address: Any) -> Optional[bytes]: + if self.token is not None: + return self.token + return self.delegate.get_token(chain_id, address) + + +DEFINITIONS_SOURCE = CliSource() + + ##################### # # commands start here @click.group(name="ethereum") -def cli() -> None: - """Ethereum commands.""" +@click.option( + "-d", "--definitions", "defs", help="Source for Ethereum definition blobs." +) +@click.option( + "-a", + "--auto-definitions", + is_flag=True, + help="Automatically download required definitions from trezor.io", +) +@click.option("--network", help="Network definition blob.") +@click.option("--token", help="Token definition blob.") +def cli( + defs: Optional[str], + auto_definitions: Optional[bool], + network: Optional[str], + token: Optional[str], +) -> None: + """Ethereum commands. + + Most Ethereum commands now require the host to specify definition of a network + and possibly an ERC-20 token. These definitions can be automatically fetched + using the `-a` option. + + You can also specify a custom definition source using the `-d` option. Allowable + values are: + + \b + - HTTP or HTTPS URL + - path to local directory + - path to local tar archive + \b + + For debugging purposes, it is possible to force use a specific network and token + definition by using the `--network` and `--token` options. These options accept + either a path to a file with a binary blob, or a hex-encoded string. + """ + if auto_definitions: + if defs is not None: + raise click.ClickException( + "Cannot use --definitions and --auto-definitions at the same time." + ) + DEFINITIONS_SOURCE.delegate = definitions.UrlSource() + elif defs is not None: + path = Path(defs) + if path.is_dir(): + DEFINITIONS_SOURCE.delegate = definitions.FilesystemSource(path) + elif path.is_file() and tarfile.is_tarfile(path): + DEFINITIONS_SOURCE.delegate = definitions.TarSource(path) + elif defs.startswith("http"): + DEFINITIONS_SOURCE.delegate = definitions.UrlSource(defs) + else: + raise click.ClickException("Unrecognized --definitions value.") + + if network is not None: + DEFINITIONS_SOURCE.network = _hex_or_file(network) + if token is not None: + DEFINITIONS_SOURCE.token = _hex_or_file(token) @cli.command() @@ -181,7 +276,8 @@ def cli() -> None: def get_address(client: "TrezorClient", address: str, show_display: bool) -> str: """Get Ethereum address in hex encoding.""" address_n = tools.parse_path(address) - return ethereum.get_address(client, address_n, show_display) + network = ethereum.network_from_address_n(address_n, DEFINITIONS_SOURCE) + return ethereum.get_address(client, address_n, show_display, network) @cli.command() @@ -306,8 +402,11 @@ def sign_tx( click.echo("Can't send tokens and custom data at the same time") sys.exit(1) + encoded_network = DEFINITIONS_SOURCE.get_network(chain_id) address_n = tools.parse_path(address) - from_address = ethereum.get_address(client, address_n) + from_address = ethereum.get_address( + client, address_n, encoded_network=encoded_network + ) if token: data = _erc20_contract(token, to_address, amount) @@ -315,8 +414,14 @@ def sign_tx( amount = 0 if data: + # use token definition regardless of whether the data is an ERC-20 transfer + # -- this might prove useful in the future + encoded_token = DEFINITIONS_SOURCE.get_token(chain_id, to_address) data_bytes = ethereum.decode_hex(data) else: + # force use provided token definition even if no data (that is what the user + # seems to want) + encoded_token = DEFINITIONS_SOURCE.token data_bytes = b"" if gas_limit is None: @@ -335,6 +440,11 @@ def sign_tx( assert gas_limit is not None assert nonce is not None + defs = EthereumDefinitions( + encoded_network=encoded_network, + encoded_token=encoded_token, + ) + if is_eip1559: assert max_gas_fee is not None assert max_priority_fee is not None @@ -350,6 +460,7 @@ def sign_tx( max_gas_fee=max_gas_fee, max_priority_fee=max_priority_fee, access_list=access_list, + definitions=defs, ) else: if gas_price is None: @@ -366,6 +477,7 @@ def sign_tx( value=amount, data=data_bytes, chain_id=chain_id, + definitions=defs, ) to = ethereum.decode_hex(to_address) @@ -420,7 +532,8 @@ def sign_tx( def sign_message(client: "TrezorClient", address: str, message: str) -> Dict[str, str]: """Sign message with Ethereum address.""" address_n = tools.parse_path(address) - ret = ethereum.sign_message(client, address_n, message) + network = ethereum.network_from_address_n(address_n, DEFINITIONS_SOURCE) + ret = ethereum.sign_message(client, address_n, message, network) output = { "message": message, "address": ret.address, @@ -448,9 +561,15 @@ def sign_typed_data( - recursive structs """ address_n = tools.parse_path(address) + network = ethereum.network_from_address_n(address_n, DEFINITIONS_SOURCE) + defs = EthereumDefinitions(encoded_network=network) data = json.loads(file.read()) ret = ethereum.sign_typed_data( - client, address_n, data, metamask_v4_compat=metamask_v4_compat + client, + address_n, + data, + metamask_v4_compat=metamask_v4_compat, + definitions=defs, ) output = { "address": ret.address, @@ -490,7 +609,10 @@ def sign_typed_data_hash( address_n = tools.parse_path(address) domain_hash = ethereum.decode_hex(domain_hash_hex) message_hash = ethereum.decode_hex(message_hash_hex) if message_hash_hex else None - ret = ethereum.sign_typed_data_hash(client, address_n, domain_hash, message_hash) + network = ethereum.network_from_address_n(address_n, DEFINITIONS_SOURCE) + ret = ethereum.sign_typed_data_hash( + client, address_n, domain_hash, message_hash, network + ) output = { "domain_hash": domain_hash_hex, "message_hash": message_hash_hex, diff --git a/python/src/trezorlib/definitions.py b/python/src/trezorlib/definitions.py new file mode 100644 index 000000000..2960678c3 --- /dev/null +++ b/python/src/trezorlib/definitions.py @@ -0,0 +1,145 @@ +import logging +import tarfile +import typing as t +from pathlib import Path + +import construct as c +import requests +from construct_classes import Struct, subcon + +from . import cosi, merkle_tree +from .messages import EthereumDefinitionType +from .tools import EnumAdapter + +LOG = logging.getLogger(__name__) + +FORMAT_MAGIC = b"trzd1" +DEFS_BASE_URL = "https://data.trezor.io/firmware/eth-definitions/" + +DEFINITIONS_DEV_SIGS_REQUIRED = 1 +DEFINITIONS_DEV_PUBLIC_KEYS = [ + bytes.fromhex(key) + for key in ("db995fe25169d141cab9bbba92baa01f9f2e1ece7df4cb2ac05190f37fcc1f9d",) +] + +DEFINITIONS_SIGS_REQUIRED = 2 +DEFINITIONS_PUBLIC_KEYS = [ + bytes.fromhex(key) + for key in ( + "4334996343623e462f0fc93311fef1484ca23d2ff1eec6df1fa8eb7e3573b3db", + "a9a22cc265a0cb1d6cb329bc0e60bc45df76b9ab28fb87b61136feaf8d8fdc96", + "b8d2b21de27124f0511f903ae7e60e07961810a0b8f28ea755fa50367a8a2b8b", + ) +] + + +ProofFormat = c.PrefixedArray(c.Int8ul, c.Bytes(32)) + + +class DefinitionPayload(Struct): + magic: bytes + data_type: EthereumDefinitionType + timestamp: int + data: bytes + + SUBCON = c.Struct( + "magic" / c.Const(FORMAT_MAGIC), + "data_type" / EnumAdapter(c.Int8ul, EthereumDefinitionType), + "timestamp" / c.Int32ul, + "data" / c.Prefixed(c.Int16ul, c.GreedyBytes), + ) + + +class Definition(Struct): + payload: DefinitionPayload = subcon(DefinitionPayload) + proof: t.List[bytes] + sigmask: int + signature: bytes + + SUBCON = c.Struct( + "payload" / DefinitionPayload.SUBCON, + "proof" / ProofFormat, + "sigmask" / c.Int8ul, + "signature" / c.Bytes(64), + ) + + def verify(self, dev: bool = False) -> None: + payload = self.payload.build() + root = merkle_tree.evaluate_proof(payload, self.proof) + cosi.verify( + self.signature, + root, + DEFINITIONS_DEV_SIGS_REQUIRED, + DEFINITIONS_DEV_PUBLIC_KEYS, + self.sigmask, + ) + + +class Source: + def fetch_path(self, *components: str) -> t.Optional[bytes]: + raise NotImplementedError + + def get_network_by_slip44(self, slip44: int) -> t.Optional[bytes]: + return self.fetch_path("slip44", str(slip44), "network.dat") + + def get_network(self, chain_id: int) -> t.Optional[bytes]: + return self.fetch_path("chain-id", str(chain_id), "network.dat") + + def get_token(self, chain_id: int, address: t.AnyStr) -> t.Optional[bytes]: + if isinstance(address, bytes): + address_str = address.hex() + elif address.startswith("0x"): + address_str = address[2:] + else: + address_str = address + + address_str = address_str.lower() + + return self.fetch_path("chain-id", f"{chain_id}", f"token-{address_str}.dat") + + +class NullSource(Source): + def fetch_path(self, *components: str) -> t.Optional[bytes]: + return None + + +class FilesystemSource(Source): + def __init__(self, root: Path) -> None: + self.root = root + + def fetch_path(self, *components: str) -> t.Optional[bytes]: + path = self.root.joinpath(*components) + if not path.exists(): + LOG.info("Requested definition at %s was not found", path) + return None + LOG.info("Reading definition from %s", path) + return path.read_bytes() + + +class UrlSource(Source): + def __init__(self, base_url: str = DEFS_BASE_URL) -> None: + self.base_url = base_url + + def fetch_path(self, *components: str) -> t.Optional[bytes]: + url = self.base_url + "/".join(components) + LOG.info("Downloading definition from %s", url) + r = requests.get(url) + if r.status_code == 404: + LOG.info("Requested definition at %s was not found", url) + return None + r.raise_for_status() + return r.content + + +class TarSource(Source): + def __init__(self, path: Path) -> None: + self.archive = tarfile.open(path) + + def fetch_path(self, *components: str) -> t.Optional[bytes]: + inner_name = "/".join(components) + LOG.info("Extracting definition from %s:%s", self.archive.name, inner_name) + try: + return self.archive.extractfile(inner_name).read() # type: ignore [not a known member] + except Exception: + LOG.info("Requested definition at %s was not found", inner_name) + return None diff --git a/python/src/trezorlib/ethereum.py b/python/src/trezorlib/ethereum.py index 01a9d543c..92c1c2395 100644 --- a/python/src/trezorlib/ethereum.py +++ b/python/src/trezorlib/ethereum.py @@ -17,8 +17,8 @@ import re from typing import TYPE_CHECKING, Any, AnyStr, Dict, List, Optional, Tuple -from . import exceptions, messages -from .tools import expect, prepare_message_bytes, session +from . import definitions, exceptions, messages +from .tools import expect, prepare_message_bytes, session, unharden if TYPE_CHECKING: from .client import TrezorClient @@ -141,15 +141,39 @@ def encode_data(value: Any, type_name: str) -> bytes: raise ValueError(f"Unsupported data type for direct field encoding: {type_name}") +def network_from_address_n( + address_n: "Address", + source: definitions.Source, +) -> Optional[bytes]: + """Get network definition bytes based on address_n. + + Tries to extract the slip44 identifier and lookup the network definition. + Returns None on failure. + """ + if len(address_n) < 2: + return None + + # unharden the slip44 part if needed + slip44 = unharden(address_n[1]) + return source.get_network_by_slip44(slip44) + + # ====== Client functions ====== # @expect(messages.EthereumAddress, field="address", ret_type=str) def get_address( - client: "TrezorClient", n: "Address", show_display: bool = False + client: "TrezorClient", + n: "Address", + show_display: bool = False, + encoded_network: Optional[bytes] = None, ) -> "MessageType": return client.call( - messages.EthereumGetAddress(address_n=n, show_display=show_display) + messages.EthereumGetAddress( + address_n=n, + show_display=show_display, + encoded_network=encoded_network, + ) ) @@ -174,6 +198,7 @@ def sign_tx( data: Optional[bytes] = None, chain_id: Optional[int] = None, tx_type: Optional[int] = None, + definitions: Optional[messages.EthereumDefinitions] = None, ) -> Tuple[int, bytes, bytes]: if chain_id is None: raise exceptions.TrezorException("Chain ID cannot be undefined") @@ -187,6 +212,7 @@ def sign_tx( to=to, chain_id=chain_id, tx_type=tx_type, + definitions=definitions, ) if data is None: @@ -231,6 +257,7 @@ def sign_tx_eip1559( max_gas_fee: int, max_priority_fee: int, access_list: Optional[List[messages.EthereumAccessList]] = None, + definitions: Optional[messages.EthereumDefinitions] = None, ) -> Tuple[int, bytes, bytes]: length = len(data) data, chunk = data[1024:], data[:1024] @@ -246,6 +273,7 @@ def sign_tx_eip1559( access_list=access_list, data_length=length, data_initial_chunk=chunk, + definitions=definitions, ) response = client.call(msg) @@ -265,11 +293,16 @@ def sign_tx_eip1559( @expect(messages.EthereumMessageSignature) def sign_message( - client: "TrezorClient", n: "Address", message: AnyStr + client: "TrezorClient", + n: "Address", + message: AnyStr, + encoded_network: Optional[bytes] = None, ) -> "MessageType": return client.call( messages.EthereumSignMessage( - address_n=n, message=prepare_message_bytes(message) + address_n=n, + message=prepare_message_bytes(message), + encoded_network=encoded_network, ) ) @@ -281,6 +314,7 @@ def sign_typed_data( data: Dict[str, Any], *, metamask_v4_compat: bool = True, + definitions: Optional[messages.EthereumDefinitions] = None, ) -> "MessageType": data = sanitize_typed_data(data) types = data["types"] @@ -289,6 +323,7 @@ def sign_typed_data( address_n=n, primary_type=data["primaryType"], metamask_v4_compat=metamask_v4_compat, + definitions=definitions, ) response = client.call(request) @@ -369,11 +404,13 @@ def sign_typed_data_hash( n: "Address", domain_hash: bytes, message_hash: Optional[bytes], + encoded_network: Optional[bytes] = None, ) -> "MessageType": return client.call( messages.EthereumSignTypedHash( address_n=n, domain_separator_hash=domain_hash, message_hash=message_hash, + encoded_network=encoded_network, ) ) diff --git a/python/src/trezorlib/merkle_tree.py b/python/src/trezorlib/merkle_tree.py new file mode 100755 index 000000000..117382eda --- /dev/null +++ b/python/src/trezorlib/merkle_tree.py @@ -0,0 +1,178 @@ +# This file is part of the Trezor project. +# +# Copyright (C) 2012-2022 SatoshiLabs and contributors +# +# This library is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License version 3 +# as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the License along with this library. +# If not, see . + +import typing as t +from hashlib import sha256 + +from typing_extensions import Protocol + + +def leaf_hash(value: bytes) -> bytes: + """Calculate a hash of a leaf node based on its value. + + See documentation for `MerkleTree` for details. + """ + return sha256(b"\x00" + value).digest() + + +def internal_hash(left: bytes, right: bytes) -> bytes: + """Calculate a hash of an internal node based on its child nodes. + + See documentation for `MerkleTree` for details. + """ + hash_a = min(left, right) + hash_b = max(left, right) + return sha256(b"\x01" + hash_a + hash_b).digest() + + +class NodeType(Protocol): + """Merkle tree node.""" + + tree_hash: bytes + """Merkle root hash of the subtree rooted at this node.""" + + def add_to_proof_list(self, proof_entry: bytes) -> None: + """Add a proof entry to the proof list of this node.""" + ... + + +class Leaf: + """Leaf of a Merkle tree.""" + + def __init__(self, value: bytes) -> None: + self.tree_hash = leaf_hash(value) + self.proof: t.List[bytes] = [] + + def add_to_proof_list(self, proof_entry: bytes) -> None: + self.proof.append(proof_entry) + + +class Node: + """Internal node of a Merkle tree. + + Does not have its own proof, but helps to build the proof of its children by passing + the respective proof entries to them. + """ + + def __init__(self, left: NodeType, right: NodeType) -> None: + self.left = left + self.right = right + self.left.add_to_proof_list(self.right.tree_hash) + self.right.add_to_proof_list(self.left.tree_hash) + self.tree_hash = internal_hash(self.left.tree_hash, self.right.tree_hash) + + def add_to_proof_list(self, proof_entry: bytes) -> None: + self.left.add_to_proof_list(proof_entry) + self.right.add_to_proof_list(proof_entry) + + +class MerkleTree: + """Merkle tree for a list of byte values. + + The tree is built up as follows: + + 1. Order the leaves by their hash. + 2. Build up the next level up by pairing the leaves in the current level from left + to right. + 3. Any left-over odd node at the current level gets pushed to the next level. + 4. Repeat until there is only one node left. + + Values are not saved in the tree, only their hashes. This allows us to construct a + tree with very large values without having to keep them in memory. + + Semantically, the tree operates as a set, but this implementation does not check for + duplicates. If the same value is added multiple times, the resulting tree will be + different from a tree with only one instance of the value. In addition, only one of + the several possible proofs for the repeated value is retrievable. + + Proof hashes are constructed as follows: + + - Leaf node entries are hashes of b"\x00" + value. + - Internal node entries are hashes of b"\x01" + min(left, right) + max(left, right). + + The prefixes function to distinguish leaf nodes from internal nodes. This prevents + two attacks: + + (a) An attacker cannot misuse a proof for an internal node to claim that + is a member of the tree. + (b) An attacker cannot insert a leaf node in the format of + that is itself an internal node of a different tree. This would allow the + attacker to expand the tree with their own subtree. + + Ordering the internal node entry as min(left, right) + max(left, right) simplifies + the proof format and verifier code: when constructing the internal entry, the + verifier does not need to distinguish between left and right subtree. + """ + + entries: t.Dict[bytes, Leaf] + """Map of leaf hash -> leaf node. + + Use `leaf_hash` to calculate the hash of a value, or use `get_proof(value)` + to access the proof directly. + """ + root: NodeType + """Root node of the tree.""" + + def __init__(self, values: t.Iterable[bytes]) -> None: + leaves = [Leaf(value) for value in values] + leaves.sort(key=lambda leaf: leaf.tree_hash) + + if not leaves: + raise ValueError("Merkle tree must have at least one value") + + self.entries = {leaf.tree_hash: leaf for leaf in leaves} + + # build the tree + current_level = leaves + while len(current_level) > 1: + # build one level of the tree + next_level = [] + while len(current_level) >= 2: + left, right, *current_level = current_level + next_level.append(Node(left, right)) + + # add the remaining one or zero nodes to the next level + next_level.extend(current_level) + + # switch levels and continue + current_level = next_level + + assert len(current_level) == 1, "Tree must have exactly one root node" + # save the root + self.root = current_level[0] + + def get_root_hash(self) -> bytes: + return self.root.tree_hash + + def get_proof(self, value: bytes) -> t.List[bytes]: + """Get the proof for a given value.""" + try: + return self.entries[leaf_hash(value)].proof + except KeyError: + raise KeyError("Value not found in Merkle tree") from None + + +def evaluate_proof(value: bytes, proof: t.List[bytes]) -> bytes: + """Evaluate the provided proof of membership. + + Reconstructs the Merkle root hash for a tree that contains `value` as a leaf node, + proving membership in a Merkle tree with the given root hash. The result can be + compared to a statically known root hash, or a signature of it can be verified. + """ + hash = leaf_hash(value) + for proof_entry in proof: + hash = internal_hash(hash, proof_entry) + return hash diff --git a/python/src/trezorlib/messages.py b/python/src/trezorlib/messages.py index a9ecc2810..7eab11392 100644 --- a/python/src/trezorlib/messages.py +++ b/python/src/trezorlib/messages.py @@ -492,6 +492,11 @@ class DebugButton(IntEnum): INFO = 2 +class EthereumDefinitionType(IntEnum): + NETWORK = 0 + TOKEN = 1 + + class EthereumDataType(IntEnum): UINT = 1 INT = 2 @@ -4555,12 +4560,79 @@ class EosActionUnknown(protobuf.MessageType): self.data_chunk = data_chunk +class EthereumNetworkInfo(protobuf.MessageType): + MESSAGE_WIRE_TYPE = None + FIELDS = { + 1: protobuf.Field("chain_id", "uint64", repeated=False, required=True), + 2: protobuf.Field("symbol", "string", repeated=False, required=True), + 3: protobuf.Field("slip44", "uint32", repeated=False, required=True), + 4: protobuf.Field("name", "string", repeated=False, required=True), + } + + def __init__( + self, + *, + chain_id: "int", + symbol: "str", + slip44: "int", + name: "str", + ) -> None: + self.chain_id = chain_id + self.symbol = symbol + self.slip44 = slip44 + self.name = name + + +class EthereumTokenInfo(protobuf.MessageType): + MESSAGE_WIRE_TYPE = None + FIELDS = { + 1: protobuf.Field("address", "bytes", repeated=False, required=True), + 2: protobuf.Field("chain_id", "uint64", repeated=False, required=True), + 3: protobuf.Field("symbol", "string", repeated=False, required=True), + 4: protobuf.Field("decimals", "uint32", repeated=False, required=True), + 5: protobuf.Field("name", "string", repeated=False, required=True), + } + + def __init__( + self, + *, + address: "bytes", + chain_id: "int", + symbol: "str", + decimals: "int", + name: "str", + ) -> None: + self.address = address + self.chain_id = chain_id + self.symbol = symbol + self.decimals = decimals + self.name = name + + +class EthereumDefinitions(protobuf.MessageType): + MESSAGE_WIRE_TYPE = None + FIELDS = { + 1: protobuf.Field("encoded_network", "bytes", repeated=False, required=False, default=None), + 2: protobuf.Field("encoded_token", "bytes", repeated=False, required=False, default=None), + } + + def __init__( + self, + *, + encoded_network: Optional["bytes"] = None, + encoded_token: Optional["bytes"] = None, + ) -> None: + self.encoded_network = encoded_network + self.encoded_token = encoded_token + + class EthereumSignTypedData(protobuf.MessageType): MESSAGE_WIRE_TYPE = 464 FIELDS = { 1: protobuf.Field("address_n", "uint32", repeated=True, required=False, default=None), 2: protobuf.Field("primary_type", "string", repeated=False, required=True), 3: protobuf.Field("metamask_v4_compat", "bool", repeated=False, required=False, default=True), + 4: protobuf.Field("definitions", "EthereumDefinitions", repeated=False, required=False, default=None), } def __init__( @@ -4569,10 +4641,12 @@ class EthereumSignTypedData(protobuf.MessageType): primary_type: "str", address_n: Optional[Sequence["int"]] = None, metamask_v4_compat: Optional["bool"] = True, + definitions: Optional["EthereumDefinitions"] = None, ) -> None: self.address_n: Sequence["int"] = address_n if address_n is not None else [] self.primary_type = primary_type self.metamask_v4_compat = metamask_v4_compat + self.definitions = definitions class EthereumTypedDataStructRequest(protobuf.MessageType): @@ -4710,6 +4784,7 @@ class EthereumGetAddress(protobuf.MessageType): FIELDS = { 1: protobuf.Field("address_n", "uint32", repeated=True, required=False, default=None), 2: protobuf.Field("show_display", "bool", repeated=False, required=False, default=None), + 3: protobuf.Field("encoded_network", "bytes", repeated=False, required=False, default=None), } def __init__( @@ -4717,9 +4792,11 @@ class EthereumGetAddress(protobuf.MessageType): *, address_n: Optional[Sequence["int"]] = None, show_display: Optional["bool"] = None, + encoded_network: Optional["bytes"] = None, ) -> None: self.address_n: Sequence["int"] = address_n if address_n is not None else [] self.show_display = show_display + self.encoded_network = encoded_network class EthereumAddress(protobuf.MessageType): @@ -4752,6 +4829,7 @@ class EthereumSignTx(protobuf.MessageType): 8: protobuf.Field("data_length", "uint32", repeated=False, required=False, default=0), 9: protobuf.Field("chain_id", "uint64", repeated=False, required=True), 10: protobuf.Field("tx_type", "uint32", repeated=False, required=False, default=None), + 12: protobuf.Field("definitions", "EthereumDefinitions", repeated=False, required=False, default=None), } def __init__( @@ -4767,6 +4845,7 @@ class EthereumSignTx(protobuf.MessageType): data_initial_chunk: Optional["bytes"] = b'', data_length: Optional["int"] = 0, tx_type: Optional["int"] = None, + definitions: Optional["EthereumDefinitions"] = None, ) -> None: self.address_n: Sequence["int"] = address_n if address_n is not None else [] self.gas_price = gas_price @@ -4778,6 +4857,7 @@ class EthereumSignTx(protobuf.MessageType): self.data_initial_chunk = data_initial_chunk self.data_length = data_length self.tx_type = tx_type + self.definitions = definitions class EthereumSignTxEIP1559(protobuf.MessageType): @@ -4794,6 +4874,7 @@ class EthereumSignTxEIP1559(protobuf.MessageType): 9: protobuf.Field("data_length", "uint32", repeated=False, required=True), 10: protobuf.Field("chain_id", "uint64", repeated=False, required=True), 11: protobuf.Field("access_list", "EthereumAccessList", repeated=True, required=False, default=None), + 12: protobuf.Field("definitions", "EthereumDefinitions", repeated=False, required=False, default=None), } def __init__( @@ -4810,6 +4891,7 @@ class EthereumSignTxEIP1559(protobuf.MessageType): access_list: Optional[Sequence["EthereumAccessList"]] = None, to: Optional["str"] = '', data_initial_chunk: Optional["bytes"] = b'', + definitions: Optional["EthereumDefinitions"] = None, ) -> None: self.address_n: Sequence["int"] = address_n if address_n is not None else [] self.access_list: Sequence["EthereumAccessList"] = access_list if access_list is not None else [] @@ -4822,6 +4904,7 @@ class EthereumSignTxEIP1559(protobuf.MessageType): self.chain_id = chain_id self.to = to self.data_initial_chunk = data_initial_chunk + self.definitions = definitions class EthereumTxRequest(protobuf.MessageType): @@ -4866,6 +4949,7 @@ class EthereumSignMessage(protobuf.MessageType): FIELDS = { 1: protobuf.Field("address_n", "uint32", repeated=True, required=False, default=None), 2: protobuf.Field("message", "bytes", repeated=False, required=True), + 3: protobuf.Field("encoded_network", "bytes", repeated=False, required=False, default=None), } def __init__( @@ -4873,9 +4957,11 @@ class EthereumSignMessage(protobuf.MessageType): *, message: "bytes", address_n: Optional[Sequence["int"]] = None, + encoded_network: Optional["bytes"] = None, ) -> None: self.address_n: Sequence["int"] = address_n if address_n is not None else [] self.message = message + self.encoded_network = encoded_network class EthereumMessageSignature(protobuf.MessageType): @@ -4921,6 +5007,7 @@ class EthereumSignTypedHash(protobuf.MessageType): 1: protobuf.Field("address_n", "uint32", repeated=True, required=False, default=None), 2: protobuf.Field("domain_separator_hash", "bytes", repeated=False, required=True), 3: protobuf.Field("message_hash", "bytes", repeated=False, required=False, default=None), + 4: protobuf.Field("encoded_network", "bytes", repeated=False, required=False, default=None), } def __init__( @@ -4929,10 +5016,12 @@ class EthereumSignTypedHash(protobuf.MessageType): domain_separator_hash: "bytes", address_n: Optional[Sequence["int"]] = None, message_hash: Optional["bytes"] = None, + encoded_network: Optional["bytes"] = None, ) -> None: self.address_n: Sequence["int"] = address_n if address_n is not None else [] self.domain_separator_hash = domain_separator_hash self.message_hash = message_hash + self.encoded_network = encoded_network class EthereumTypedDataSignature(protobuf.MessageType): diff --git a/python/tests/test_merkle_tree.py b/python/tests/test_merkle_tree.py new file mode 100644 index 000000000..3f441d4c8 --- /dev/null +++ b/python/tests/test_merkle_tree.py @@ -0,0 +1,108 @@ +# This file is part of the Trezor project. +# +# Copyright (C) 2012-2022 SatoshiLabs and contributors +# +# This library is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License version 3 +# as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the License along with this library. +# If not, see . + +import pytest + +import typing as t + +from trezorlib.merkle_tree import ( + MerkleTree, + Leaf, + Node, + leaf_hash, + internal_hash, + evaluate_proof, +) + + +NODE_VECTORS = ( # node, expected_hash + ( # leaf node + Leaf(b"hello"), + "8a2a5c9b768827de5a9552c38a044c66959c68f6d2f21b5260af54d2f87db827", + ), + ( # node with leaf nodes + Node(left=Leaf(b"hello"), right=Leaf(b"world")), + "24233339aadcedf287d262413f03c028eb8db397edd32a2878091151b99bf20f", + ), + ( # asymmetric node with leaf hanging on second level + Node(left=Node(left=Leaf(b"hello"), right=Leaf(b"world")), right=Leaf(b"!")), + "c3727420dc97c0dbd89678ee195957e44cfa69f5759b395a07bc171b21468633", + ), +) + + +MERKLE_TREE_VECTORS = ( + ( # one value + # values + [b"Merkle"], + # expected root hash + leaf_hash(b"Merkle"), + # expected dict of proof lists + { + b"Merkle": [], + }, + ), + ( # two values + # values + [b"Haber", b"Stornetta"], + # expected root hash + internal_hash( + leaf_hash(b"Haber"), + leaf_hash(b"Stornetta"), + ), + # expected dict of proof lists + { + b"Haber": [leaf_hash(b"Stornetta")], + b"Stornetta": [leaf_hash(b"Haber")], + }, + ), + ( # three values + # values + [b"Andersen", b"Wuille", b"Maxwell"], + # expected root hash + internal_hash( + internal_hash( + leaf_hash(b"Maxwell"), + leaf_hash(b"Wuille"), + ), + leaf_hash(b"Andersen"), + ), + # expected dict of proof lists + { + b"Andersen": [internal_hash(leaf_hash(b"Maxwell"), leaf_hash(b"Wuille"))], + b"Maxwell": [leaf_hash(b"Wuille"), leaf_hash(b"Andersen")], + b"Wuille": [leaf_hash(b"Maxwell"), leaf_hash(b"Andersen")], + }, + ), +) + + +@pytest.mark.parametrize("node, expected_hash", NODE_VECTORS) +def test_node(node: t.Union[Node, Leaf], expected_hash: str) -> None: + assert node.tree_hash.hex() == expected_hash + + +@pytest.mark.parametrize("values, root_hash, proofs", MERKLE_TREE_VECTORS) +def test_tree( + values: t.List[bytes], + root_hash: bytes, + proofs: t.Dict[bytes, t.List[bytes]], +) -> None: + mt = MerkleTree(values) + assert mt.get_root_hash() == root_hash + for value, proof in proofs.items(): + assert mt.get_proof(value) == proof + assert evaluate_proof(value, proof) == root_hash diff --git a/tools/snippets/README.md b/tools/snippets/README.md index 29e2180fd..11119d52a 100644 --- a/tools/snippets/README.md +++ b/tools/snippets/README.md @@ -17,3 +17,7 @@ These scripts do not need to have a high standard, but each of those should have ## [monero_unused_functions](./monero_unused_functions.py) - Find functions from Monero cryptography that are not used. + +## [eth_defs_unpack.py](./eth_defs_unpack.py) +- Unpacks the definitions from a `definitions-sparse.zip` that does not contain the + Merkle proofs for space-saving. This format is not currently distributed. diff --git a/tools/snippets/eth_defs_unpack.py b/tools/snippets/eth_defs_unpack.py new file mode 100755 index 000000000..ea96c4812 --- /dev/null +++ b/tools/snippets/eth_defs_unpack.py @@ -0,0 +1,107 @@ +#!/usr/bin/env python3 + +# This file is part of the Trezor project. +# +# Copyright (C) 2012-2022 SatoshiLabs and contributors +# +# This library is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License version 3 +# as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the License along with this library. +# If not, see . +from __future__ import annotations + +import click +import requests +import zipfile +from pathlib import Path + +from trezorlib import definitions, merkle_tree + +ZIP_FILENAME = "definitions-sparse.zip" + +TOPDIRS = ("chain-id", "slip44") + + +class SparseZipSource(definitions.Source): + def __init__(self, zip: Path | zipfile.ZipFile) -> None: + if isinstance(zip, Path): + self.zip = zipfile.ZipFile(zip) + else: + self.zip = zip + + # extract signature + self.signature = self.read_bytes("signature.dat") + self.root_hash = self.read_bytes("root.dat") + + # construct a Merkle tree + entries = [] + for name in self.zip.namelist(): + if name.startswith("chain-id/"): + entries.append(self.read_bytes(name)) + entries.sort() + self.merkle_tree = merkle_tree.MerkleTree(entries) + + if self.root_hash != self.merkle_tree.get_root_hash(): + raise ValueError("Failed to reconstruct the correct Merkle tree") + + def read_bytes(self, path: str | Path) -> bytes: + with self.zip.open(str(path)) as f: + return f.read() + + def fetch_path(self, *components: str) -> bytes | None: + path = "/".join(components) + data = self.read_bytes(path) + proof = self.merkle_tree.get_proof(data) + proof_bytes = definitions.ProofFormat.build(proof) + return data + proof_bytes + self.signature + + +@click.command() +@click.option( + "-z", + "--definitions-zip", + type=click.Path(exists=True, dir_okay=False, resolve_path=True, path_type=Path), + help="Local zip file with stored definitions.", +) +@click.argument( + "outdir", + type=click.Path(resolve_path=True, file_okay=False, writable=True, path_type=Path), +) +def unpack_definitions(definitions_zip: Path, outdir: Path) -> None: + """Script that unpacks and completes (insert missing Merkle Tree proofs + into the definitions) the Ethereum definitions (networks and tokens). + + If no local zip is provided, the latest one will be downloaded from trezor.io. + """ + if definitions_zip is None: + result = requests.get(definitions.DEFS_BASE_URL + ZIP_FILENAME) + result.raise_for_status() + zip = zipfile.ZipFile(result.raw) + else: + zip = zipfile.ZipFile(definitions_zip) + + source = SparseZipSource(zip) + + if not outdir.exists(): + outdir.mkdir() + + for name in zip.namelist(): + if name == "signature.dat" or not name.endswith(".dat"): + continue + + local_path = outdir / name + local_path.parent.mkdir(parents=True, exist_ok=True) + data = source.fetch_path(name) + assert data is not None, f"Could not read data for: {name}" + local_path.write_bytes(data) + + +if __name__ == "__main__": + unpack_definitions()