from common import unittest, await_result, H_ import storage.cache from trezor import wire from trezor.crypto import bip32 from trezor.crypto.curve import bip340, secp256k1 from trezor.crypto.hashlib import sha256 from trezor.messages import AuthorizeCoinJoin from trezor.messages import TxInput from trezor.messages import TxOutput from trezor.messages import SignTx from trezor.messages import CoinJoinRequest from trezor.enums import InputScriptType, OutputScriptType from trezor.utils import HashWriter from apps.common import coins from apps.bitcoin.authorization import FEE_RATE_DECIMALS, CoinJoinAuthorization from apps.bitcoin.sign_tx.approvers import CoinJoinApprover from apps.bitcoin.sign_tx.bitcoin import Bitcoin from apps.bitcoin.sign_tx.tx_info import TxInfo from apps.bitcoin import writers class TestApprover(unittest.TestCase): def setUp(self): self.coin = coins.by_name('Bitcoin') self.fee_rate_percent = 0.3 self.no_fee_threshold=1000000 self.min_registrable_amount=5000 self.coordinator_name = "www.example.com" # Private key for signing and masking CoinJoin requests. # m/0h for "all all ... all" seed. self.private_key = b'?S\ti\x8b\xc5o{,\xab\x03\x194\xea\xa8[_:\xeb\xdf\xce\xef\xe50\xf17D\x98`\xb9dj' self.node = bip32.HDNode( depth=0, fingerprint=0, child_num=0, chain_code=bytearray(32), private_key=b"\x01" * 32, curve_name="secp256k1", ) self.tweaked_node_pubkey = b"\x02" + bip340.tweak_public_key(self.node.public_key()[1:]) self.msg_auth = AuthorizeCoinJoin( coordinator=self.coordinator_name, max_rounds=10, max_coordinator_fee_rate=int(self.fee_rate_percent * 10**FEE_RATE_DECIMALS), max_fee_per_kvbyte=7000, address_n=[H_(10025), H_(0), H_(0), H_(1)], coin_name=self.coin.coin_name, script_type=InputScriptType.SPENDTAPROOT, ) storage.cache.start_session() def make_coinjoin_request(self, inputs): mask_public_key = secp256k1.publickey(self.private_key) coinjoin_flags = bytearray() for txi in inputs: shared_secret = secp256k1.multiply(self.private_key, self.tweaked_node_pubkey)[1:33] h_mask = HashWriter(sha256()) writers.write_bytes_fixed(h_mask, shared_secret, 32) writers.write_bytes_reversed(h_mask, txi.prev_hash, writers.TX_HASH_SIZE) writers.write_uint32(h_mask, txi.prev_index) mask = h_mask.get_digest()[0] & 1 signable = txi.script_type == InputScriptType.SPENDTAPROOT txi.coinjoin_flags = signable ^ mask coinjoin_flags.append(txi.coinjoin_flags) # Compute CoinJoin request signature. h_request = HashWriter(sha256(b"CJR1")) writers.write_bytes_prefixed( h_request, self.coordinator_name.encode() ) writers.write_uint32(h_request, self.coin.slip44) writers.write_uint32(h_request, int(self.fee_rate_percent * 10**FEE_RATE_DECIMALS)) writers.write_uint64(h_request, self.no_fee_threshold) writers.write_uint64(h_request, self.min_registrable_amount) writers.write_bytes_fixed(h_request, mask_public_key, 33) writers.write_bytes_prefixed(h_request, coinjoin_flags) writers.write_bytes_fixed(h_request, sha256().digest(), 32) writers.write_bytes_fixed(h_request, sha256().digest(), 32) signature = secp256k1.sign(self.private_key, h_request.get_digest()) return CoinJoinRequest( fee_rate=int(self.fee_rate_percent * 10**FEE_RATE_DECIMALS), no_fee_threshold=self.no_fee_threshold, min_registrable_amount=self.min_registrable_amount, mask_public_key=mask_public_key, signature=signature, ) def test_coinjoin_lots_of_inputs(self): denomination = 10_000_000 coordinator_fee = int(self.fee_rate_percent / 100 * denomination) fees = coordinator_fee + 500 # Other's inputs. inputs = [ TxInput( prev_hash=bytes(32), prev_index=0, amount=denomination, script_pubkey=bytes(22), script_type=InputScriptType.EXTERNAL, sequence=0xffffffff, witness="", ) for i in range(99) ] # Our input. inputs.insert(30, TxInput( prev_hash=bytes(32), prev_index=0, address_n=[H_(10025), H_(0), H_(0), H_(1), 0, 1], amount=denomination, script_type=InputScriptType.SPENDTAPROOT, sequence=0xffffffff, ) ) # Other's CoinJoined outputs. outputs = [ TxOutput( address="", amount=denomination-fees, script_type=OutputScriptType.PAYTOTAPROOT, payment_req_index=0, ) for i in range(99) ] # Our CoinJoined output. outputs.insert( 40, TxOutput( address="", address_n=[H_(10025), H_(0), H_(0), H_(1), 0, 2], amount=denomination-fees, script_type=OutputScriptType.PAYTOTAPROOT, payment_req_index=0, ) ) # Coordinator's output. outputs.append( TxOutput( address="", amount=coordinator_fee * len(outputs), script_type=OutputScriptType.PAYTOTAPROOT, payment_req_index=0, ) ) coinjoin_req = self.make_coinjoin_request(inputs) tx = SignTx(outputs_count=len(outputs), inputs_count=len(inputs), coin_name=self.coin.coin_name, lock_time=0, coinjoin_request=coinjoin_req) authorization = CoinJoinAuthorization(self.msg_auth) approver = CoinJoinApprover(tx, self.coin, authorization) signer = Bitcoin(tx, None, self.coin, approver) for txi in inputs: if txi.script_type == InputScriptType.EXTERNAL: approver.add_external_input(txi) else: await_result(approver.add_internal_input(txi, self.node)) for txo in outputs: if txo.address_n: await_result(approver.add_change_output(txo, script_pubkey=bytes(22))) else: await_result(approver.add_external_output(txo, script_pubkey=bytes(22))) await_result(approver.approve_tx(TxInfo(signer, tx), [])) def test_coinjoin_input_account_depth_mismatch(self): txi = TxInput( prev_hash=bytes(32), prev_index=0, address_n=[H_(10025), H_(0), H_(0), H_(1), 0], amount=10000000, script_type=InputScriptType.SPENDTAPROOT ) coinjoin_req = self.make_coinjoin_request([txi]) tx = SignTx(outputs_count=201, inputs_count=100, coin_name=self.coin.coin_name, lock_time=0, coinjoin_request=coinjoin_req) authorization = CoinJoinAuthorization(self.msg_auth) approver = CoinJoinApprover(tx, self.coin, authorization) with self.assertRaises(wire.ProcessError): await_result(approver.add_internal_input(txi, self.node)) def test_coinjoin_input_account_path_mismatch(self): txi = TxInput( prev_hash=bytes(32), prev_index=0, address_n=[H_(10025), H_(0), H_(1), H_(1), 0, 0], amount=10000000, script_type=InputScriptType.SPENDTAPROOT ) coinjoin_req = self.make_coinjoin_request([txi]) tx = SignTx(outputs_count=201, inputs_count=100, coin_name=self.coin.coin_name, lock_time=0, coinjoin_request=coinjoin_req) authorization = CoinJoinAuthorization(self.msg_auth) approver = CoinJoinApprover(tx, self.coin, authorization) with self.assertRaises(wire.ProcessError): await_result(approver.add_internal_input(txi, self.node)) if __name__ == '__main__': unittest.main()