1
0
mirror of https://github.com/trezor/trezor-firmware.git synced 2025-01-11 07:50:57 +00:00

core: use wire errors instead of ValueErrors where applicable

This commit is contained in:
Tomas Susanka 2020-02-25 11:10:40 +00:00
parent 6722215c66
commit 495a59c282
8 changed files with 71 additions and 28 deletions

View File

@ -1,3 +1,4 @@
from trezor import wire
from trezor.crypto.curve import secp256k1
from trezor.crypto.hashlib import sha256
from trezor.messages import MessageType
@ -14,7 +15,7 @@ from apps.common import paths
async def sign_tx(ctx, envelope, keychain):
# create transaction message -> sign it -> create signature/pubkey message -> serialize all
if envelope.msg_count > 1:
raise ValueError("Multiple messages not supported")
raise wire.DataError("Multiple messages not supported.")
await paths.validate_path(
ctx, helpers.validate_full_path, keychain, envelope.address_n, CURVE
)
@ -30,6 +31,9 @@ async def sign_tx(ctx, envelope, keychain):
MessageType.BinanceTransferMsg,
)
if envelope.source is None or envelope.source < 0:
raise wire.DataError("Source missing or invalid.")
msg_json = helpers.produce_json_for_signing(envelope, msg)
if isinstance(msg, BinanceTransferMsg):

View File

@ -1,8 +1,3 @@
if False:
from typing import Tuple
from apps.common.coininfo import CoinType
def length(address_type: int) -> int:
if address_type <= 0xFF:
return 1
@ -24,19 +19,5 @@ def check(address_type: int, raw_address: bytes) -> bool:
def strip(address_type: int, raw_address: bytes) -> bytes:
if not check(address_type, raw_address):
raise ValueError("Invalid address")
raise ValueError
return raw_address[length(address_type) :]
def split(coin: CoinType, raw_address: bytes) -> Tuple[bytes, bytes]:
for f in (
"address_type",
"address_type_p2sh",
"address_type_p2wpkh",
"address_type_p2wsh",
):
at = getattr(coin, f)
if at is not None and check(at, raw_address):
l = length(at)
return raw_address[:l], raw_address[l:]
raise ValueError("Invalid address")

View File

@ -33,7 +33,7 @@ async def load_device(ctx, msg):
elif share.group_count > 1:
backup_type = BackupType.Slip39_Advanced
else:
raise RuntimeError("Invalid group count")
raise wire.ProcessError("Invalid group count")
storage.device.set_slip39_identifier(identifier)
storage.device.set_slip39_iteration_exponent(iteration_exponent)

View File

@ -1,3 +1,4 @@
from trezor import wire
from trezor.crypto.curve import ed25519
from trezor.messages.NEMSignedTx import NEMSignedTx
from trezor.messages.NEMSignTx import NEMSignTx
@ -52,7 +53,7 @@ async def sign_tx(ctx, msg: NEMSignTx, keychain):
ctx, public_key, common, msg.importance_transfer
)
else:
raise ValueError("No transaction provided")
raise wire.DataError("No transaction provided")
if msg.multisig:
# wrap transaction in multisig wrapper

View File

@ -77,3 +77,7 @@ def validate(msg: RippleSignTx):
raise ProcessError(
"Some of the required fields are missing (fee, sequence, payment.amount, payment.destination)"
)
if msg.payment.amount < 0:
raise ProcessError("Only non-negative amounts are allowed.")
if msg.payment.amount > helpers.MAX_ALLOWED_AMOUNT:
raise ProcessError("Amount exceeds maximum allowed amount.")

View File

