1
0
mirror of https://github.com/trezor/trezor-firmware.git synced 2024-12-05 14:08:39 +00:00
trezor-firmware/core/tests/test_apps.bitcoin.approver.py

227 lines
6.9 KiB
Python
Raw Normal View History

2024-11-15 16:31:22 +00:00
from common import * # isort:skip
import storage.cache_codec
from trezor import wire
from trezor.crypto import bip32
from trezor.enums import InputScriptType, OutputScriptType
from trezor.messages import (
AuthorizeCoinJoin,
CoinJoinRequest,
SignTx,
TxInput,
TxOutput,
)
from trezor.wire import context
from apps.bitcoin.authorization import FEE_RATE_DECIMALS, CoinJoinAuthorization
from apps.bitcoin.sign_tx.approvers import CoinJoinApprover
from apps.bitcoin.sign_tx.bitcoin import Bitcoin
from apps.bitcoin.sign_tx.tx_info import TxInfo
from apps.common import coins
2024-11-15 16:31:22 +00:00
from trezor.wire.codec.codec_context import CodecContext
if utils.USE_THP:
import thp_common
else:
import storage.cache_codec
class TestApprover(unittest.TestCase):
2024-11-15 16:31:22 +00:00
if utils.USE_THP:
2024-11-19 13:57:11 +00:00
def setUpClass(self):
if __debug__:
thp_common.suppres_debug_log()
thp_common.prepare_context()
else:
def setUpClass(self):
context.CURRENT_CONTEXT = CodecContext(None, bytearray(64))
def tearDownClass(self):
context.CURRENT_CONTEXT = None
def setUp(self):
2023-06-28 10:46:29 +00:00
self.coin = coins.by_name("Bitcoin")
self.fee_rate_percent = 0.3
2023-06-28 10:46:29 +00:00
self.no_fee_threshold = 1000000
self.min_registrable_amount = 5000
self.coordinator_name = "www.example.com"
self.node = bip32.HDNode(
depth=0,
fingerprint=0,
child_num=0,
chain_code=bytearray(32),
private_key=b"\x01" * 32,
curve_name="secp256k1",
)
self.msg_auth = AuthorizeCoinJoin(
coordinator=self.coordinator_name,
max_rounds=10,
2024-11-15 16:31:22 +00:00
max_coordinator_fee_rate=int(self.fee_rate_percent * 10**FEE_RATE_DECIMALS),
max_fee_per_kvbyte=7000,
address_n=[H_(10025), H_(0), H_(0), H_(1)],
coin_name=self.coin.coin_name,
script_type=InputScriptType.SPENDTAPROOT,
)
2024-11-15 16:31:22 +00:00
if not utils.USE_THP:
storage.cache_codec.start_session()
def make_coinjoin_request(self, inputs):
return CoinJoinRequest(
fee_rate=int(self.fee_rate_percent * 10**FEE_RATE_DECIMALS),
no_fee_threshold=self.no_fee_threshold,
min_registrable_amount=self.min_registrable_amount,
mask_public_key=bytearray(),
signature=bytearray(),
)
def test_coinjoin_lots_of_inputs(self):
denomination = 10_000_000
coordinator_fee = int(self.fee_rate_percent / 100 * denomination)
fees = coordinator_fee + 500
# Other's inputs.
inputs = [
TxInput(
prev_hash=bytes(32),
prev_index=0,
amount=denomination,
script_pubkey=bytes(22),
script_type=InputScriptType.EXTERNAL,
2023-06-28 10:46:29 +00:00
sequence=0xFFFFFFFF,
witness="",
2023-06-28 10:46:29 +00:00
)
for i in range(99)
]
# Our input.
2023-06-28 10:46:29 +00:00
inputs.insert(
30,
TxInput(
prev_hash=bytes(32),
prev_index=0,
address_n=[H_(10025), H_(0), H_(0), H_(1), 0, 1],
amount=denomination,
script_type=InputScriptType.SPENDTAPROOT,
2023-06-28 10:46:29 +00:00
sequence=0xFFFFFFFF,
),
)
# Other's CoinJoined outputs.
outputs = [
TxOutput(
address="",
2023-06-28 10:46:29 +00:00
amount=denomination - fees,
script_type=OutputScriptType.PAYTOTAPROOT,
payment_req_index=0,
2023-06-28 10:46:29 +00:00
)
for i in range(99)
]
# Our CoinJoined output.
outputs.insert(
40,
TxOutput(
address="",
address_n=[H_(10025), H_(0), H_(0), H_(1), 0, 2],
2023-06-28 10:46:29 +00:00
amount=denomination - fees,
script_type=OutputScriptType.PAYTOTAPROOT,
payment_req_index=0,
2023-06-28 10:46:29 +00:00
),
)
# Coordinator's output.
outputs.append(
TxOutput(
address="",
amount=coordinator_fee * len(outputs),
script_type=OutputScriptType.PAYTOTAPROOT,
payment_req_index=0,
)
)
coinjoin_req = self.make_coinjoin_request(inputs)
2023-06-28 10:46:29 +00:00
tx = SignTx(
outputs_count=len(outputs),
inputs_count=len(inputs),
coin_name=self.coin.coin_name,
lock_time=0,
coinjoin_request=coinjoin_req,
)
authorization = CoinJoinAuthorization(self.msg_auth)
approver = CoinJoinApprover(tx, self.coin, authorization)
signer = Bitcoin(tx, None, self.coin, approver)
tx_info = TxInfo(signer, tx)
for txi in inputs:
if txi.script_type == InputScriptType.EXTERNAL:
approver.add_external_input(txi)
else:
await_result(approver.add_internal_input(txi, self.node))
for txo in outputs:
if txo.address_n:
await_result(approver.add_change_output(txo, script_pubkey=bytes(22)))
else:
2024-11-15 16:31:22 +00:00
await_result(
approver.add_external_output(
txo, script_pubkey=bytes(22), tx_info=tx_info
)
)
await_result(approver.approve_tx(tx_info, [], None))
def test_coinjoin_input_account_depth_mismatch(self):
txi = TxInput(
prev_hash=bytes(32),
prev_index=0,
address_n=[H_(10025), H_(0), H_(0), H_(1), 0],
amount=10000000,
2023-06-28 10:46:29 +00:00
script_type=InputScriptType.SPENDTAPROOT,
)
coinjoin_req = self.make_coinjoin_request([txi])
2023-06-28 10:46:29 +00:00
tx = SignTx(
outputs_count=201,
inputs_count=100,
coin_name=self.coin.coin_name,
lock_time=0,
coinjoin_request=coinjoin_req,
)
authorization = CoinJoinAuthorization(self.msg_auth)
approver = CoinJoinApprover(tx, self.coin, authorization)
with self.assertRaises(wire.ProcessError):
await_result(approver.add_internal_input(txi, self.node))
def test_coinjoin_input_account_path_mismatch(self):
txi = TxInput(
prev_hash=bytes(32),
prev_index=0,
address_n=[H_(10025), H_(0), H_(1), H_(1), 0, 0],
amount=10000000,
2023-06-28 10:46:29 +00:00
script_type=InputScriptType.SPENDTAPROOT,
)
coinjoin_req = self.make_coinjoin_request([txi])
2023-06-28 10:46:29 +00:00
tx = SignTx(
outputs_count=201,
inputs_count=100,
coin_name=self.coin.coin_name,
lock_time=0,
coinjoin_request=coinjoin_req,
)
authorization = CoinJoinAuthorization(self.msg_auth)
approver = CoinJoinApprover(tx, self.coin, authorization)
with self.assertRaises(wire.ProcessError):
await_result(approver.add_internal_input(txi, self.node))
2023-06-28 10:46:29 +00:00
if __name__ == "__main__":
unittest.main()