feat(python): support external Ethereum definitions

pull/2914/head
Martin Novák 1 year ago committed by matejcik
parent 0f07d74063
commit e6a7b9ccaa

@ -0,0 +1 @@
Signed Ethereum network and token definitions from host

@ -251,8 +251,26 @@ Ethereum commands.
Ethereum commands.
Most Ethereum commands now require the host to specify definition of a network and possibly an
ERC-20 token. These definitions can be automatically fetched using the `-a` option.
You can also specify a custom definition source using the `-d` option. Allowable values are:
- HTTP or HTTPS URL
- path to local directory
- path to local tar archive

For debugging purposes, it is possible to force use a specific network and token definition by
using the `--network` and `--token` options. These options accept either a path to a file with a
binary blob, or a hex-encoded string.
Options:
--help Show this message and exit.
-d, --definitions TEXT Source for Ethereum definition blobs.
-a, --auto-definitions Automatically download required definitions from trezor.io
--network TEXT Network definition blob.
--token TEXT Token definition blob.
--help Show this message and exit.
Commands:
get-address Get Ethereum address in hex encoding.

@ -17,7 +17,9 @@
import json
import re
import sys
import tarfile
from decimal import Decimal
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
@ -32,7 +34,8 @@ from typing import (
import click
from .. import ethereum, tools
from .. import definitions, ethereum, tools
from ..messages import EthereumDefinitions
from . import with_client
if TYPE_CHECKING:
@ -164,14 +167,106 @@ def _format_access_list(
]
def _hex_or_file(data: str) -> bytes:
path = Path(data)
if path.is_file():
return path.read_bytes()
if data.startswith("0x"):
data = data[2:]
try:
return bytes.fromhex(data)
except ValueError as e:
raise click.ClickException(f"Invalid hex or file path: {data}") from e
class CliSource(definitions.Source):
network: Optional[bytes] = None
token: Optional[bytes] = None
delegate: definitions.Source = definitions.NullSource()
def get_network(self, chain_id: int) -> Optional[bytes]:
if self.network is not None:
return self.network
return self.delegate.get_network(chain_id)
def get_network_by_slip44(self, slip44: int) -> Optional[bytes]:
if self.network is not None:
return self.network
return self.delegate.get_network_by_slip44(slip44)
def get_token(self, chain_id: int, address: Any) -> Optional[bytes]:
if self.token is not None:
return self.token
return self.delegate.get_token(chain_id, address)
DEFINITIONS_SOURCE = CliSource()
#####################
#
# commands start here
@click.group(name="ethereum")
def cli() -> None:
"""Ethereum commands."""
@click.option(
"-d", "--definitions", "defs", help="Source for Ethereum definition blobs."
)
@click.option(
"-a",
"--auto-definitions",
is_flag=True,
help="Automatically download required definitions from trezor.io",
)
@click.option("--network", help="Network definition blob.")
@click.option("--token", help="Token definition blob.")
def cli(
defs: Optional[str],
auto_definitions: Optional[bool],
network: Optional[str],
token: Optional[str],
) -> None:
"""Ethereum commands.
Most Ethereum commands now require the host to specify definition of a network
and possibly an ERC-20 token. These definitions can be automatically fetched
using the `-a` option.
You can also specify a custom definition source using the `-d` option. Allowable
values are:
\b
- HTTP or HTTPS URL
- path to local directory
- path to local tar archive
\b
For debugging purposes, it is possible to force use a specific network and token
definition by using the `--network` and `--token` options. These options accept
either a path to a file with a binary blob, or a hex-encoded string.
"""
if auto_definitions:
if defs is not None:
raise click.ClickException(
"Cannot use --definitions and --auto-definitions at the same time."
)
DEFINITIONS_SOURCE.delegate = definitions.UrlSource()
elif defs is not None:
path = Path(defs)
if path.is_dir():
DEFINITIONS_SOURCE.delegate = definitions.FilesystemSource(path)
elif path.is_file() and tarfile.is_tarfile(path):
DEFINITIONS_SOURCE.delegate = definitions.TarSource(path)
elif defs.startswith("http"):
DEFINITIONS_SOURCE.delegate = definitions.UrlSource(defs)
else:
raise click.ClickException("Unrecognized --definitions value.")
if network is not None:
DEFINITIONS_SOURCE.network = _hex_or_file(network)
if token is not None:
DEFINITIONS_SOURCE.token = _hex_or_file(token)
@cli.command()
@ -181,7 +276,8 @@ def cli() -> None:
def get_address(client: "TrezorClient", address: str, show_display: bool) -> str:
"""Get Ethereum address in hex encoding."""
address_n = tools.parse_path(address)
return ethereum.get_address(client, address_n, show_display)
network = ethereum.network_from_address_n(address_n, DEFINITIONS_SOURCE)
return ethereum.get_address(client, address_n, show_display, network)
@cli.command()
@ -306,8 +402,11 @@ def sign_tx(
click.echo("Can't send tokens and custom data at the same time")
sys.exit(1)
encoded_network = DEFINITIONS_SOURCE.get_network(chain_id)
address_n = tools.parse_path(address)
from_address = ethereum.get_address(client, address_n)
from_address = ethereum.get_address(
client, address_n, encoded_network=encoded_network
)
if token:
data = _erc20_contract(token, to_address, amount)
@ -315,8 +414,14 @@ def sign_tx(
amount = 0
if data:
# use token definition regardless of whether the data is an ERC-20 transfer
# -- this might prove useful in the future
encoded_token = DEFINITIONS_SOURCE.get_token(chain_id, to_address)
data_bytes = ethereum.decode_hex(data)
else:
# force use provided token definition even if no data (that is what the user
# seems to want)
encoded_token = DEFINITIONS_SOURCE.token
data_bytes = b""
if gas_limit is None:
@ -335,6 +440,11 @@ def sign_tx(
assert gas_limit is not None
assert nonce is not None
defs = EthereumDefinitions(
encoded_network=encoded_network,
encoded_token=encoded_token,
)
if is_eip1559:
assert max_gas_fee is not None
assert max_priority_fee is not None
@ -350,6 +460,7 @@ def sign_tx(
max_gas_fee=max_gas_fee,
max_priority_fee=max_priority_fee,
access_list=access_list,
definitions=defs,
)
else:
if gas_price is None:
@ -366,6 +477,7 @@ def sign_tx(
value=amount,
data=data_bytes,
chain_id=chain_id,
definitions=defs,
)
to = ethereum.decode_hex(to_address)
@ -420,7 +532,8 @@ def sign_tx(
def sign_message(client: "TrezorClient", address: str, message: str) -> Dict[str, str]:
"""Sign message with Ethereum address."""
address_n = tools.parse_path(address)
ret = ethereum.sign_message(client, address_n, message)
network = ethereum.network_from_address_n(address_n, DEFINITIONS_SOURCE)
ret = ethereum.sign_message(client, address_n, message, network)
output = {
"message": message,
"address": ret.address,
@ -448,9 +561,15 @@ def sign_typed_data(
- recursive structs
"""
address_n = tools.parse_path(address)
network = ethereum.network_from_address_n(address_n, DEFINITIONS_SOURCE)
defs = EthereumDefinitions(encoded_network=network)
data = json.loads(file.read())
ret = ethereum.sign_typed_data(
client, address_n, data, metamask_v4_compat=metamask_v4_compat
client,
address_n,
data,
metamask_v4_compat=metamask_v4_compat,
definitions=defs,
)
output = {
"address": ret.address,
@ -490,7 +609,10 @@ def sign_typed_data_hash(
address_n = tools.parse_path(address)
domain_hash = ethereum.decode_hex(domain_hash_hex)
message_hash = ethereum.decode_hex(message_hash_hex) if message_hash_hex else None
ret = ethereum.sign_typed_data_hash(client, address_n, domain_hash, message_hash)
network = ethereum.network_from_address_n(address_n, DEFINITIONS_SOURCE)
ret = ethereum.sign_typed_data_hash(
client, address_n, domain_hash, message_hash, network
)
output = {
"domain_hash": domain_hash_hex,
"message_hash": message_hash_hex,

@ -0,0 +1,145 @@
import logging
import tarfile
import typing as t
from pathlib import Path
import construct as c
import requests
from construct_classes import Struct, subcon
from . import cosi, merkle_tree
from .messages import EthereumDefinitionType
from .tools import EnumAdapter
LOG = logging.getLogger(__name__)
FORMAT_MAGIC = b"trzd1"
DEFS_BASE_URL = "https://data.trezor.io/firmware/eth-definitions/"
DEFINITIONS_DEV_SIGS_REQUIRED = 1
DEFINITIONS_DEV_PUBLIC_KEYS = [
bytes.fromhex(key)
for key in ("db995fe25169d141cab9bbba92baa01f9f2e1ece7df4cb2ac05190f37fcc1f9d",)
]
DEFINITIONS_SIGS_REQUIRED = 2
DEFINITIONS_PUBLIC_KEYS = [
bytes.fromhex(key)
for key in (
"4334996343623e462f0fc93311fef1484ca23d2ff1eec6df1fa8eb7e3573b3db",
"a9a22cc265a0cb1d6cb329bc0e60bc45df76b9ab28fb87b61136feaf8d8fdc96",
"b8d2b21de27124f0511f903ae7e60e07961810a0b8f28ea755fa50367a8a2b8b",
)
]
ProofFormat = c.PrefixedArray(c.Int8ul, c.Bytes(32))
class DefinitionPayload(Struct):
magic: bytes
data_type: EthereumDefinitionType
timestamp: int
data: bytes
SUBCON = c.Struct(
"magic" / c.Const(FORMAT_MAGIC),
"data_type" / EnumAdapter(c.Int8ul, EthereumDefinitionType),
"timestamp" / c.Int32ul,
"data" / c.Prefixed(c.Int16ul, c.GreedyBytes),
)
class Definition(Struct):
payload: DefinitionPayload = subcon(DefinitionPayload)
proof: t.List[bytes]
sigmask: int
signature: bytes
SUBCON = c.Struct(
"payload" / DefinitionPayload.SUBCON,
"proof" / ProofFormat,
"sigmask" / c.Int8ul,
"signature" / c.Bytes(64),
)
def verify(self, dev: bool = False) -> None:
payload = self.payload.build()
root = merkle_tree.evaluate_proof(payload, self.proof)
cosi.verify(
self.signature,
root,
DEFINITIONS_DEV_SIGS_REQUIRED,
DEFINITIONS_DEV_PUBLIC_KEYS,
self.sigmask,
)
class Source:
def fetch_path(self, *components: str) -> t.Optional[bytes]:
raise NotImplementedError
def get_network_by_slip44(self, slip44: int) -> t.Optional[bytes]:
return self.fetch_path("slip44", str(slip44), "network.dat")
def get_network(self, chain_id: int) -> t.Optional[bytes]:
return self.fetch_path("chain-id", str(chain_id), "network.dat")
def get_token(self, chain_id: int, address: t.AnyStr) -> t.Optional[bytes]:
if isinstance(address, bytes):
address_str = address.hex()
elif address.startswith("0x"):
address_str = address[2:]
else:
address_str = address
address_str = address_str.lower()
return self.fetch_path("chain-id", f"{chain_id}", f"token-{address_str}.dat")
class NullSource(Source):
def fetch_path(self, *components: str) -> t.Optional[bytes]:
return None
class FilesystemSource(Source):
def __init__(self, root: Path) -> None:
self.root = root
def fetch_path(self, *components: str) -> t.Optional[bytes]:
path = self.root.joinpath(*components)
if not path.exists():
LOG.info("Requested definition at %s was not found", path)
return None
LOG.info("Reading definition from %s", path)
return path.read_bytes()
class UrlSource(Source):
def __init__(self, base_url: str = DEFS_BASE_URL) -> None:
self.base_url = base_url
def fetch_path(self, *components: str) -> t.Optional[bytes]:
url = self.base_url + "/".join(components)
LOG.info("Downloading definition from %s", url)
r = requests.get(url)
if r.status_code == 404:
LOG.info("Requested definition at %s was not found", url)
return None
r.raise_for_status()
return r.content
class TarSource(Source):
def __init__(self, path: Path) -> None:
self.archive = tarfile.open(path)
def fetch_path(self, *components: str) -> t.Optional[bytes]:
inner_name = "/".join(components)
LOG.info("Extracting definition from %s:%s", self.archive.name, inner_name)
try:
return self.archive.extractfile(inner_name).read() # type: ignore [not a known member]
except Exception:
LOG.info("Requested definition at %s was not found", inner_name)
return None

@ -17,8 +17,8 @@
import re
from typing import TYPE_CHECKING, Any, AnyStr, Dict, List, Optional, Tuple
from . import exceptions, messages
from .tools import expect, prepare_message_bytes, session
from . import definitions, exceptions, messages
from .tools import expect, prepare_message_bytes, session, unharden
if TYPE_CHECKING:
from .client import TrezorClient
@ -141,15 +141,39 @@ def encode_data(value: Any, type_name: str) -> bytes:
raise ValueError(f"Unsupported data type for direct field encoding: {type_name}")
def network_from_address_n(
address_n: "Address",
source: definitions.Source,
) -> Optional[bytes]:
"""Get network definition bytes based on address_n.
Tries to extract the slip44 identifier and lookup the network definition.
Returns None on failure.
"""
if len(address_n) < 2:
return None
# unharden the slip44 part if needed
slip44 = unharden(address_n[1])
return source.get_network_by_slip44(slip44)
# ====== Client functions ====== #
@expect(messages.EthereumAddress, field="address", ret_type=str)
def get_address(
client: "TrezorClient", n: "Address", show_display: bool = False
client: "TrezorClient",
n: "Address",
show_display: bool = False,
encoded_network: Optional[bytes] = None,
) -> "MessageType":
return client.call(
messages.EthereumGetAddress(address_n=n, show_display=show_display)
messages.EthereumGetAddress(
address_n=n,
show_display=show_display,
encoded_network=encoded_network,
)
)
@ -174,6 +198,7 @@ def sign_tx(
data: Optional[bytes] = None,
chain_id: Optional[int] = None,
tx_type: Optional[int] = None,
definitions: Optional[messages.EthereumDefinitions] = None,
) -> Tuple[int, bytes, bytes]:
if chain_id is None:
raise exceptions.TrezorException("Chain ID cannot be undefined")
@ -187,6 +212,7 @@ def sign_tx(
to=to,
chain_id=chain_id,
tx_type=tx_type,
definitions=definitions,
)
if data is None:
@ -231,6 +257,7 @@ def sign_tx_eip1559(
max_gas_fee: int,
max_priority_fee: int,
access_list: Optional[List[messages.EthereumAccessList]] = None,
definitions: Optional[messages.EthereumDefinitions] = None,
) -> Tuple[int, bytes, bytes]:
length = len(data)
data, chunk = data[1024:], data[:1024]
@ -246,6 +273,7 @@ def sign_tx_eip1559(
access_list=access_list,
data_length=length,
data_initial_chunk=chunk,
definitions=definitions,
)
response = client.call(msg)
@ -265,11 +293,16 @@ def sign_tx_eip1559(
@expect(messages.EthereumMessageSignature)
def sign_message(
client: "TrezorClient", n: "Address", message: AnyStr
client: "TrezorClient",
n: "Address",
message: AnyStr,
encoded_network: Optional[bytes] = None,
) -> "MessageType":
return client.call(
messages.EthereumSignMessage(
address_n=n, message=prepare_message_bytes(message)
address_n=n,
message=prepare_message_bytes(message),
encoded_network=encoded_network,
)
)
@ -281,6 +314,7 @@ def sign_typed_data(
data: Dict[str, Any],
*,
metamask_v4_compat: bool = True,
definitions: Optional[messages.EthereumDefinitions] = None,
) -> "MessageType":
data = sanitize_typed_data(data)
types = data["types"]
@ -289,6 +323,7 @@ def sign_typed_data(
address_n=n,
primary_type=data["primaryType"],
metamask_v4_compat=metamask_v4_compat,
definitions=definitions,
)
response = client.call(request)
@ -369,11 +404,13 @@ def sign_typed_data_hash(
n: "Address",
domain_hash: bytes,
message_hash: Optional[bytes],
encoded_network: Optional[bytes] = None,
) -> "MessageType":
return client.call(
messages.EthereumSignTypedHash(
address_n=n,
domain_separator_hash=domain_hash,
message_hash=message_hash,
encoded_network=encoded_network,
)
)

@ -0,0 +1,178 @@
# This file is part of the Trezor project.
#
# Copyright (C) 2012-2022 SatoshiLabs and contributors
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License version 3
# as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the License along with this library.
# If not, see <https://www.gnu.org/licenses/lgpl-3.0.html>.
import typing as t
from hashlib import sha256
from typing_extensions import Protocol
def leaf_hash(value: bytes) -> bytes:
"""Calculate a hash of a leaf node based on its value.
See documentation for `MerkleTree` for details.
"""
return sha256(b"\x00" + value).digest()
def internal_hash(left: bytes, right: bytes) -> bytes:
"""Calculate a hash of an internal node based on its child nodes.
See documentation for `MerkleTree` for details.
"""
hash_a = min(left, right)
hash_b = max(left, right)
return sha256(b"\x01" + hash_a + hash_b).digest()
class NodeType(Protocol):
"""Merkle tree node."""
tree_hash: bytes
"""Merkle root hash of the subtree rooted at this node."""
def add_to_proof_list(self, proof_entry: bytes) -> None:
"""Add a proof entry to the proof list of this node."""
...
class Leaf:
"""Leaf of a Merkle tree."""
def __init__(self, value: bytes) -> None:
self.tree_hash = leaf_hash(value)
self.proof: t.List[bytes] = []
def add_to_proof_list(self, proof_entry: bytes) -> None:
self.proof.append(proof_entry)
class Node:
"""Internal node of a Merkle tree.
Does not have its own proof, but helps to build the proof of its children by passing
the respective proof entries to them.
"""
def __init__(self, left: NodeType, right: NodeType) -> None:
self.left = left
self.right = right
self.left.add_to_proof_list(self.right.tree_hash)
self.right.add_to_proof_list(self.left.tree_hash)
self.tree_hash = internal_hash(self.left.tree_hash, self.right.tree_hash)
def add_to_proof_list(self, proof_entry: bytes) -> None:
self.left.add_to_proof_list(proof_entry)
self.right.add_to_proof_list(proof_entry)
class MerkleTree:
"""Merkle tree for a list of byte values.
The tree is built up as follows:
1. Order the leaves by their hash.
2. Build up the next level up by pairing the leaves in the current level from left
to right.
3. Any left-over odd node at the current level gets pushed to the next level.
4. Repeat until there is only one node left.
Values are not saved in the tree, only their hashes. This allows us to construct a
tree with very large values without having to keep them in memory.
Semantically, the tree operates as a set, but this implementation does not check for
duplicates. If the same value is added multiple times, the resulting tree will be
different from a tree with only one instance of the value. In addition, only one of
the several possible proofs for the repeated value is retrievable.
Proof hashes are constructed as follows:
- Leaf node entries are hashes of b"\x00" + value.
- Internal node entries are hashes of b"\x01" + min(left, right) + max(left, right).
The prefixes function to distinguish leaf nodes from internal nodes. This prevents
two attacks:
(a) An attacker cannot misuse a proof for an internal node to claim that
<internal-node-entry> is a member of the tree.
(b) An attacker cannot insert a leaf node in the format of <internal-node-entry>
that is itself an internal node of a different tree. This would allow the
attacker to expand the tree with their own subtree.
Ordering the internal node entry as min(left, right) + max(left, right) simplifies
the proof format and verifier code: when constructing the internal entry, the
verifier does not need to distinguish between left and right subtree.
"""
entries: t.Dict[bytes, Leaf]
"""Map of leaf hash -> leaf node.
Use `leaf_hash` to calculate the hash of a value, or use `get_proof(value)`
to access the proof directly.
"""
root: NodeType
"""Root node of the tree."""
def __init__(self, values: t.Iterable[bytes]) -> None:
leaves = [Leaf(value) for value in values]
leaves.sort(key=lambda leaf: leaf.tree_hash)
if not leaves:
raise ValueError("Merkle tree must have at least one value")
self.entries = {leaf.tree_hash: leaf for leaf in leaves}
# build the tree
current_level = leaves
while len(current_level) > 1:
# build one level of the tree
next_level = []
while len(current_level) >= 2:
left, right, *current_level = current_level
next_level.append(Node(left, right))
# add the remaining one or zero nodes to the next level
next_level.extend(current_level)
# switch levels and continue
current_level = next_level
assert len(current_level) == 1, "Tree must have exactly one root node"
# save the root
self.root = current_level[0]
def get_root_hash(self) -> bytes:
return self.root.tree_hash
def get_proof(self, value: bytes) -> t.List[bytes]:
"""Get the proof for a given value."""
try:
return self.entries[leaf_hash(value)].proof
except KeyError:
raise KeyError("Value not found in Merkle tree") from None
def evaluate_proof(value: bytes, proof: t.List[bytes]) -> bytes:
"""Evaluate the provided proof of membership.
Reconstructs the Merkle root hash for a tree that contains `value` as a leaf node,
proving membership in a Merkle tree with the given root hash. The result can be
compared to a statically known root hash, or a signature of it can be verified.
"""
hash = leaf_hash(value)
for proof_entry in proof:
hash = internal_hash(hash, proof_entry)
return hash

@ -492,6 +492,11 @@ class DebugButton(IntEnum):
INFO = 2
class EthereumDefinitionType(IntEnum):
NETWORK = 0
TOKEN = 1
class EthereumDataType(IntEnum):
UINT = 1
INT = 2
@ -4555,12 +4560,79 @@ class EosActionUnknown(protobuf.MessageType):
self.data_chunk = data_chunk
class EthereumNetworkInfo(protobuf.MessageType):
MESSAGE_WIRE_TYPE = None
FIELDS = {
1: protobuf.Field("chain_id", "uint64", repeated=False, required=True),
2: protobuf.Field("symbol", "string", repeated=False, required=True),
3: protobuf.Field("slip44", "uint32", repeated=False, required=True),
4: protobuf.Field("name", "string", repeated=False, required=True),
}
def __init__(
self,
*,
chain_id: "int",
symbol: "str",
slip44: "int",
name: "str",
) -> None:
self.chain_id = chain_id
self.symbol = symbol
self.slip44 = slip44
self.name = name
class EthereumTokenInfo(protobuf.MessageType):
MESSAGE_WIRE_TYPE = None
FIELDS = {
1: protobuf.Field("address", "bytes", repeated=False, required=True),
2: protobuf.Field("chain_id", "uint64", repeated=False, required=True),
3: protobuf.Field("symbol", "string", repeated=False, required=True),
4: protobuf.Field("decimals", "uint32", repeated=False, required=True),
5: protobuf.Field("name", "string", repeated=False, required=True),
}
def __init__(
self,
*,
address: "bytes",
chain_id: "int",
symbol: "str",
decimals: "int",
name: "str",
) -> None:
self.address = address
self.chain_id = chain_id
self.symbol = symbol
self.decimals = decimals
self.name = name
class EthereumDefinitions(protobuf.MessageType):
MESSAGE_WIRE_TYPE = None
FIELDS = {
1: protobuf.Field("encoded_network", "bytes", repeated=False, required=False, default=None),
2: protobuf.Field("encoded_token", "bytes", repeated=False, required=False, default=None),
}
def __init__(
self,
*,
encoded_network: Optional["bytes"] = None,
encoded_token: Optional["bytes"] = None,
) -> None:
self.encoded_network = encoded_network
self.encoded_token = encoded_token
class EthereumSignTypedData(protobuf.MessageType):
MESSAGE_WIRE_TYPE = 464
FIELDS = {
1: protobuf.Field("address_n", "uint32", repeated=True, required=False, default=None),
2: protobuf.Field("primary_type", "string", repeated=False, required=True),
3: protobuf.Field("metamask_v4_compat", "bool", repeated=False, required=False, default=True),
4: protobuf.Field("definitions", "EthereumDefinitions", repeated=False, required=False, default=None),
}
def __init__(
@ -4569,10 +4641,12 @@ class EthereumSignTypedData(protobuf.MessageType):
primary_type: "str",
address_n: Optional[Sequence["int"]] = None,
metamask_v4_compat: Optional["bool"] = True,
definitions: Optional["EthereumDefinitions"] = None,
) -> None:
self.address_n: Sequence["int"] = address_n if address_n is not None else []
self.primary_type = primary_type
self.metamask_v4_compat = metamask_v4_compat
self.definitions = definitions
class EthereumTypedDataStructRequest(protobuf.MessageType):
@ -4710,6 +4784,7 @@ class EthereumGetAddress(protobuf.MessageType):
FIELDS = {
1: protobuf.Field("address_n", "uint32", repeated=True, required=False, default=None),
2: protobuf.Field("show_display", "bool", repeated=False, required=False, default=None),
3: protobuf.Field("encoded_network", "bytes", repeated=False, required=False, default=None),
}
def __init__(
@ -4717,9 +4792,11 @@ class EthereumGetAddress(protobuf.MessageType):
*,
address_n: Optional[Sequence["int"]] = None,
show_display: Optional["bool"] = None,
encoded_network: Optional["bytes"] = None,
) -> None:
self.address_n: Sequence["int"] = address_n if address_n is not None else []
self.show_display = show_display
self.encoded_network = encoded_network
class EthereumAddress(protobuf.MessageType):
@ -4752,6 +4829,7 @@ class EthereumSignTx(protobuf.MessageType):
8: protobuf.Field("data_length", "uint32", repeated=False, required=False, default=0),
9: protobuf.Field("chain_id", "uint64", repeated=False, required=True),
10: protobuf.Field("tx_type", "uint32", repeated=False, required=False, default=None),
12: protobuf.Field("definitions", "EthereumDefinitions", repeated=False, required=False, default=None),
}
def __init__(
@ -4767,6 +4845,7 @@ class EthereumSignTx(protobuf.MessageType):
data_initial_chunk: Optional["bytes"] = b'',
data_length: Optional["int"] = 0,
tx_type: Optional["int"] = None,
definitions: Optional["EthereumDefinitions"] = None,
) -> None:
self.address_n: Sequence["int"] = address_n if address_n is not None else []
self.gas_price = gas_price
@ -4778,6 +4857,7 @@ class EthereumSignTx(protobuf.MessageType):
self.data_initial_chunk = data_initial_chunk
self.data_length = data_length
self.tx_type = tx_type
self.definitions = definitions
class EthereumSignTxEIP1559(protobuf.MessageType):
@ -4794,6 +4874,7 @@ class EthereumSignTxEIP1559(protobuf.MessageType):
9: protobuf.Field("data_length", "uint32", repeated=False, required=True),
10: protobuf.Field("chain_id", "uint64", repeated=False, required=True),
11: protobuf.Field("access_list", "EthereumAccessList", repeated=True, required=False, default=None),
12: protobuf.Field("definitions", "EthereumDefinitions", repeated=False, required=False, default=None),
}
def __init__(
@ -4810,6 +4891,7 @@ class EthereumSignTxEIP1559(protobuf.MessageType):
access_list: Optional[Sequence["EthereumAccessList"]] = None,
to: Optional["str"] = '',
data_initial_chunk: Optional["bytes"] = b'',
definitions: Optional["EthereumDefinitions"] = None,
) -> None:
self.address_n: Sequence["int"] = address_n if address_n is not None else []
self.access_list: Sequence["EthereumAccessList"] = access_list if access_list is not None else []
@ -4822,6 +4904,7 @@ class EthereumSignTxEIP1559(protobuf.MessageType):
self.chain_id = chain_id
self.to = to
self.data_initial_chunk = data_initial_chunk
self.definitions = definitions
class EthereumTxRequest(protobuf.MessageType):
@ -4866,6 +4949,7 @@ class EthereumSignMessage(protobuf.MessageType):
FIELDS = {
1: protobuf.Field("address_n", "uint32", repeated=True, required=False, default=None),
2: protobuf.Field("message", "bytes", repeated=False, required=True),
3: protobuf.Field("encoded_network", "bytes", repeated=False, required=False, default=None),
}
def __init__(
@ -4873,9 +4957,11 @@ class EthereumSignMessage(protobuf.MessageType):
*,
message: "bytes",
address_n: Optional[Sequence["int"]] = None,
encoded_network: Optional["bytes"] = None,
) -> None:
self.address_n: Sequence["int"] = address_n if address_n is not None else []
self.message = message
self.encoded_network = encoded_network
class EthereumMessageSignature(protobuf.MessageType):
@ -4921,6 +5007,7 @@ class EthereumSignTypedHash(protobuf.MessageType):
1: protobuf.Field("address_n", "uint32", repeated=True, required=False, default=None),
2: protobuf.Field("domain_separator_hash", "bytes", repeated=False, required=True),
3: protobuf.Field("message_hash", "bytes", repeated=False, required=False, default=None),
4: protobuf.Field("encoded_network", "bytes", repeated=False, required=False, default=None),
}
def __init__(
@ -4929,10 +5016,12 @@ class EthereumSignTypedHash(protobuf.MessageType):
domain_separator_hash: "bytes",
address_n: Optional[Sequence["int"]] = None,
message_hash: Optional["bytes"] = None,
encoded_network: Optional["bytes"] = None,
) -> None:
self.address_n: Sequence["int"] = address_n if address_n is not None else []
self.domain_separator_hash = domain_separator_hash
self.message_hash = message_hash
self.encoded_network = encoded_network
class EthereumTypedDataSignature(protobuf.MessageType):

@ -0,0 +1,108 @@
# This file is part of the Trezor project.
#
# Copyright (C) 2012-2022 SatoshiLabs and contributors
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License version 3
# as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the License along with this library.
# If not, see <https://www.gnu.org/licenses/lgpl-3.0.html>.
import pytest
import typing as t
from trezorlib.merkle_tree import (
MerkleTree,
Leaf,
Node,
leaf_hash,
internal_hash,
evaluate_proof,
)
NODE_VECTORS = ( # node, expected_hash
( # leaf node
Leaf(b"hello"),
"8a2a5c9b768827de5a9552c38a044c66959c68f6d2f21b5260af54d2f87db827",
),
( # node with leaf nodes
Node(left=Leaf(b"hello"), right=Leaf(b"world")),
"24233339aadcedf287d262413f03c028eb8db397edd32a2878091151b99bf20f",
),
( # asymmetric node with leaf hanging on second level
Node(left=Node(left=Leaf(b"hello"), right=Leaf(b"world")), right=Leaf(b"!")),
"c3727420dc97c0dbd89678ee195957e44cfa69f5759b395a07bc171b21468633",
),
)
MERKLE_TREE_VECTORS = (
( # one value
# values
[b"Merkle"],
# expected root hash
leaf_hash(b"Merkle"),
# expected dict of proof lists
{
b"Merkle": [],
},
),
( # two values
# values
[b"Haber", b"Stornetta"],
# expected root hash
internal_hash(
leaf_hash(b"Haber"),
leaf_hash(b"Stornetta"),
),
# expected dict of proof lists
{
b"Haber": [leaf_hash(b"Stornetta")],
b"Stornetta": [leaf_hash(b"Haber")],
},
),
( # three values
# values
[b"Andersen", b"Wuille", b"Maxwell"],
# expected root hash
internal_hash(
internal_hash(
leaf_hash(b"Maxwell"),
leaf_hash(b"Wuille"),
),
leaf_hash(b"Andersen"),
),
# expected dict of proof lists
{
b"Andersen": [internal_hash(leaf_hash(b"Maxwell"), leaf_hash(b"Wuille"))],
b"Maxwell": [leaf_hash(b"Wuille"), leaf_hash(b"Andersen")],
b"Wuille": [leaf_hash(b"Maxwell"), leaf_hash(b"Andersen")],
},
),
)
@pytest.mark.parametrize("node, expected_hash", NODE_VECTORS)
def test_node(node: t.Union[Node, Leaf], expected_hash: str) -> None:
assert node.tree_hash.hex() == expected_hash
@pytest.mark.parametrize("values, root_hash, proofs", MERKLE_TREE_VECTORS)
def test_tree(
values: t.List[bytes],
root_hash: bytes,
proofs: t.Dict[bytes, t.List[bytes]],
) -> None:
mt = MerkleTree(values)
assert mt.get_root_hash() == root_hash
for value, proof in proofs.items():
assert mt.get_proof(value) == proof
assert evaluate_proof(value, proof) == root_hash

@ -17,3 +17,7 @@ These scripts do not need to have a high standard, but each of those should have
## [monero_unused_functions](./monero_unused_functions.py)
- Find functions from Monero cryptography that are not used.
## [eth_defs_unpack.py](./eth_defs_unpack.py)
- Unpacks the definitions from a `definitions-sparse.zip` that does not contain the
Merkle proofs for space-saving. This format is not currently distributed.

@ -0,0 +1,107 @@
#!/usr/bin/env python3
# This file is part of the Trezor project.
#
# Copyright (C) 2012-2022 SatoshiLabs and contributors
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License version 3
# as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the License along with this library.
# If not, see <https://www.gnu.org/licenses/lgpl-3.0.html>.
from __future__ import annotations
import click
import requests
import zipfile
from pathlib import Path
from trezorlib import definitions, merkle_tree
ZIP_FILENAME = "definitions-sparse.zip"
TOPDIRS = ("chain-id", "slip44")
class SparseZipSource(definitions.Source):
def __init__(self, zip: Path | zipfile.ZipFile) -> None:
if isinstance(zip, Path):
self.zip = zipfile.ZipFile(zip)
else:
self.zip = zip
# extract signature
self.signature = self.read_bytes("signature.dat")
self.root_hash = self.read_bytes("root.dat")
# construct a Merkle tree
entries = []
for name in self.zip.namelist():
if name.startswith("chain-id/"):
entries.append(self.read_bytes(name))
entries.sort()
self.merkle_tree = merkle_tree.MerkleTree(entries)
if self.root_hash != self.merkle_tree.get_root_hash():
raise ValueError("Failed to reconstruct the correct Merkle tree")
def read_bytes(self, path: str | Path) -> bytes:
with self.zip.open(str(path)) as f:
return f.read()
def fetch_path(self, *components: str) -> bytes | None:
path = "/".join(components)
data = self.read_bytes(path)
proof = self.merkle_tree.get_proof(data)
proof_bytes = definitions.ProofFormat.build(proof)
return data + proof_bytes + self.signature
@click.command()
@click.option(
"-z",
"--definitions-zip",
type=click.Path(exists=True, dir_okay=False, resolve_path=True, path_type=Path),
help="Local zip file with stored definitions.",
)
@click.argument(
"outdir",
type=click.Path(resolve_path=True, file_okay=False, writable=True, path_type=Path),
)
def unpack_definitions(definitions_zip: Path, outdir: Path) -> None:
"""Script that unpacks and completes (insert missing Merkle Tree proofs
into the definitions) the Ethereum definitions (networks and tokens).
If no local zip is provided, the latest one will be downloaded from trezor.io.
"""
if definitions_zip is None:
result = requests.get(definitions.DEFS_BASE_URL + ZIP_FILENAME)
result.raise_for_status()
zip = zipfile.ZipFile(result.raw)
else:
zip = zipfile.ZipFile(definitions_zip)
source = SparseZipSource(zip)
if not outdir.exists():
outdir.mkdir()
for name in zip.namelist():
if name == "signature.dat" or not name.endswith(".dat"):
continue
local_path = outdir / name
local_path.parent.mkdir(parents=True, exist_ok=True)
data = source.fetch_path(name)
assert data is not None, f"Could not read data for: {name}"
local_path.write_bytes(data)
if __name__ == "__main__":
unpack_definitions()
Loading…
Cancel
Save