mirror of
https://github.com/trezor/trezor-firmware.git
synced 2025-05-22 16:58:49 +00:00
feat(python): support ethereum transaction signing anti-exfil protocol
This commit is contained in:
parent
a8c8b35b07
commit
9ece9ae6c0
@ -24,6 +24,8 @@ if TYPE_CHECKING:
|
||||
from .client import TrezorClient
|
||||
from .tools import Address
|
||||
|
||||
from .anti_exfil import AntiExfilSignature, commit_entropy, generate_entropy, verify
|
||||
|
||||
|
||||
def int_to_big_endian(value: int) -> bytes:
|
||||
return value.to_bytes((value.bit_length() + 7) // 8, "big")
|
||||
@ -190,6 +192,104 @@ def get_public_node(
|
||||
|
||||
|
||||
@session
|
||||
def sign_tx_common(
|
||||
client: "TrezorClient",
|
||||
n: "Address",
|
||||
nonce: int,
|
||||
gas_price: int,
|
||||
gas_limit: int,
|
||||
to: str,
|
||||
value: int,
|
||||
data: Optional[bytes],
|
||||
chain_id: Optional[int],
|
||||
tx_type: Optional[int],
|
||||
definitions: Optional[messages.EthereumDefinitions],
|
||||
chunkify: bool,
|
||||
use_anti_exfil: bool,
|
||||
entropy: Optional[bytes],
|
||||
) -> Tuple[Optional[int], bytes, bytes, Optional[bytes], Optional[bytes]]:
|
||||
if chain_id is None:
|
||||
raise exceptions.TrezorException("Chain ID cannot be undefined")
|
||||
|
||||
if use_anti_exfil:
|
||||
if entropy is None:
|
||||
entropy = generate_entropy()
|
||||
|
||||
msg = messages.EthereumSignTx(
|
||||
address_n=n,
|
||||
nonce=int_to_big_endian(nonce),
|
||||
gas_price=int_to_big_endian(gas_price),
|
||||
gas_limit=int_to_big_endian(gas_limit),
|
||||
value=int_to_big_endian(value),
|
||||
to=to,
|
||||
chain_id=chain_id,
|
||||
tx_type=tx_type,
|
||||
definitions=definitions,
|
||||
chunkify=chunkify,
|
||||
entropy_commitment=(
|
||||
commit_entropy(entropy) if use_anti_exfil and entropy is not None else None
|
||||
),
|
||||
)
|
||||
|
||||
if data is None:
|
||||
data = b""
|
||||
|
||||
msg.data_length = len(data)
|
||||
data, chunk = data[1024:], data[:1024]
|
||||
msg.data_initial_chunk = chunk
|
||||
|
||||
response = client.call(msg)
|
||||
assert isinstance(response, messages.EthereumTxRequest)
|
||||
|
||||
nonce_commitment: Optional[bytes] = None
|
||||
|
||||
while True:
|
||||
if response.data_length is not None:
|
||||
data_length = response.data_length
|
||||
data, chunk = data[data_length:], data[:data_length]
|
||||
response = client.call(messages.EthereumTxAck(data_chunk=chunk))
|
||||
elif response.nonce_commitment is not None and use_anti_exfil:
|
||||
nonce_commitment = response.nonce_commitment
|
||||
response = client.call(
|
||||
messages.EthereumTxAck(data_chunk=b"", entropy=entropy)
|
||||
)
|
||||
elif response.signature_r is not None and response.signature_s is not None:
|
||||
break
|
||||
else:
|
||||
raise exceptions.TrezorException("Unexpected response")
|
||||
|
||||
assert response.signature_r is not None
|
||||
assert response.signature_s is not None
|
||||
|
||||
if use_anti_exfil:
|
||||
assert entropy is not None
|
||||
|
||||
# This function verifies that the signature includes the host's entropy and that its s value is less than half of the curve's order. However, it does not verify the signature itself, as trezorlib doesn't have the digest. The verification of the signature is the caller's responsibility.
|
||||
if nonce_commitment is None or not verify(
|
||||
None,
|
||||
response.signature_r + response.signature_s,
|
||||
None,
|
||||
entropy,
|
||||
nonce_commitment,
|
||||
):
|
||||
# This is a violation of the anti-exfil protocol.
|
||||
raise exceptions.TrezorException("Invalid signature")
|
||||
else:
|
||||
assert response.signature_v is not None
|
||||
# https://github.com/trezor/trezor-core/pull/311
|
||||
# only signature bit returned. recalculate signature_v
|
||||
if response.signature_v <= 1:
|
||||
response.signature_v += 2 * chain_id + 35
|
||||
|
||||
return (
|
||||
response.signature_v,
|
||||
response.signature_r,
|
||||
response.signature_s,
|
||||
entropy,
|
||||
nonce_commitment,
|
||||
)
|
||||
|
||||
|
||||
def sign_tx(
|
||||
client: "TrezorClient",
|
||||
n: "Address",
|
||||
@ -204,48 +304,164 @@ def sign_tx(
|
||||
definitions: Optional[messages.EthereumDefinitions] = None,
|
||||
chunkify: bool = False,
|
||||
) -> Tuple[int, bytes, bytes]:
|
||||
if chain_id is None:
|
||||
raise exceptions.TrezorException("Chain ID cannot be undefined")
|
||||
signature_v, signature_r, signature_s, entropy, nonce_commitment = sign_tx_common(
|
||||
client,
|
||||
n,
|
||||
nonce,
|
||||
gas_price,
|
||||
gas_limit,
|
||||
to,
|
||||
value,
|
||||
data,
|
||||
chain_id,
|
||||
tx_type,
|
||||
definitions,
|
||||
chunkify,
|
||||
False,
|
||||
None,
|
||||
)
|
||||
assert signature_v is not None
|
||||
return signature_v, signature_r, signature_s
|
||||
|
||||
msg = messages.EthereumSignTx(
|
||||
|
||||
def sign_tx_new(
|
||||
client: "TrezorClient",
|
||||
n: "Address",
|
||||
nonce: int,
|
||||
gas_price: int,
|
||||
gas_limit: int,
|
||||
to: str,
|
||||
value: int,
|
||||
data: Optional[bytes] = None,
|
||||
chain_id: Optional[int] = None,
|
||||
tx_type: Optional[int] = None,
|
||||
definitions: Optional[messages.EthereumDefinitions] = None,
|
||||
chunkify: bool = False,
|
||||
use_anti_exfil: bool = True,
|
||||
entropy: Optional[bytes] = None,
|
||||
) -> AntiExfilSignature:
|
||||
"""
|
||||
If `use_anti_exfil` is set to `True`, the anti-exfilitration protocol will be
|
||||
used. The purpose of this protocol is to prevent the device from leaking
|
||||
its secrets through the signatures. In this case, `SignedInput` objects
|
||||
will have non-emtpy fields `entropy` and `nonce_commitment`. It's the caller
|
||||
responsibility to verify the signature and the nonce commitment. The caller
|
||||
can optionally provide a list of entropies to be used in the protocol. Ideally,
|
||||
the caller should provide the same list of entropies if the signing is repeated
|
||||
due to a error to prevent the device to perform nonce-grinding attacks.
|
||||
"""
|
||||
_, signature_r, signature_s, entropy, nonce_commitment = sign_tx_common(
|
||||
client,
|
||||
n,
|
||||
nonce,
|
||||
gas_price,
|
||||
gas_limit,
|
||||
to,
|
||||
value,
|
||||
data,
|
||||
chain_id,
|
||||
tx_type,
|
||||
definitions,
|
||||
chunkify,
|
||||
use_anti_exfil,
|
||||
entropy,
|
||||
)
|
||||
return AntiExfilSignature(
|
||||
signature=signature_r + signature_s,
|
||||
entropy=entropy,
|
||||
nonce_commitment=nonce_commitment,
|
||||
)
|
||||
|
||||
|
||||
@session
|
||||
def sign_tx_eip1559_common(
|
||||
client: "TrezorClient",
|
||||
n: "Address",
|
||||
nonce: int,
|
||||
gas_limit: int,
|
||||
to: str,
|
||||
value: int,
|
||||
data: bytes,
|
||||
chain_id: int,
|
||||
max_gas_fee: int,
|
||||
max_priority_fee: int,
|
||||
access_list: Optional[List[messages.EthereumAccessList]],
|
||||
definitions: Optional[messages.EthereumDefinitions],
|
||||
chunkify: bool,
|
||||
use_anti_exfil: bool,
|
||||
entropy: Optional[bytes] = None,
|
||||
) -> Tuple[Optional[int], bytes, bytes, Optional[bytes], Optional[bytes]]:
|
||||
if use_anti_exfil:
|
||||
if entropy is None:
|
||||
entropy = generate_entropy()
|
||||
|
||||
length = len(data)
|
||||
data, chunk = data[1024:], data[:1024]
|
||||
msg = messages.EthereumSignTxEIP1559(
|
||||
address_n=n,
|
||||
nonce=int_to_big_endian(nonce),
|
||||
gas_price=int_to_big_endian(gas_price),
|
||||
gas_limit=int_to_big_endian(gas_limit),
|
||||
value=int_to_big_endian(value),
|
||||
to=to,
|
||||
chain_id=chain_id,
|
||||
tx_type=tx_type,
|
||||
max_gas_fee=int_to_big_endian(max_gas_fee),
|
||||
max_priority_fee=int_to_big_endian(max_priority_fee),
|
||||
access_list=access_list,
|
||||
data_length=length,
|
||||
data_initial_chunk=chunk,
|
||||
definitions=definitions,
|
||||
chunkify=chunkify,
|
||||
entropy_commitment=(
|
||||
commit_entropy(entropy) if use_anti_exfil and entropy is not None else None
|
||||
),
|
||||
)
|
||||
|
||||
if data is None:
|
||||
data = b""
|
||||
|
||||
msg.data_length = len(data)
|
||||
data, chunk = data[1024:], data[:1024]
|
||||
msg.data_initial_chunk = chunk
|
||||
|
||||
response = client.call(msg)
|
||||
assert isinstance(response, messages.EthereumTxRequest)
|
||||
|
||||
while response.data_length is not None:
|
||||
nonce_commitment: Optional[bytes] = None
|
||||
|
||||
while True:
|
||||
if response.data_length is not None:
|
||||
data_length = response.data_length
|
||||
data, chunk = data[data_length:], data[:data_length]
|
||||
response = client.call(messages.EthereumTxAck(data_chunk=chunk))
|
||||
assert isinstance(response, messages.EthereumTxRequest)
|
||||
elif response.nonce_commitment is not None and use_anti_exfil:
|
||||
nonce_commitment = response.nonce_commitment
|
||||
response = client.call(
|
||||
messages.EthereumTxAck(data_chunk=b"", entropy=entropy)
|
||||
)
|
||||
elif response.signature_r is not None and response.signature_s is not None:
|
||||
break
|
||||
else:
|
||||
raise exceptions.TrezorException("Unexpected response")
|
||||
|
||||
assert response.signature_v is not None
|
||||
assert response.signature_r is not None
|
||||
assert response.signature_s is not None
|
||||
|
||||
# https://github.com/trezor/trezor-core/pull/311
|
||||
# only signature bit returned. recalculate signature_v
|
||||
if response.signature_v <= 1:
|
||||
response.signature_v += 2 * chain_id + 35
|
||||
if use_anti_exfil:
|
||||
assert entropy is not None
|
||||
|
||||
return response.signature_v, response.signature_r, response.signature_s
|
||||
# This function verifies that the signature includes the host's entropy and that its s value is less than half of the curve's order. However, it does not verify the signature itself, as trezorlib doesn't have the digest. The verification of the signature is the caller's responsibility.
|
||||
if nonce_commitment is None or not verify(
|
||||
None,
|
||||
response.signature_r + response.signature_s,
|
||||
None,
|
||||
entropy,
|
||||
nonce_commitment,
|
||||
):
|
||||
# This is a violation of the anti-exfil protocol.
|
||||
raise exceptions.TrezorException("Invalid signature")
|
||||
else:
|
||||
assert response.signature_v is not None
|
||||
|
||||
return (
|
||||
response.signature_v,
|
||||
response.signature_r,
|
||||
response.signature_s,
|
||||
entropy,
|
||||
nonce_commitment,
|
||||
)
|
||||
|
||||
|
||||
@session
|
||||
@ -265,37 +481,80 @@ def sign_tx_eip1559(
|
||||
definitions: Optional[messages.EthereumDefinitions] = None,
|
||||
chunkify: bool = False,
|
||||
) -> Tuple[int, bytes, bytes]:
|
||||
length = len(data)
|
||||
data, chunk = data[1024:], data[:1024]
|
||||
msg = messages.EthereumSignTxEIP1559(
|
||||
address_n=n,
|
||||
nonce=int_to_big_endian(nonce),
|
||||
gas_limit=int_to_big_endian(gas_limit),
|
||||
value=int_to_big_endian(value),
|
||||
to=to,
|
||||
chain_id=chain_id,
|
||||
max_gas_fee=int_to_big_endian(max_gas_fee),
|
||||
max_priority_fee=int_to_big_endian(max_priority_fee),
|
||||
access_list=access_list,
|
||||
data_length=length,
|
||||
data_initial_chunk=chunk,
|
||||
definitions=definitions,
|
||||
chunkify=chunkify,
|
||||
signature_v, signature_r, signature_s, entropy, nonce_commitment = (
|
||||
sign_tx_eip1559_common(
|
||||
client,
|
||||
n,
|
||||
nonce,
|
||||
gas_limit,
|
||||
to,
|
||||
value,
|
||||
data,
|
||||
chain_id,
|
||||
max_gas_fee,
|
||||
max_priority_fee,
|
||||
access_list,
|
||||
definitions,
|
||||
chunkify,
|
||||
False,
|
||||
None,
|
||||
)
|
||||
)
|
||||
assert signature_v is not None
|
||||
return signature_v, signature_r, signature_s
|
||||
|
||||
response = client.call(msg)
|
||||
assert isinstance(response, messages.EthereumTxRequest)
|
||||
|
||||
while response.data_length is not None:
|
||||
data_length = response.data_length
|
||||
data, chunk = data[data_length:], data[:data_length]
|
||||
response = client.call(messages.EthereumTxAck(data_chunk=chunk))
|
||||
assert isinstance(response, messages.EthereumTxRequest)
|
||||
|
||||
assert response.signature_v is not None
|
||||
assert response.signature_r is not None
|
||||
assert response.signature_s is not None
|
||||
return response.signature_v, response.signature_r, response.signature_s
|
||||
@session
|
||||
def sign_tx_eip1559_new(
|
||||
client: "TrezorClient",
|
||||
n: "Address",
|
||||
*,
|
||||
nonce: int,
|
||||
gas_limit: int,
|
||||
to: str,
|
||||
value: int,
|
||||
data: bytes = b"",
|
||||
chain_id: int,
|
||||
max_gas_fee: int,
|
||||
max_priority_fee: int,
|
||||
access_list: Optional[List[messages.EthereumAccessList]] = None,
|
||||
definitions: Optional[messages.EthereumDefinitions] = None,
|
||||
chunkify: bool = False,
|
||||
use_anti_exfil: bool = True,
|
||||
entropy: Optional[bytes] = None,
|
||||
) -> AntiExfilSignature:
|
||||
"""
|
||||
If `use_anti_exfil` is set to `True`, the anti-exfilitration protocol will be
|
||||
used. The purpose of this protocol is to prevent the device from leaking
|
||||
its secrets through the signatures. In this case, `SignedInput` objects
|
||||
will have non-emtpy fields `entropy` and `nonce_commitment`. It's the caller
|
||||
responsibility to verify the signature and the nonce commitment. The caller
|
||||
can optionally provide a list of entropies to be used in the protocol. Ideally,
|
||||
the caller should provide the same list of entropies if the signing is repeated
|
||||
due to a error to prevent the device to perform nonce-grinding attacks.
|
||||
"""
|
||||
_, signature_r, signature_s, entropy, nonce_commitment = sign_tx_eip1559_common(
|
||||
client,
|
||||
n,
|
||||
nonce,
|
||||
gas_limit,
|
||||
to,
|
||||
value,
|
||||
data,
|
||||
chain_id,
|
||||
max_gas_fee,
|
||||
max_priority_fee,
|
||||
access_list,
|
||||
definitions,
|
||||
chunkify,
|
||||
use_anti_exfil,
|
||||
entropy,
|
||||
)
|
||||
return AntiExfilSignature(
|
||||
signature=signature_r + signature_s,
|
||||
entropy=entropy,
|
||||
nonce_commitment=nonce_commitment,
|
||||
)
|
||||
|
||||
|
||||
def sign_message(
|
||||
|
Loading…
Reference in New Issue
Block a user