mirror of
https://github.com/trezor/trezor-firmware.git
synced 2024-12-18 04:18:10 +00:00
201 lines
6.9 KiB
Python
201 lines
6.9 KiB
Python
from common import unittest, await_result, H_
|
|
|
|
import storage.cache
|
|
from trezor import wire
|
|
from trezor.crypto.curve import secp256k1
|
|
from trezor.crypto.hashlib import sha256
|
|
from trezor.messages import AuthorizeCoinJoin
|
|
from trezor.messages import TxInput
|
|
from trezor.messages import TxOutput
|
|
from trezor.messages import SignTx
|
|
from trezor.messages import TxAckPaymentRequest
|
|
from trezor.enums import InputScriptType, OutputScriptType
|
|
from trezor.utils import HashWriter
|
|
|
|
from apps.common import coins
|
|
from apps.bitcoin.authorization import CoinJoinAuthorization
|
|
from apps.bitcoin.sign_tx.approvers import CoinJoinApprover
|
|
from apps.bitcoin.sign_tx.bitcoin import Bitcoin
|
|
from apps.bitcoin.sign_tx.tx_info import TxInfo
|
|
from apps.bitcoin import writers
|
|
|
|
|
|
class TestApprover(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.coin = coins.by_name('Bitcoin')
|
|
self.fee_per_anonymity_percent = 0.003
|
|
self.coordinator_name = "www.example.com"
|
|
|
|
self.msg_auth = AuthorizeCoinJoin(
|
|
coordinator=self.coordinator_name,
|
|
max_total_fee=40000,
|
|
fee_per_anonymity=int(self.fee_per_anonymity_percent * 10**9),
|
|
address_n=[H_(84), H_(0), H_(0)],
|
|
coin_name=self.coin.coin_name,
|
|
script_type=InputScriptType.SPENDWITNESS,
|
|
)
|
|
storage.cache.start_session()
|
|
|
|
def test_coinjoin_lots_of_inputs(self):
|
|
denomination = 10000000
|
|
|
|
# Other's inputs.
|
|
inputs = [
|
|
TxInput(
|
|
prev_hash=b"",
|
|
prev_index=0,
|
|
amount=denomination + 1000000 * (i + 1),
|
|
script_pubkey=bytes(22),
|
|
script_type=InputScriptType.EXTERNAL,
|
|
sequence=0xffffffff,
|
|
) for i in range(99)
|
|
]
|
|
|
|
# Our input.
|
|
inputs.insert(
|
|
30,
|
|
TxInput(
|
|
prev_hash=b"",
|
|
prev_index=0,
|
|
address_n=[H_(84), H_(0), H_(0), 0, 1],
|
|
amount=denomination + 1000000,
|
|
script_type=InputScriptType.SPENDWITNESS,
|
|
sequence=0xffffffff,
|
|
)
|
|
)
|
|
|
|
# Other's CoinJoined outputs.
|
|
outputs = [
|
|
TxOutput(
|
|
address="",
|
|
amount=denomination,
|
|
script_type=OutputScriptType.PAYTOWITNESS,
|
|
payment_req_index=0,
|
|
) for i in range(99)
|
|
]
|
|
|
|
# Our CoinJoined output.
|
|
outputs.insert(
|
|
40,
|
|
TxOutput(
|
|
address="",
|
|
address_n=[H_(84), H_(0), H_(0), 0, 2],
|
|
amount=denomination,
|
|
script_type=OutputScriptType.PAYTOWITNESS,
|
|
payment_req_index=0,
|
|
)
|
|
)
|
|
|
|
coordinator_fee = int(self.fee_per_anonymity_percent / 100 * len(outputs) * denomination)
|
|
fees = coordinator_fee + 10000
|
|
total_coordinator_fee = coordinator_fee * len(outputs)
|
|
|
|
# Other's change-outputs.
|
|
outputs.extend(
|
|
TxOutput(
|
|
address="",
|
|
amount=1000000 * (i + 1) - fees,
|
|
script_type=OutputScriptType.PAYTOWITNESS,
|
|
payment_req_index=0,
|
|
) for i in range(99)
|
|
)
|
|
|
|
# Our change-output.
|
|
outputs.append(
|
|
TxOutput(
|
|
address="",
|
|
address_n=[H_(84), H_(0), H_(0), 1, 1],
|
|
amount=1000000 - fees,
|
|
script_type=OutputScriptType.PAYTOWITNESS,
|
|
payment_req_index=0,
|
|
)
|
|
)
|
|
|
|
# Coordinator's output.
|
|
outputs.append(
|
|
TxOutput(
|
|
address="",
|
|
amount=total_coordinator_fee,
|
|
script_type=OutputScriptType.PAYTOWITNESS,
|
|
payment_req_index=0,
|
|
)
|
|
)
|
|
|
|
authorization = CoinJoinAuthorization(self.msg_auth)
|
|
tx = SignTx(outputs_count=len(outputs), inputs_count=len(inputs), coin_name=self.coin.coin_name, lock_time=0)
|
|
approver = CoinJoinApprover(tx, self.coin, authorization)
|
|
signer = Bitcoin(tx, None, self.coin, approver)
|
|
|
|
# Compute payment request signature.
|
|
# Private key of m/0h for "all all ... all" seed.
|
|
private_key = b'?S\ti\x8b\xc5o{,\xab\x03\x194\xea\xa8[_:\xeb\xdf\xce\xef\xe50\xf17D\x98`\xb9dj'
|
|
h_pr = HashWriter(sha256())
|
|
writers.write_bytes_fixed(h_pr, b"SL\x00\x24", 4)
|
|
writers.write_bytes_prefixed(h_pr, b"") # Empty nonce.
|
|
writers.write_bytes_prefixed(h_pr, self.coordinator_name.encode())
|
|
writers.write_compact_size(h_pr, 0) # No memos.
|
|
writers.write_uint32(h_pr, self.coin.slip44)
|
|
h_outputs = HashWriter(sha256())
|
|
for txo in outputs:
|
|
writers.write_uint64(h_outputs, txo.amount)
|
|
writers.write_bytes_prefixed(h_outputs, txo.address.encode())
|
|
writers.write_bytes_fixed(h_pr, h_outputs.get_digest(), 32)
|
|
signature = secp256k1.sign(private_key, h_pr.get_digest())
|
|
|
|
tx_ack_payment_req = TxAckPaymentRequest(
|
|
recipient_name=self.coordinator_name,
|
|
signature=signature,
|
|
)
|
|
|
|
for txi in inputs:
|
|
if txi.script_type == InputScriptType.EXTERNAL:
|
|
approver.add_external_input(txi)
|
|
else:
|
|
await_result(approver.add_internal_input(txi))
|
|
|
|
await_result(approver.add_payment_request(tx_ack_payment_req, None))
|
|
for txo in outputs:
|
|
if txo.address_n:
|
|
approver.add_change_output(txo, script_pubkey=bytes(22))
|
|
else:
|
|
await_result(approver.add_external_output(txo, script_pubkey=bytes(22)))
|
|
|
|
await_result(approver.approve_tx(TxInfo(signer, tx), []))
|
|
|
|
def test_coinjoin_input_account_depth_mismatch(self):
|
|
authorization = CoinJoinAuthorization(self.msg_auth)
|
|
tx = SignTx(outputs_count=201, inputs_count=100, coin_name=self.coin.coin_name, lock_time=0)
|
|
approver = CoinJoinApprover(tx, self.coin, authorization)
|
|
|
|
txi = TxInput(
|
|
prev_hash=b"",
|
|
prev_index=0,
|
|
address_n=[H_(49), H_(0), H_(0), 0],
|
|
amount=10000000,
|
|
script_type=InputScriptType.SPENDWITNESS
|
|
)
|
|
|
|
with self.assertRaises(wire.ProcessError):
|
|
await_result(approver.add_internal_input(txi))
|
|
|
|
def test_coinjoin_input_account_path_mismatch(self):
|
|
authorization = CoinJoinAuthorization(self.msg_auth)
|
|
tx = SignTx(outputs_count=201, inputs_count=100, coin_name=self.coin.coin_name, lock_time=0)
|
|
approver = CoinJoinApprover(tx, self.coin, authorization)
|
|
|
|
txi = TxInput(
|
|
prev_hash=b"",
|
|
prev_index=0,
|
|
address_n=[H_(49), H_(0), H_(0), 0, 2],
|
|
amount=10000000,
|
|
script_type=InputScriptType.SPENDWITNESS
|
|
)
|
|
|
|
with self.assertRaises(wire.ProcessError):
|
|
await_result(approver.add_internal_input(txi))
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|