1
0
mirror of https://github.com/trezor/trezor-firmware.git synced 2025-07-16 03:28:09 +00:00

feat(core,python): sending definitions in protobuf

This commit is contained in:
Martin Novak 2022-07-08 18:07:45 +02:00
parent 6fce5e75e3
commit 326f4db6fe
5 changed files with 308 additions and 45 deletions

View File

@ -1,5 +1,5 @@
from collections import defaultdict
from typing import TYPE_CHECKING, Tuple
from ubinascii import unhexlify
from typing import TYPE_CHECKING
from apps.ethereum import tokens
@ -8,7 +8,7 @@ from trezor.crypto.curve import ed25519
from trezor.enums import EthereumDefinitionType
from trezor.messages import EthereumNetworkInfo, EthereumTokenInfo
from . import networks
from . import helpers, networks
if TYPE_CHECKING:
from trezor.protobuf import MessageType
@ -22,7 +22,7 @@ MIN_DATA_VERSION = 1
FORMAT_VERSION = "trzd1"
if __debug__:
DEFINITIONS_DEV_PUBLIC_KEY = b""
DEFINITIONS_DEV_PUBLIC_KEY = unhexlify("db995fe25169d141cab9bbba92baa01f9f2e1ece7df4cb2ac05190f37fcc1f9d")
class EthereumDefinitionParser:
@ -68,7 +68,7 @@ def decode_definition(
# decode it if it's OK
if expected_type == EthereumDefinitionType.NETWORK:
info = protobuf.decode(parsed_definition.payload, EthereumNetworkInfo, True)
info = protobuf.decode(parsed_definition.clean_payload, EthereumNetworkInfo, True)
# TODO: temporarily convert to internal class
if info is not None:
@ -81,7 +81,7 @@ def decode_definition(
rskip60=info.rskip60
)
else:
info = protobuf.decode(parsed_definition.payload, EthereumTokenInfo, True)
info = protobuf.decode(parsed_definition.clean_payload, EthereumTokenInfo, True)
# TODO: temporarily convert to internal class
if info is not None:
@ -112,7 +112,7 @@ def _get_network_definiton(encoded_network_definition: bytes | None, ref_chain_i
network = decode_definition(encoded_network_definition, EthereumDefinitionType.NETWORK)
# check referential chain_id with encoded chain_id
if network.chain_id != ref_chain_id:
if ref_chain_id is not None and network.chain_id != ref_chain_id:
raise wire.DataError("Invalid network definition - chain IDs not equal.")
return network
@ -120,9 +120,9 @@ def _get_network_definiton(encoded_network_definition: bytes | None, ref_chain_i
return None
def _get_token_definiton(encoded_token_definition: bytes | None, ref_chain_id: int | None = None, ref_address: int | None = None) -> TokenInfo:
def _get_token_definiton(encoded_token_definition: bytes | None, ref_chain_id: int | None = None, ref_address: bytes | None = None) -> TokenInfo:
if encoded_token_definition is None and (ref_chain_id is None or ref_address is None):
return None
return tokens.UNKNOWN_TOKEN
# if we have a built-in definition, use it
if ref_chain_id is not None and ref_address is not None:
@ -151,23 +151,23 @@ class EthereumDefinitions:
encoded_network_definition: bytes | None = None,
encoded_token_definition: bytes | None = None,
ref_chain_id: int | None = None,
ref_token_address: int | None = None,
ref_token_address: bytes | None = None,
) -> None:
self.network = _get_network_definiton(encoded_network_definition, ref_chain_id)
self.token_dict: defaultdict[bytes, TokenInfo] = defaultdict(lambda: tokens.UNKNOWN_TOKEN)
self.token_dict: dict[bytes, TokenInfo] = dict()
# if we have some network, we can try to get token
if self.network is not None:
received_token = _get_token_definiton(encoded_token_definition, self.network.chain_id, ref_token_address)
if received_token is not tokens.UNKNOWN_TOKEN:
self.token_dict[received_token.address] = received_token
token = _get_token_definiton(encoded_token_definition, self.network.chain_id, ref_token_address)
if token is not tokens.UNKNOWN_TOKEN:
self.token_dict[token.address] = token
def get_definitions_from_msg(msg: MessageType) -> EthereumDefinitions:
encoded_network_definition: bytes | None = None
encoded_token_definition: bytes | None = None
chain_id: int | None = None
token_address: int | None = None
token_address: str | None = None
# first try to get both definitions
try:
@ -175,7 +175,14 @@ def get_definitions_from_msg(msg: MessageType) -> EthereumDefinitions:
encoded_network_definition = msg.definitions.encoded_network
encoded_token_definition = msg.definitions.encoded_token
except AttributeError:
encoded_network_definition = msg.encoded_network
pass
# check if we have network definition, if not give it a last try
if encoded_network_definition is None:
try:
encoded_network_definition = msg.encoded_network
except AttributeError:
pass
# get chain_id
try:
@ -185,7 +192,7 @@ def get_definitions_from_msg(msg: MessageType) -> EthereumDefinitions:
# get token_address
try:
token_address = msg.to
token_address = helpers.bytes_from_address(msg.to)
except AttributeError:
pass

View File

@ -8,7 +8,6 @@ from .helpers import bytes_from_address
from .keychain import with_keychain_from_chain_id_and_defs
if TYPE_CHECKING:
from collections import defaultdict
from apps.common.keychain import Keychain
from trezor.messages import EthereumSignTx, EthereumTxAck
from trezor.wire import Context
@ -101,7 +100,7 @@ async def sign_tx(
async def handle_erc20(
ctx: Context, msg: EthereumSignTxAny, token_dict: defaultdict[bytes, tokens.TokenInfo]
ctx: Context, msg: EthereumSignTxAny, token_dict: dict[bytes, tokens.TokenInfo]
) -> tuple[tokens.TokenInfo | None, bytes, bytes, int]:
from .layout import require_confirm_unknown_token
from . import tokens
@ -119,7 +118,7 @@ async def handle_erc20(
and data_initial_chunk[:16]
== b"\xa9\x05\x9c\xbb\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
):
token = token_dict[address_bytes]
token = token_dict.get(address_bytes, tokens.UNKNOWN_TOKEN)
recipient = data_initial_chunk[16:36]
value = int.from_bytes(data_initial_chunk[36:68], "big")

View File

@ -15,6 +15,7 @@
# If not, see <https://www.gnu.org/licenses/lgpl-3.0.html>.
import json
import pathlib
import re
import sys
from decimal import Decimal
@ -164,34 +165,137 @@ def _format_access_list(
]
def _get_ethereum_definitions(
definitions_dir: pathlib.Path = None,
network_def_file: TextIO = None,
token_def_file: TextIO = None,
download_definitions: bool = False,
chain_id: Optional[int] = None,
slip44_hardened: Optional[int] = None,
token_address: Optional[str] = None,
) -> ethereum.messages.EthereumEncodedDefinitions:
count_of_options_used = sum(
bool(o) for o in (
definitions_dir,
(network_def_file or token_def_file),
download_definitions
)
)
if count_of_options_used > 1:
raise click.ClickException("More than one mutually exclusive option for definitions was used. See --help for more info.")
defs = ethereum.messages.EthereumEncodedDefinitions()
if definitions_dir is not None:
if chain_id is not None or slip44_hardened is not None:
defs.encoded_network = ethereum.network_definition_from_dir(definitions_dir, chain_id, slip44_hardened)
if chain_id is not None and token_address is not None:
defs.encoded_token = ethereum.token_definition_from_dir(definitions_dir, chain_id, token_address)
elif network_def_file is not None or token_def_file is not None:
if network_def_file is not None:
with network_def_file:
defs.encoded_network = network_def_file.read()
if token_def_file is not None:
with token_def_file:
defs.encoded_token = token_def_file.read()
elif download_definitions:
if chain_id is not None or slip44_hardened is not None:
defs.encoded_network = ethereum.download_network_definition(chain_id, slip44_hardened)
if chain_id is not None and token_address is not None:
defs.encoded_token = ethereum.download_token_definition(chain_id, token_address)
return defs
#####################
#
# commands start here
definitions_dir_option = click.option(
"--definitions-dir",
type=click.Path(exists=True, file_okay=False, resolve_path=True, path_type=pathlib.Path),
help="Directory with stored definitions. Directory structure should be the same as it is in downloaded archive from " \
"`https:\\data.trezor.io\definitions\???`. Mutually exclusive with `--network-def`, `--token-def` and " \
"`--download-definitions`.", # TODO: add link?, replace this ur with function used to download defs
)
network_def_option = click.option(
"--network-def",
type=click.File(mode="rb"),
help="Binary file with network definition. Mutually exclusive with `--definitions-dir` and `--download-definitions`."
)
token_def_options = click.option(
"--token-def",
type=click.File(mode="rb"),
help="Binary file with token definition. Mutually exclusive with `--definitions-dir` and `--download-definitions`."
)
download_definitions_option = click.option(
"--download-definitions",
is_flag=True,
help="Automatically download required definitions from `data.trezor.io\definitions` and use them. " \
"Mutually exclusive with `--definitions-dir`, `--network-def` and `--token-def`."
)
@click.group(name="ethereum")
def cli() -> None:
"""Ethereum commands."""
@cli.command()
@click.option("-n", "--address", required=True, help=PATH_HELP)
@click.option("-d", "--show-display", is_flag=True)
@with_client
def get_address(client: "TrezorClient", address: str, show_display: bool) -> str:
"""Get Ethereum address in hex encoding."""
address_n = tools.parse_path(address)
return ethereum.get_address(client, address_n, show_display)
@click.option("-o", "--outdir", type=click.Path(resolve_path=True, file_okay=False, path_type=pathlib.Path), default="./definitions-latest")
@click.option("-u", "--unpack", is_flag=True)
def download_definitions(outdir: pathlib.Path, unpack: bool) -> str:
"""Download all Ethereum network and token definitions and save them."""
archive_filename = "definitions.tar.gz"
# TODO: change once we know the urls
archived_definitions = ethereum.download_from_url("https://data.trezor.io/eth_definitions/" + archive_filename)
# unpack and/or save
if unpack:
# TODO: implement once we know archive format
pass
else:
with open(archive_filename, mode="wb+") as f:
f.write(archived_definitions)
@cli.command()
@click.option("-n", "--address", required=True, help=PATH_HELP)
@click.option("-d", "--show-display", is_flag=True)
@definitions_dir_option
@network_def_option
@download_definitions_option
@with_client
def get_public_node(client: "TrezorClient", address: str, show_display: bool) -> dict:
def get_address(client: "TrezorClient", address: str, show_display: bool, definitions_dir: pathlib.Path, network_def: TextIO, download_definitions: bool) -> str:
"""Get Ethereum address in hex encoding."""
address_n = tools.parse_path(address)
defs = _get_ethereum_definitions(
definitions_dir=definitions_dir,
network_def_file=network_def,
download_definitions=download_definitions,
slip44_hardened=address_n[1],
)
return ethereum.get_address(client, address_n, show_display, defs.encoded_network)
@cli.command()
@click.option("-n", "--address", required=True, help=PATH_HELP)
@click.option("-d", "--show-display", is_flag=True)
@definitions_dir_option
@network_def_option
@download_definitions_option
@with_client
def get_public_node(client: "TrezorClient", address: str, show_display: bool, definitions_dir: pathlib.Path, network_def: TextIO, download_definitions: bool) -> dict:
"""Get Ethereum public node of given path."""
address_n = tools.parse_path(address)
result = ethereum.get_public_node(client, address_n, show_display=show_display)
defs = _get_ethereum_definitions(
definitions_dir=definitions_dir,
network_def_file=network_def,
download_definitions=download_definitions,
slip44_hardened=address_n[1],
)
result = ethereum.get_public_node(client, address_n, show_display=show_display, encoded_network=defs.encoded_network)
return {
"node": {
"depth": result.node.depth,
@ -249,6 +353,10 @@ def get_public_node(client: "TrezorClient", address: str, show_display: bool) ->
)
@click.argument("to_address")
@click.argument("amount", callback=_amount_to_int)
@definitions_dir_option
@network_def_option
@token_def_options
@download_definitions_option
@with_client
def sign_tx(
client: "TrezorClient",
@ -267,6 +375,10 @@ def sign_tx(
max_priority_fee: Optional[int],
access_list: List[ethereum.messages.EthereumAccessList],
eip2718_type: Optional[int],
definitions_dir: pathlib.Path,
network_def: TextIO,
token_def: TextIO,
download_definitions: bool,
) -> str:
"""Sign (and optionally publish) Ethereum transaction.
@ -335,6 +447,15 @@ def sign_tx(
assert gas_limit is not None
assert nonce is not None
defs = _get_ethereum_definitions(
definitions_dir=definitions_dir,
network_def_file=network_def,
token_def_file=token_def,
download_definitions=download_definitions,
chain_id=chain_id,
token_address=to_address
)
if is_eip1559:
assert max_gas_fee is not None
assert max_priority_fee is not None
@ -350,6 +471,7 @@ def sign_tx(
max_gas_fee=max_gas_fee,
max_priority_fee=max_priority_fee,
access_list=access_list,
definitions=defs,
)
else:
if gas_price is None:
@ -366,6 +488,7 @@ def sign_tx(
value=amount,
data=data_bytes,
chain_id=chain_id,
definitions=defs,
)
to = ethereum.decode_hex(to_address)
@ -416,11 +539,20 @@ def sign_tx(
@cli.command()
@click.option("-n", "--address", required=True, help=PATH_HELP)
@click.argument("message")
@definitions_dir_option
@network_def_option
@download_definitions_option
@with_client
def sign_message(client: "TrezorClient", address: str, message: str) -> Dict[str, str]:
def sign_message(client: "TrezorClient", address: str, message: str, definitions_dir: pathlib.Path, network_def: TextIO, download_definitions: bool) -> Dict[str, str]:
"""Sign message with Ethereum address."""
address_n = tools.parse_path(address)
ret = ethereum.sign_message(client, address_n, message)
defs = _get_ethereum_definitions(
definitions_dir=definitions_dir,
network_def_file=network_def,
download_definitions=download_definitions,
slip44_hardened=address_n[1],
)
ret = ethereum.sign_message(client, address_n, message, defs.encoded_network)
output = {
"message": message,
"address": ret.address,
@ -437,9 +569,12 @@ def sign_message(client: "TrezorClient", address: str, message: str) -> Dict[str
help="Be compatible with Metamask's signTypedData_v4 implementation",
)
@click.argument("file", type=click.File("r"))
@definitions_dir_option
@network_def_option
@download_definitions_option
@with_client
def sign_typed_data(
client: "TrezorClient", address: str, metamask_v4_compat: bool, file: TextIO
client: "TrezorClient", address: str, metamask_v4_compat: bool, file: TextIO, definitions_dir: pathlib.Path, network_def: TextIO, download_definitions: bool
) -> Dict[str, str]:
"""Sign typed data (EIP-712) with Ethereum address.
@ -449,8 +584,14 @@ def sign_typed_data(
"""
address_n = tools.parse_path(address)
data = json.loads(file.read())
defs = _get_ethereum_definitions(
definitions_dir=definitions_dir,
network_def_file=network_def,
download_definitions=download_definitions,
slip44_hardened=address_n[1],
)
ret = ethereum.sign_typed_data(
client, address_n, data, metamask_v4_compat=metamask_v4_compat
client, address_n, data, metamask_v4_compat=metamask_v4_compat, encoded_network=defs.encoded_network
)
output = {
"address": ret.address,
@ -463,13 +604,23 @@ def sign_typed_data(
@click.argument("address")
@click.argument("signature")
@click.argument("message")
@definitions_dir_option
@network_def_option
@download_definitions_option
@with_client
def verify_message(
client: "TrezorClient", address: str, signature: str, message: str
client: "TrezorClient", address: str, signature: str, message: str, definitions_dir: pathlib.Path, network_def: TextIO, download_definitions: bool
) -> bool:
"""Verify message signed with Ethereum address."""
chain_id = 1
signature_bytes = ethereum.decode_hex(signature)
return ethereum.verify_message(client, address, signature_bytes, message)
defs = _get_ethereum_definitions(
definitions_dir=definitions_dir,
network_def_file=network_def,
download_definitions=download_definitions,
chain_id=chain_id,
)
return ethereum.verify_message(client, address, signature_bytes, message, chain_id, defs.encoded_network)
@cli.command()

View File

@ -14,11 +14,12 @@
# You should have received a copy of the License along with this library.
# If not, see <https://www.gnu.org/licenses/lgpl-3.0.html>.
import re
from typing import TYPE_CHECKING, Any, AnyStr, Dict, List, Optional, Tuple
from itertools import chain
import pathlib, re, requests
from typing import TYPE_CHECKING, Any, AnyStr, Dict, List, Optional, TextIO, Tuple
from . import exceptions, messages
from .tools import expect, prepare_message_bytes, session
from .tools import expect, UH_, prepare_message_bytes, session
if TYPE_CHECKING:
from .client import TrezorClient
@ -26,6 +27,14 @@ if TYPE_CHECKING:
from .protobuf import MessageType
# TODO: change once we know the urls
DEFS_BASE_URL="https://data.trezor.io/eth_definitions/{lookup_type}/{id}/{name}.dat"
DEFS_NETWORK_BY_CHAINID_LOOKUP_TYPE="by_chain_id"
DEFS_NETWORK_BY_SLIP44_LOOKUP_TYPE="by_slip44"
DEFS_NETWORK_URI_NAME="network"
DEFS_TOKEN_URI_NAME="token_{hex_address}"
def int_to_big_endian(value: int) -> bytes:
return value.to_bytes((value.bit_length() + 7) // 8, "big")
@ -141,24 +150,105 @@ def encode_data(value: Any, type_name: str) -> bytes:
raise ValueError(f"Unsupported data type for direct field encoding: {type_name}")
def download_from_url(url: str, error_msg: str = "") -> bytes:
try:
r = requests.get(url)
r.raise_for_status()
return r.content
except requests.exceptions.HTTPError as err:
raise RuntimeError(f"{error_msg}{err}")
def download_network_definition(chain_id: Optional[int] = None, slip44_hardened: Optional[int] = None) -> Optional[bytes]:
if chain_id is None != slip44_hardened is None: # XOR
raise RuntimeError(f"Both/or neither of chain_id and slip44_hardened parameters are needed to download network definition.")
if chain_id is not None:
url = DEFS_BASE_URL.format(
lookup_type=DEFS_NETWORK_BY_CHAINID_LOOKUP_TYPE,
id=chain_id,
name=DEFS_NETWORK_URI_NAME,
)
else:
url = DEFS_BASE_URL.format(
lookup_type=DEFS_NETWORK_BY_SLIP44_LOOKUP_TYPE,
id=UH_(slip44_hardened),
name=DEFS_NETWORK_URI_NAME,
)
error_msg = f"While downloading network definition from \"{url}\" following HTTP error occured: "
return download_from_url(url, error_msg)
def download_token_definition(chain_id: Optional[int] = None, token_address: Optional[str] = None) -> Optional[bytes]:
if chain_id is None or token_address is None:
raise RuntimeError(f"Both chain_id and token_address parameters are needed to download token definition.")
url = DEFS_BASE_URL.format(
lookup_type=DEFS_NETWORK_BY_CHAINID_LOOKUP_TYPE,
id=chain_id,
name=DEFS_TOKEN_URI_NAME.format(hex_address=token_address),
)
error_msg = f"While downloading token definition from \"{url}\" following HTTP error occured: "
return download_from_url(url, error_msg)
def network_definition_from_dir(path: pathlib.Path, chain_id: Optional[int] = None, slip44_hardened: Optional[int] = None) -> Optional[bytes]:
if chain_id is None != slip44_hardened is None: # XOR
raise RuntimeError(f"Both/or neither of chain_id and slip44_hardened parameters are needed to load network definition from directory.")
def read_definition(path: pathlib.Path) -> Optional[bytes]:
if not path.exists() or not path.is_file():
return None
with open(path, mode="rb") as f:
return f.read()
if chain_id is not None:
return read_definition(path / DEFS_NETWORK_BY_CHAINID_LOOKUP_TYPE / str(chain_id) / (DEFS_NETWORK_URI_NAME + ".dat"))
else:
return read_definition(path / DEFS_NETWORK_BY_SLIP44_LOOKUP_TYPE / str(UH_(slip44_hardened)) / (DEFS_NETWORK_URI_NAME + ".dat"))
def token_definition_from_dir(path: pathlib.Path, chain_id: Optional[int] = None, token_address: Optional[str] = None) -> Optional[bytes]:
if chain_id is None or token_address is None:
raise RuntimeError(f"Both chain_id and token_address parameters are needed to load token definition from directory.")
path = path / DEFS_NETWORK_BY_CHAINID_LOOKUP_TYPE / str(chain_id) / (DEFS_TOKEN_URI_NAME.format(hex_address=token_address) + ".dat")
if not path.exists() or not path.is_file():
return None
with open(path, mode="rb") as f:
return f.read()
# ====== Client functions ====== #
@expect(messages.EthereumAddress, field="address", ret_type=str)
def get_address(
client: "TrezorClient", n: "Address", show_display: bool = False
client: "TrezorClient", n: "Address", show_display: bool = False, encoded_network: bytes = None
) -> "MessageType":
return client.call(
messages.EthereumGetAddress(address_n=n, show_display=show_display)
messages.EthereumGetAddress(
address_n=n,
show_display=show_display,
encoded_network=encoded_network,
)
)
@expect(messages.EthereumPublicKey)
def get_public_node(
client: "TrezorClient", n: "Address", show_display: bool = False
client: "TrezorClient", n: "Address", show_display: bool = False, encoded_network: bytes = None
) -> "MessageType":
return client.call(
messages.EthereumGetPublicKey(address_n=n, show_display=show_display)
messages.EthereumGetPublicKey(
address_n=n,
show_display=show_display,
encoded_network=encoded_network,
)
)
@ -174,6 +264,7 @@ def sign_tx(
data: Optional[bytes] = None,
chain_id: Optional[int] = None,
tx_type: Optional[int] = None,
definitions: Optional[messages.EthereumEncodedDefinitions] = None,
) -> Tuple[int, bytes, bytes]:
if chain_id is None:
raise exceptions.TrezorException("Chain ID cannot be undefined")
@ -187,6 +278,7 @@ def sign_tx(
to=to,
chain_id=chain_id,
tx_type=tx_type,
definitions=definitions,
)
if data is None:
@ -231,6 +323,7 @@ def sign_tx_eip1559(
max_gas_fee: int,
max_priority_fee: int,
access_list: Optional[List[messages.EthereumAccessList]] = None,
definitions: Optional[messages.EthereumEncodedDefinitions] = None,
) -> Tuple[int, bytes, bytes]:
length = len(data)
data, chunk = data[1024:], data[:1024]
@ -246,6 +339,7 @@ def sign_tx_eip1559(
access_list=access_list,
data_length=length,
data_initial_chunk=chunk,
definitions=definitions,
)
response = client.call(msg)
@ -265,11 +359,13 @@ def sign_tx_eip1559(
@expect(messages.EthereumMessageSignature)
def sign_message(
client: "TrezorClient", n: "Address", message: AnyStr
client: "TrezorClient", n: "Address", message: AnyStr, encoded_network: bytes = None
) -> "MessageType":
return client.call(
messages.EthereumSignMessage(
address_n=n, message=prepare_message_bytes(message)
address_n=n,
message=prepare_message_bytes(message),
encoded_network=encoded_network,
)
)
@ -281,6 +377,7 @@ def sign_typed_data(
data: Dict[str, Any],
*,
metamask_v4_compat: bool = True,
encoded_network: bytes = None,
) -> "MessageType":
data = sanitize_typed_data(data)
types = data["types"]
@ -289,6 +386,7 @@ def sign_typed_data(
address_n=n,
primary_type=data["primaryType"],
metamask_v4_compat=metamask_v4_compat,
encoded_network=encoded_network,
)
response = client.call(request)
@ -348,7 +446,7 @@ def sign_typed_data(
def verify_message(
client: "TrezorClient", address: str, signature: bytes, message: AnyStr
client: "TrezorClient", address: str, signature: bytes, message: AnyStr, chain_id: int = 1, encoded_network: bytes = None
) -> bool:
try:
resp = client.call(
@ -356,6 +454,7 @@ def verify_message(
address=address,
signature=signature,
message=prepare_message_bytes(message),
encoded_network=encoded_network,
)
)
except exceptions.TrezorFailure:

View File

@ -60,6 +60,13 @@ def H_(x: int) -> int:
return x | HARDENED_FLAG
def UH_(x: int) -> int:
"""
Shortcut function that "un-hardens" a number in a BIP44 path.
"""
return x & ~(HARDENED_FLAG)
def btc_hash(data: bytes) -> bytes:
"""
Double-SHA256 hash as used in BTC