1
0
mirror of https://github.com/trezor/trezor-firmware.git synced 2025-06-28 19:02:34 +00:00

refactor(core): introduce signature encoding and decoding

[no changelog]
This commit is contained in:
Ondřej Vejpustek 2025-03-19 17:38:41 +01:00
parent 07e90986aa
commit 8c807a16b4
15 changed files with 138 additions and 49 deletions

View File

@ -91,6 +91,8 @@ trezor.crypto.rlp
import trezor.crypto.rlp import trezor.crypto.rlp
trezor.crypto.scripts trezor.crypto.scripts
import trezor.crypto.scripts import trezor.crypto.scripts
trezor.crypto.signature
import trezor.crypto.signature
trezor.crypto.slip39 trezor.crypto.slip39
import trezor.crypto.slip39 import trezor.crypto.slip39
trezor.enums.AmountUnit trezor.enums.AmountUnit

View File

@ -13,6 +13,7 @@ async def sign_tx(envelope: BinanceSignTx, keychain: Keychain) -> BinanceSignedT
from trezor import wire from trezor import wire
from trezor.crypto.curve import secp256k1 from trezor.crypto.curve import secp256k1
from trezor.crypto.hashlib import sha256 from trezor.crypto.hashlib import sha256
from trezor.crypto.signature import encode_raw_signature
from trezor.enums import MessageType from trezor.enums import MessageType
from trezor.messages import ( from trezor.messages import (
BinanceCancelMsg, BinanceCancelMsg,
@ -59,6 +60,8 @@ async def sign_tx(envelope: BinanceSignTx, keychain: Keychain) -> BinanceSignedT
# generate_content_signature # generate_content_signature
msghash = sha256(msg_json.encode()).digest() msghash = sha256(msg_json.encode()).digest()
signature_bytes = secp256k1.sign_recoverable(node.private_key(), msghash)[1:65] signature_bytes = encode_raw_signature(
secp256k1.sign_recoverable(node.private_key(), msghash)
)
return BinanceSignedTx(signature=signature_bytes, public_key=node.public_key()) return BinanceSignedTx(signature=signature_bytes, public_key=node.public_key())

View File

@ -105,11 +105,11 @@ NONSEGWIT_INPUT_SCRIPT_TYPES = (
def ecdsa_sign(node: bip32.HDNode, digest: bytes) -> bytes: def ecdsa_sign(node: bip32.HDNode, digest: bytes) -> bytes:
from trezor.crypto import der
from trezor.crypto.curve import secp256k1 from trezor.crypto.curve import secp256k1
from trezor.crypto.signature import encode_der_signature
sig = secp256k1.sign_recoverable(node.private_key(), digest) sig = secp256k1.sign_recoverable(node.private_key(), digest)
sigder = der.encode_seq((sig[1:33], sig[33:65])) sigder = encode_der_signature(sig)
return sigder return sigder

View File

@ -8,12 +8,13 @@ if TYPE_CHECKING:
from apps.common.coininfo import CoinInfo from apps.common.coininfo import CoinInfo
from apps.common.keychain import Keychain from apps.common.keychain import Keychain
from trezor.crypto.signature import encode_bip137_signature
@with_keychain @with_keychain
async def sign_message( async def sign_message(
msg: SignMessage, keychain: Keychain, coin: CoinInfo msg: SignMessage, keychain: Keychain, coin: CoinInfo
) -> MessageSignature: ) -> MessageSignature:
from trezor import wire
from trezor.crypto.curve import secp256k1 from trezor.crypto.curve import secp256k1
from trezor.enums import InputScriptType from trezor.enums import InputScriptType
from trezor.messages import MessageSignature from trezor.messages import MessageSignature
@ -52,19 +53,9 @@ async def sign_message(
seckey = node.private_key() seckey = node.private_key()
digest = message_digest(coin, message) digest = message_digest(coin, message)
signature = secp256k1.sign_recoverable(seckey, digest) signature = encode_bip137_signature(
secp256k1.sign_recoverable(seckey, digest, False),
if script_type == InputScriptType.SPENDADDRESS: script_type if not msg.no_script_type else InputScriptType.SPENDADDRESS,
script_type_info = 0 )
elif script_type == InputScriptType.SPENDP2SHWITNESS:
script_type_info = 4
elif script_type == InputScriptType.SPENDWITNESS:
script_type_info = 8
else:
raise wire.ProcessError("Unsupported script type")
# Add script type information to the recovery byte.
if script_type_info != 0 and not msg.no_script_type:
signature = bytes([signature[0] + script_type_info]) + signature[1:]
return MessageSignature(address=address, signature=signature) return MessageSignature(address=address, signature=signature)

View File

@ -53,6 +53,7 @@ def _address_to_script_type(address: str, coin: CoinInfo) -> InputScriptType:
async def verify_message(msg: VerifyMessage) -> Success: async def verify_message(msg: VerifyMessage) -> Success:
from trezor import TR, utils from trezor import TR, utils
from trezor.crypto.curve import secp256k1 from trezor.crypto.curve import secp256k1
from trezor.crypto.signature import decode_bip137_signature
from trezor.enums import InputScriptType from trezor.enums import InputScriptType
from trezor.messages import Success from trezor.messages import Success
from trezor.ui.layouts import confirm_signverify, show_success from trezor.ui.layouts import confirm_signverify, show_success
@ -77,32 +78,40 @@ async def verify_message(msg: VerifyMessage) -> Success:
digest = message_digest(coin, message) digest = message_digest(coin, message)
script_type = _address_to_script_type(address, coin) address_script_type = _address_to_script_type(address, coin)
recid = signature[0] signature_script_type, recoverable_signature = decode_bip137_signature(signature)
if 27 <= recid <= 34:
# p2pkh or no script type provided
pass # use the script type from the address
elif 35 <= recid <= 38 and script_type == InputScriptType.SPENDP2SHWITNESS:
# segwit-in-p2sh
signature = bytes([signature[0] - 4]) + signature[1:]
elif 39 <= recid <= 42 and script_type == InputScriptType.SPENDWITNESS:
# native segwit
signature = bytes([signature[0] - 8]) + signature[1:]
else:
raise ProcessError("Invalid signature")
pubkey = secp256k1.verify_recover(signature, digest) if signature_script_type not in (
InputScriptType.SPENDADDRESS,
InputScriptType.SPENDADDRESS_UNCOMPRESSED,
):
if signature_script_type != address_script_type:
raise ProcessError("Invalid signature")
if signature_script_type == InputScriptType.SPENDP2SHWITNESS:
recoverable_signature = (
bytes([recoverable_signature[0] - 4]) + recoverable_signature[1:]
)
if signature_script_type == InputScriptType.SPENDWITNESS:
recoverable_signature = (
bytes([recoverable_signature[0] - 8]) + recoverable_signature[1:]
)
pubkey = secp256k1.verify_recover(
recoverable_signature,
digest,
)
if not pubkey: if not pubkey:
raise ProcessError("Invalid signature") raise ProcessError("Invalid signature")
if script_type == InputScriptType.SPENDADDRESS: if address_script_type == InputScriptType.SPENDADDRESS:
addr = address_pkh(pubkey, coin) addr = address_pkh(pubkey, coin)
if not utils.BITCOIN_ONLY and coin.cashaddr_prefix is not None: if not utils.BITCOIN_ONLY and coin.cashaddr_prefix is not None:
addr = address_to_cashaddr(addr, coin) addr = address_to_cashaddr(addr, coin)
elif script_type == InputScriptType.SPENDP2SHWITNESS: elif address_script_type == InputScriptType.SPENDP2SHWITNESS:
addr = address_p2wpkh_in_p2sh(pubkey, coin) addr = address_p2wpkh_in_p2sh(pubkey, coin)
elif script_type == InputScriptType.SPENDWITNESS: elif address_script_type == InputScriptType.SPENDWITNESS:
addr = address_p2wpkh(pubkey, coin) addr = address_p2wpkh(pubkey, coin)
else: else:
raise ProcessError("Invalid signature") raise ProcessError("Invalid signature")

View File

@ -14,6 +14,17 @@ def base58_encode(prefix: str, sig_prefix: str, data: bytes) -> str:
return prefix + b58 return prefix + b58
def encode_signature(recoverable_signature: bytes) -> str:
from trezor.crypto.signature import encode_bip137_signature
from trezor.enums import InputScriptType
return base58_encode(
"SIG_",
"K1",
encode_bip137_signature(recoverable_signature, InputScriptType.SPENDADDRESS),
)
def eos_name_to_string(value: int) -> str: def eos_name_to_string(value: int) -> str:
charmap = ".12345abcdefghijklmnopqrstuvwxyz" charmap = ".12345abcdefghijklmnopqrstuvwxyz"
tmp = value tmp = value

View File

@ -20,7 +20,7 @@ async def sign_tx(msg: EosSignTx, keychain: Keychain) -> EosSignedTx:
from apps.common import paths from apps.common import paths
from .actions import process_action from .actions import process_action
from .helpers import base58_encode from .helpers import encode_signature
from .layout import require_sign_tx from .layout import require_sign_tx
from .writers import write_bytes_fixed, write_header, write_uvarint from .writers import write_bytes_fixed, write_header, write_uvarint
@ -52,7 +52,7 @@ async def sign_tx(msg: EosSignTx, keychain: Keychain) -> EosSignedTx:
digest = sha.get_digest() digest = sha.get_digest()
signature = secp256k1.sign_recoverable( signature = secp256k1.sign_recoverable(
node.private_key(), digest, True, secp256k1.CANONICAL_SIG_EOS node.private_key(), digest, False, secp256k1.CANONICAL_SIG_EOS
) )
return EosSignedTx(signature=base58_encode("SIG_", "K1", signature)) return EosSignedTx(signature=encode_signature(signature))

View File

@ -221,3 +221,25 @@ def _from_bytes_bigendian_signed(b: bytes) -> int:
return -result - 1 return -result - 1
else: else:
return int.from_bytes(b, "big") return int.from_bytes(b, "big")
def encode_signature(recoverable_signature: bytes) -> bytes:
from trezor.crypto.signature import encode_bip137_signature
from trezor.enums import InputScriptType
signature = encode_bip137_signature(
recoverable_signature, InputScriptType.SPENDADDRESS_UNCOMPRESSED
)
return signature[1:] + signature[0:1]
def decode_signature(signature: bytes) -> bytes:
from trezor.crypto.signature import decode_bip137_signature
from trezor.enums import InputScriptType
script_type, recoverable_signature = decode_bip137_signature(
signature[-1:] + signature[:-1]
)
if script_type != InputScriptType.SPENDADDRESS_UNCOMPRESSED:
raise ValueError("Unsupported script type")
return recoverable_signature

View File

@ -35,7 +35,7 @@ async def sign_message(
from apps.common import paths from apps.common import paths
from apps.common.signverify import decode_message from apps.common.signverify import decode_message
from .helpers import address_from_bytes from .helpers import address_from_bytes, encode_signature
await paths.validate_path(keychain, msg.address_n) await paths.validate_path(keychain, msg.address_n)
@ -55,5 +55,5 @@ async def sign_message(
return EthereumMessageSignature( return EthereumMessageSignature(
address=address, address=address,
signature=signature[1:] + bytearray([signature[0]]), signature=encode_signature(signature),
) )

View File

@ -33,7 +33,7 @@ async def sign_typed_data(
from apps.common import paths from apps.common import paths
from .helpers import address_from_bytes from .helpers import address_from_bytes, encode_signature
from .layout import require_confirm_address from .layout import require_confirm_address
await paths.validate_path(keychain, msg.address_n) await paths.validate_path(keychain, msg.address_n)
@ -54,7 +54,7 @@ async def sign_typed_data(
return EthereumTypedDataSignature( return EthereumTypedDataSignature(
address=address_from_bytes(address_bytes, defs.network), address=address_from_bytes(address_bytes, defs.network),
signature=signature[1:] + signature[0:1], signature=encode_signature(signature),
) )

View File

@ -14,13 +14,13 @@ async def verify_message(msg: EthereumVerifyMessage) -> Success:
from apps.common.signverify import decode_message from apps.common.signverify import decode_message
from .helpers import address_from_bytes, bytes_from_address from .helpers import address_from_bytes, bytes_from_address, decode_signature
from .sign_message import message_digest from .sign_message import message_digest
digest = message_digest(msg.message) digest = message_digest(msg.message)
if len(msg.signature) != 65: if len(msg.signature) != 65:
raise DataError("Invalid signature") raise DataError("Invalid signature")
sig = bytearray([msg.signature[64]]) + msg.signature[:64] sig = decode_signature(msg.signature)
pubkey = secp256k1.verify_recover(sig, digest) pubkey = secp256k1.verify_recover(sig, digest)

View File

@ -11,9 +11,9 @@ if TYPE_CHECKING:
# NOTE: it is one big function because that way it is the most flash-space-efficient # NOTE: it is one big function because that way it is the most flash-space-efficient
@auto_keychain(__name__) @auto_keychain(__name__)
async def sign_tx(msg: RippleSignTx, keychain: Keychain) -> RippleSignedTx: async def sign_tx(msg: RippleSignTx, keychain: Keychain) -> RippleSignedTx:
from trezor.crypto import der
from trezor.crypto.curve import secp256k1 from trezor.crypto.curve import secp256k1
from trezor.crypto.hashlib import sha512 from trezor.crypto.hashlib import sha512
from trezor.crypto.signature import encode_der_signature
from trezor.messages import RippleSignedTx from trezor.messages import RippleSignedTx
from trezor.wire import ProcessError from trezor.wire import ProcessError
@ -55,7 +55,7 @@ async def sign_tx(msg: RippleSignTx, keychain: Keychain) -> RippleSignedTx:
# Signs and encodes signature into DER format # Signs and encodes signature into DER format
first_half_of_sha512 = sha512(to_sign).digest()[:32] first_half_of_sha512 = sha512(to_sign).digest()[:32]
sig = secp256k1.sign_recoverable(node.private_key(), first_half_of_sha512) sig = secp256k1.sign_recoverable(node.private_key(), first_half_of_sha512)
sig_encoded = der.encode_seq((sig[1:33], sig[33:65])) sig_encoded = encode_der_signature(sig)
tx = serialize(msg, source_address, node.public_key(), sig_encoded) tx = serialize(msg, source_address, node.public_key(), sig_encoded)
return RippleSignedTx(signature=sig_encoded, serialized_tx=tx) return RippleSignedTx(signature=sig_encoded, serialized_tx=tx)

View File

@ -7,6 +7,7 @@ import storage.device as storage_device
from trezor import utils from trezor import utils
from trezor.crypto import chacha20poly1305, der, hashlib, hmac, random from trezor.crypto import chacha20poly1305, der, hashlib, hmac, random
from trezor.crypto.curve import ed25519, nist256p1 from trezor.crypto.curve import ed25519, nist256p1
from trezor.crypto.signature import encode_der_signature
from apps.common import cbor, seed from apps.common import cbor, seed
from apps.common.paths import HARDENED from apps.common.paths import HARDENED
@ -95,7 +96,7 @@ class Credential:
for segment in data: for segment in data:
dig.update(segment) dig.update(segment)
sig = nist256p1.sign_recoverable(self._private_key(), dig.digest(), False) sig = nist256p1.sign_recoverable(self._private_key(), dig.digest(), False)
return der.encode_seq((sig[1:33], sig[33:])) return encode_der_signature(sig)
def bogus_signature(self) -> bytes: def bogus_signature(self) -> bytes:
raise NotImplementedError raise NotImplementedError

View File

@ -1304,13 +1304,13 @@ def _msg_register(req: Msg, dialog_mgr: DialogManager) -> Cmd:
def basic_attestation_sign(data: Iterable[bytes]) -> bytes: def basic_attestation_sign(data: Iterable[bytes]) -> bytes:
from trezor.crypto import der from trezor.crypto.signature import encode_der_signature
dig = hashlib.sha256() dig = hashlib.sha256()
for segment in data: for segment in data:
dig.update(segment) dig.update(segment)
sig = nist256p1.sign_recoverable(_FIDO_ATT_PRIV_KEY, dig.digest(), False) sig = nist256p1.sign_recoverable(_FIDO_ATT_PRIV_KEY, dig.digest(), False)
return der.encode_seq((sig[1:33], sig[33:])) return encode_der_signature(sig)
def _msg_register_sign(challenge: bytes, cred: U2fCredential) -> bytes: def _msg_register_sign(challenge: bytes, cred: U2fCredential) -> bytes:

View File

@ -0,0 +1,50 @@
from trezor.crypto.der import encode_seq
from trezor.enums import InputScriptType
def encode_der_signature(recoverable_signature: bytes) -> bytes:
assert len(recoverable_signature) == 65
return encode_seq((recoverable_signature[1:33], recoverable_signature[33:65]))
def encode_raw_signature(recoverable_signature: bytes) -> bytes:
assert len(recoverable_signature) == 65
return recoverable_signature[1:65]
def encode_bip137_signature(
recoverable_signature: bytes, script_type: InputScriptType
) -> bytes:
def get_script_type_number(script_type: InputScriptType) -> int:
if script_type == InputScriptType.SPENDADDRESS_UNCOMPRESSED:
return 0
if script_type == InputScriptType.SPENDADDRESS:
return 4
elif script_type == InputScriptType.SPENDP2SHWITNESS:
return 8
elif script_type == InputScriptType.SPENDWITNESS:
return 12
else:
raise ValueError("Unsupported script type")
assert len(recoverable_signature) == 65
header = get_script_type_number(script_type) + recoverable_signature[0]
return bytes([header]) + recoverable_signature[1:]
def decode_bip137_signature(signature: bytes) -> tuple[InputScriptType, bytes]:
def get_script_type(header: int) -> InputScriptType:
if 27 <= header <= 30:
return InputScriptType.SPENDADDRESS_UNCOMPRESSED
elif 31 <= header <= 34:
return InputScriptType.SPENDADDRESS
elif 35 <= header <= 38:
return InputScriptType.SPENDP2SHWITNESS
elif 39 <= header <= 42:
return InputScriptType.SPENDWITNESS
else:
raise ValueError("Unsupported script type")
assert len(signature) == 65
header = signature[0]
return get_script_type(header), bytes([header]) + signature[1:]