feat(core): Verify ownership proofs before transaction approval in BTC signing.

pull/2552/merge
Andrew Kozlik 2 years ago committed by Andrew Kozlik
parent 5b453c88ed
commit 8ef5e5120a

@ -15,7 +15,6 @@ from ..common import (
bip340_sign,
ecdsa_sign,
input_is_external,
input_is_external_unverified,
input_is_segwit,
)
from ..ownership import verify_nonownership
@ -108,6 +107,9 @@ class Bitcoin:
# indicates whether all internal inputs are Taproot
self.taproot_only = True
# indicates whether any external inputs are presigned
self.has_presigned = False
# transaction and signature serialization
_SERIALIZED_TX_BUFFER[:] = bytes()
self.serialized_tx = _SERIALIZED_TX_BUFFER
@ -159,7 +161,7 @@ class Bitcoin:
if input_is_external(txi):
self.external.add(i)
writers.write_tx_input_check(h_external_inputs_check, txi)
await self.process_external_input(txi)
await self.process_external_input(txi, script_pubkey)
else:
await self.process_internal_input(txi)
@ -198,21 +200,25 @@ class Bitcoin:
async def step3_verify_inputs(self) -> None:
# should come out the same as h_inputs_check, checked before continuing
h_check = HashWriter(sha256())
expected_digest = None
if self.taproot_only:
# All internal inputs are Taproot. We only need to verify external inputs. We can trust
# the amounts and scriptPubKeys, because if an invalid value is provided then all
# issued signatures will be invalid.
expected_digest = self.h_external_inputs
for i in range(self.tx_info.tx.inputs_count):
progress.advance()
if i in self.external:
txi = await helpers.request_tx_input(self.tx_req, i, self.coin)
writers.write_tx_input_check(h_check, txi)
if not input_is_external_unverified(txi):
# txi.script_pubkey checked in sanitize_tx_input
assert txi.script_pubkey is not None
await self.verify_external_input(i, txi, txi.script_pubkey)
# All internal inputs are Taproot. We only need to verify presigned external inputs.
# We can trust the amounts and scriptPubKeys, because if an invalid value is provided
# then all issued signatures will be invalid.
if self.has_presigned:
expected_digest = self.h_external_inputs
for i in range(self.tx_info.tx.inputs_count):
progress.advance()
if i in self.external:
txi = await helpers.request_tx_input(self.tx_req, i, self.coin)
writers.write_tx_input_check(h_check, txi)
if txi.witness or txi.script_sig:
# txi.script_pubkey checked in sanitize_tx_input
assert txi.script_pubkey is not None
await self.verify_presigned_external_input(
i, txi, txi.script_pubkey
)
else:
# There are internal non-Taproot inputs. We need to verify all inputs, because we can't
# trust any amounts or scriptPubKeys. If we did, then an attacker who provides invalid
@ -237,11 +243,11 @@ class Bitcoin:
if script_pubkey != self.input_derive_script(txi):
raise wire.DataError("Input does not match scriptPubKey")
if i in self.external and not input_is_external_unverified(txi):
await self.verify_external_input(i, txi, script_pubkey)
if i in self.external and (txi.witness or txi.script_sig):
await self.verify_presigned_external_input(i, txi, script_pubkey)
# check that the inputs were the same as those streamed for approval
if h_check.get_digest() != expected_digest:
if expected_digest and h_check.get_digest() != expected_digest:
raise wire.ProcessError("Transaction has changed during signing")
# verify the signature of one SIGHASH_ALL input in each original transaction
@ -302,8 +308,20 @@ class Bitcoin:
await self.approver.add_internal_input(txi)
async def process_external_input(self, txi: TxInput) -> None:
async def process_external_input(self, txi: TxInput, script_pubkey: bytes) -> None:
self.approver.add_external_input(txi)
if txi.witness or txi.script_sig:
self.has_presigned = True
if txi.ownership_proof:
if not verify_nonownership(
txi.ownership_proof,
script_pubkey,
txi.commitment_data,
self.keychain,
self.coin,
):
raise wire.DataError("Invalid external input")
async def process_original_input(self, txi: TxInput, script_pubkey: bytes) -> None:
assert txi.orig_hash is not None
@ -503,36 +521,26 @@ class Bitcoin:
digest, _, _ = await self.get_legacy_tx_digest(i, tx_info, script_pubkey)
return digest
async def verify_external_input(
async def verify_presigned_external_input(
self, i: int, txi: TxInput, script_pubkey: bytes
) -> None:
if txi.ownership_proof:
if not verify_nonownership(
txi.ownership_proof,
script_pubkey,
txi.commitment_data,
self.keychain,
self.coin,
):
raise wire.DataError("Invalid external input")
else:
verifier = SignatureVerifier(
script_pubkey, txi.script_sig, txi.witness, self.coin
)
verifier = SignatureVerifier(
script_pubkey, txi.script_sig, txi.witness, self.coin
)
verifier.ensure_hash_type(
(SigHashType.SIGHASH_ALL_TAPROOT, self.get_sighash_type(txi))
)
verifier.ensure_hash_type(
(SigHashType.SIGHASH_ALL_TAPROOT, self.get_sighash_type(txi))
)
tx_digest = await self.get_tx_digest(
i,
txi,
self.tx_info,
verifier.public_keys,
verifier.threshold,
script_pubkey,
)
verifier.verify(tx_digest)
tx_digest = await self.get_tx_digest(
i,
txi,
self.tx_info,
verifier.public_keys,
verifier.threshold,
script_pubkey,
)
verifier.verify(tx_digest)
async def serialize_external_input(self, i: int) -> None:
txi = await helpers.request_tx_input(self.tx_req, i, self.coin)

@ -699,7 +699,6 @@ def test_p2tr_with_proof(client: Client):
messages.ButtonRequest(code=B.ConfirmOutput),
messages.ButtonRequest(code=B.SignTx),
request_input(0),
request_input(0),
request_input(1),
request_output(0),
request_input(1),
@ -759,19 +758,6 @@ def test_p2wpkh_with_false_proof(client: Client):
[
request_input(0),
request_input(1),
request_output(0),
messages.ButtonRequest(code=B.ConfirmOutput),
messages.ButtonRequest(code=B.SignTx),
request_input(0),
request_meta(TXHASH_70f987),
request_input(0, TXHASH_70f987),
request_output(0, TXHASH_70f987),
request_output(1, TXHASH_70f987),
request_input(1),
request_meta(TXHASH_65b768),
request_input(0, TXHASH_65b768),
request_output(0, TXHASH_65b768),
request_output(1, TXHASH_65b768),
messages.Failure(code=messages.FailureType.DataError),
]
)

Loading…
Cancel
Save