@ -1,5 +1,6 @@
from ustruct import pack, unpack
from trezor import wire
from trezor.crypto.hashlib import sha256
from trezor.messages.ECDHSessionKey import ECDHSessionKey
from trezor.ui.text import Text
@ -62,9 +63,9 @@ def ecdh(seckey: bytes, peer_public_key: bytes, curve: str) -> bytes:
from trezor.crypto.curve import curve25519
if peer_public_key[0] != 0x40:
raise ValueError("Curve25519 public key should start with 0x40")
raise wire.DataError("Curve25519 public key should start with 0x40")
session_key = b"\x04" + curve25519.multiply(seckey, peer_public_key[1:])
else:
raise ValueError("Unsupported curve for ECDH: " + curve)
raise wire.DataError("Unsupported curve for ECDH: " + curve)
return session_key

View File

@ -342,7 +342,7 @@ async def sign_tx(tx: SignTx, keychain: seed.Keychain):
addresses.ecdsa_hash_pubkey(key_sign_pub, coin)
)
else:
raise ValueError("Unknown input script type")
raise SigningError("Unsupported input script type")
h_witness = utils.HashWriter(blake256())
writers.write_uint32(
@ -730,10 +730,13 @@ def output_derive_script(
elif version == cashaddr.ADDRESS_TYPE_P2SH:
version = coin.address_type_p2sh
else:
raise ValueError("Unknown cashaddr address type")
raise SigningError("Unknown cashaddr address type")
raw_address = bytes([version]) + data
else:
raw_address = base58.decode_check(o.address, coin.b58_hash)
try:
raw_address = base58.decode_check(o.address, coin.b58_hash)
except ValueError:
raise SigningError(FailureType.DataError, "Invalid address")
if address_type.check(coin.address_type, raw_address):
# p2pkh

View File

@ -17,6 +17,7 @@ from trezor.messages import OutputScriptType
from apps.common import coins
from apps.common.seed import Keychain
from apps.wallet.sign_tx import helpers, signing
from apps.wallet.sign_tx.signing import SigningError
class TestSignSegwitTxNativeP2WPKH(unittest.TestCase):
@ -214,6 +215,54 @@ class TestSignSegwitTxNativeP2WPKH(unittest.TestCase):
with self.assertRaises(StopIteration):
signer.send(None)
def test_send_native_invalid_address(self):
coin = coins.by_name('Testnet')
seed = bip39.seed(' '.join(['all'] * 12), '')
inp1 = TxInputType(
# 49'/1'/0'/0/0" - tb1qqzv60m9ajw8drqulta4ld4gfx0rdh82un5s65s
address_n=[49 | 0x80000000, 1 | 0x80000000, 0 | 0x80000000, 0, 0],
amount=12300000,
prev_hash=unhexlify('09144602765ce3dd8f4329445b20e3684e948709c5cdcaf12da3bb079c99448a'),
prev_index=0,
script_type=InputScriptType.SPENDWITNESS,
sequence=0xffffffff,
multisig=None,
)
out1 = TxOutputType(
address='TB1Q694CCP5QCC0UDMFWGP692U2S2HJPQ5H407URTU', # Error: should be lower case
script_type=OutputScriptType.PAYTOADDRESS,
amount=12300000 - 11000 - 5000000,
address_n=[],
multisig=None,
)
tx = SignTx(coin_name='Testnet', version=None, lock_time=None, inputs_count=1, outputs_count=1)
messages = [
None,
# check fee
TxRequest(request_type=TXINPUT, details=TxRequestDetailsType(request_index=0, tx_hash=None)),
TxAck(tx=TransactionType(inputs=[inp1])),
helpers.UiConfirmForeignAddress(address_n=inp1.address_n),
True,
TxRequest(request_type=TXOUTPUT, details=TxRequestDetailsType(request_index=0, tx_hash=None), serialized=None),
TxAck(tx=TransactionType(outputs=[out1])),
None
]
keychain = Keychain(seed, [[coin.curve_name]])
signer = signing.sign_tx(tx, keychain)
for request, response in chunks(messages, 2):
if response is None:
with self.assertRaises(SigningError):
signer.send(request)
else:
self.assertEqual(signer.send(request), response)
if __name__ == '__main__':
unittest.main()