test(core): fix THP tests failing on non-THP builds

[no changelog]
M1nd3r/thp1
M1nd3r 2 months ago
parent 65a295425b
commit 5f887f4457

@ -1,4 +1,4 @@
from common import * from common import * # isort:skip
from trezor import utils from trezor import utils
if utils.USE_THP: if utils.USE_THP:

@ -1,11 +1,11 @@
from common import * from common import * # isort:skip
from trezor import config, utils from trezor import config, log, utils
from trezor import log
if utils.USE_THP: if utils.USE_THP:
from apps.thp import credential_manager
from trezor.messages import ThpCredentialMetadata from trezor.messages import ThpCredentialMetadata
from apps.thp import credential_manager
def _issue_credential(host_name: str, host_static_pubkey: bytes) -> bytes: def _issue_credential(host_name: str, host_static_pubkey: bytes) -> bytes:
metadata = ThpCredentialMetadata(host_name=host_name) metadata = ThpCredentialMetadata(host_name=host_name)
return credential_manager.issue_credential(host_static_pubkey, metadata) return credential_manager.issue_credential(host_static_pubkey, metadata)

@ -1,79 +1,80 @@
from common import * from common import * # isort:skip
from trezorcrypto import aesgcm, curve25519
import storage import storage
from trezor import utils from trezor import utils
from trezor.wire.thp.crypto import IV_1, IV_2, Handshake
from trezorcrypto import aesgcm, curve25519
if utils.USE_THP: if utils.USE_THP:
from trezor.wire.thp import crypto from trezor.wire.thp import crypto
from trezor.wire.thp.crypto import IV_1, IV_2, Handshake
def get_dummy_device_secret():
def get_dummy_device_secret(): return b"\x01\x02\x03\x04\x05\x06\x07\x08\x01\x02\x03\x04\x05\x06\x07\x08"
return b"\x01\x02\x03\x04\x05\x06\x07\x08\x01\x02\x03\x04\x05\x06\x07\x08"
@unittest.skipUnless(utils.USE_THP, "only needed for THP") @unittest.skipUnless(utils.USE_THP, "only needed for THP")
class TestTrezorHostProtocolCrypto(unittest.TestCase): class TestTrezorHostProtocolCrypto(unittest.TestCase):
key_1 = b"\x00\x01\x02\x03\x04\x05\x06\x07\x00\x01\x02\x03\x04\x05\x06\x07\x00\x01\x02\x03\x04\x05\x06\x07\x00\x01\x02\x03\x04\x05\x06\x07" if utils.USE_THP:
handshake = Handshake() handshake = Handshake()
# 0:key, 1:nonce, 2:auth_data, 3:plaintext, 4:expected_ciphertext, 5:expected_tag key_1 = b"\x00\x01\x02\x03\x04\x05\x06\x07\x00\x01\x02\x03\x04\x05\x06\x07\x00\x01\x02\x03\x04\x05\x06\x07\x00\x01\x02\x03\x04\x05\x06\x07"
vectors_enc = [ # 0:key, 1:nonce, 2:auth_data, 3:plaintext, 4:expected_ciphertext, 5:expected_tag
( vectors_enc = [
key_1, (
0, key_1,
b"\x55\x64", 0,
b"\x00\x01\x02\x03\x04\05\x06\x07\x08\x09", b"\x55\x64",
b"e2c9dd152fbee5821ea7", b"\x00\x01\x02\x03\x04\05\x06\x07\x08\x09",
b"10625812de81b14a46b9f1e5100a6d0c", b"e2c9dd152fbee5821ea7",
), b"10625812de81b14a46b9f1e5100a6d0c",
( ),
key_1, (
1, key_1,
b"\x55\x64", 1,
b"\x00\x01\x02\x03\x04\05\x06\x07\x08\x09", b"\x55\x64",
b"79811619ddb07c2b99f8", b"\x00\x01\x02\x03\x04\05\x06\x07\x08\x09",
b"71c6b872cdc499a7e9a3c7441f053214", b"79811619ddb07c2b99f8",
), b"71c6b872cdc499a7e9a3c7441f053214",
( ),
key_1, (
369, key_1,
b"\x55\x64", 369,
b"\x00\x01\x02\x03\x04\05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f", b"\x55\x64",
b"03bd030390f2dfe815a61c2b157a064f", b"\x00\x01\x02\x03\x04\05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f",
b"c1200f8a7ae9a6d32cef0fff878d55c2", b"03bd030390f2dfe815a61c2b157a064f",
), b"c1200f8a7ae9a6d32cef0fff878d55c2",
( ),
key_1, (
369, key_1,
b"\x55\x64\x73\x82\x91", 369,
b"\x00\x01\x02\x03\x04\05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f", b"\x55\x64\x73\x82\x91",
b"03bd030390f2dfe815a61c2b157a064f", b"\x00\x01\x02\x03\x04\05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f",
b"693ac160cd93a20f7fc255f049d808d0", b"03bd030390f2dfe815a61c2b157a064f",
), b"693ac160cd93a20f7fc255f049d808d0",
] ),
# 0:chaining key, 1:input, 2:output_1, 3:output:2 ]
vectors_hkdf = [ # 0:chaining key, 1:input, 2:output_1, 3:output:2
( vectors_hkdf = [
crypto.PROTOCOL_NAME, (
b"\x01\x02", crypto.PROTOCOL_NAME,
b"c784373a217d6be057cddc6068e6748f255fc8beb6f99b7b90cbc64aad947514", b"\x01\x02",
b"12695451e29bf08ffe5e4e6ab734b0c3d7cdd99b16cd409f57bd4eaa874944ba", b"c784373a217d6be057cddc6068e6748f255fc8beb6f99b7b90cbc64aad947514",
), b"12695451e29bf08ffe5e4e6ab734b0c3d7cdd99b16cd409f57bd4eaa874944ba",
( ),
b"\xc7\x84\x37\x3a\x21\x7d\x6b\xe0\x57\xcd\xdc\x60\x68\xe6\x74\x8f\x25\x5f\xc8\xbe\xb6\xf9\x9b\x7b\x90\xcb\xc6\x4a\xad\x94\x75\x14", (
b"\x31\x41\x59\x26\x52\x12\x34\x56\x78\x89\x04\xaa", b"\xc7\x84\x37\x3a\x21\x7d\x6b\xe0\x57\xcd\xdc\x60\x68\xe6\x74\x8f\x25\x5f\xc8\xbe\xb6\xf9\x9b\x7b\x90\xcb\xc6\x4a\xad\x94\x75\x14",
b"f88c1e08d5c3bae8f6e4a3d3324c8cbc60a805603e399e69c4bf4eacb27c2f48", b"\x31\x41\x59\x26\x52\x12\x34\x56\x78\x89\x04\xaa",
b"5f0216bdb7110ee05372286974da8c9c8b96e2efa15b4af430755f462bd79a76", b"f88c1e08d5c3bae8f6e4a3d3324c8cbc60a805603e399e69c4bf4eacb27c2f48",
), b"5f0216bdb7110ee05372286974da8c9c8b96e2efa15b4af430755f462bd79a76",
] ),
vectors_iv = [ ]
(0, b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"), vectors_iv = [
(1, b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01"), (0, b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"),
(7, b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x07"), (1, b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01"),
(1025, b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x01"), (7, b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x07"),
(4294967295, b"\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xff\xff"), (1025, b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x01"),
(0xFFFFFFFFFFFFFFFF, b"\x00\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff"), (4294967295, b"\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xff\xff"),
] (0xFFFFFFFFFFFFFFFF, b"\x00\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff"),
]
def setUp(self): def setUp(self):
utils.DISABLE_ENCRYPTION = False utils.DISABLE_ENCRYPTION = False

@ -1,35 +1,36 @@
from common import * from common import * # isort:skip
from apps.thp import pairing
from storage.cache_common import (
CHANNEL_HANDSHAKE_HASH,
CHANNEL_KEY_RECEIVE,
CHANNEL_KEY_SEND,
CHANNEL_NONCE_RECEIVE,
CHANNEL_NONCE_SEND,
)
from trezor.enums import ThpPairingMethod, MessageType
from trezor.wire.errors import UnexpectedMessage
from trezor.wire.protocol_common import Message
from trezor.wire.thp.crypto import Handshake
from trezor.wire.thp.pairing_context import PairingContext
from trezor.messages import (
ThpCodeEntryChallenge,
ThpCodeEntryCpaceHost,
ThpCodeEntryTag,
ThpCredentialRequest,
ThpEndRequest,
ThpStartPairingRequest,
)
from trezor import io, config, log, protobuf
from trezor.loop import wait
from trezor.wire import thp_main
from trezor.wire.thp import interface_manager
from storage import cache_thp from storage import cache_thp
from trezor.wire.thp import ChannelState from trezor import config, io, log, protobuf, utils
from trezor.crypto import elligator2
from trezor.crypto.curve import curve25519 from trezor.crypto.curve import curve25519
from trezor.enums import MessageType
from trezor.loop import wait
from trezor.wire.errors import UnexpectedMessage
from trezor.wire.protocol_common import Message
if utils.USE_THP:
from storage.cache_common import (
CHANNEL_HANDSHAKE_HASH,
CHANNEL_KEY_RECEIVE,
CHANNEL_KEY_SEND,
CHANNEL_NONCE_RECEIVE,
CHANNEL_NONCE_SEND,
)
from trezor.crypto import elligator2
from trezor.enums import ThpPairingMethod
from trezor.messages import (
ThpCodeEntryChallenge,
ThpCodeEntryCpaceHost,
ThpCodeEntryTag,
ThpCredentialRequest,
ThpEndRequest,
ThpStartPairingRequest,
)
from trezor.wire import thp_main
from trezor.wire.thp import ChannelState, interface_manager
from trezor.wire.thp.crypto import Handshake
from trezor.wire.thp.pairing_context import PairingContext
from apps.thp import pairing
# Disable log.debug for the test # Disable log.debug for the test
log.debug = lambda name, msg, *args: None log.debug = lambda name, msg, *args: None
@ -61,6 +62,7 @@ def get_dummy_key() -> bytes:
return b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x01\x02\x03\x04\x05\x06\x07\x08\x09\x20\x01\x02\x03\x04\x05\x06\x07\x08\x09\x30\x31" return b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x01\x02\x03\x04\x05\x06\x07\x08\x09\x20\x01\x02\x03\x04\x05\x06\x07\x08\x09\x30\x31"
@unittest.skipUnless(utils.USE_THP, "only needed for THP")
class TestTrezorHostProtocol(unittest.TestCase): class TestTrezorHostProtocol(unittest.TestCase):
def setUp(self): def setUp(self):
self.interface = MockHID(0xDEADBEEF) self.interface = MockHID(0xDEADBEEF)

@ -1,32 +1,31 @@
from common import * from common import * # isort:skip
from trezor import utils from trezor import utils
if utils.USE_THP: if utils.USE_THP:
from trezor.wire.thp import writer from trezor.wire.thp import writer
from trezor.wire.thp.thp_messages import PacketHeader, ENCRYPTED_TRANSPORT from trezor.wire.thp.thp_messages import ENCRYPTED_TRANSPORT, PacketHeader
if __debug__: class MockHID:
# Disable log.debug for the test def __init__(self, num):
from trezor import log self.num = num
self.data = []
log.debug = lambda name, msg, *args: None def iface_num(self):
return self.num
def write(self, msg):
self.data.append(bytearray(msg))
return len(msg)
class MockHID: def wait_object(self, mode):
def __init__(self, num): return wait(mode | self.num)
self.num = num
self.data = []
def iface_num(self):
return self.num
def write(self, msg): if __debug__:
self.data.append(bytearray(msg)) # Disable log.debug for the test
return len(msg) from trezor import log
def wait_object(self, mode):
return wait(mode | self.num)
log.debug = lambda name, msg, *args: None
@unittest.skipUnless(utils.USE_THP, "only needed for THP") @unittest.skipUnless(utils.USE_THP, "only needed for THP")
class TestTrezorHostProtocolWriter(unittest.TestCase): class TestTrezorHostProtocolWriter(unittest.TestCase):
@ -86,6 +85,15 @@ class TestTrezorHostProtocolWriter(unittest.TestCase):
def setUp(self): def setUp(self):
self.interface = MockHID(0xDEADBEEF) self.interface = MockHID(0xDEADBEEF)
def test_write_empty_packet(self):
gen = writer.write_packet_to_wire(self.interface, b"")
with self.assertRaises(StopIteration):
gen.send(None)
gen.send(None)
print(self.interface.data[0])
self.assertEqual(len(self.interface.data), 1)
self.assertEqual(self.interface.data[0], b"")
def test_write_empty_payload(self): def test_write_empty_payload(self):
header = PacketHeader(ENCRYPTED_TRANSPORT, 4660, 4) header = PacketHeader(ENCRYPTED_TRANSPORT, 4660, 4)
gen = writer.write_payloads_to_wire(self.interface, header, (b"",)) gen = writer.write_payloads_to_wire(self.interface, header, (b"",))

@ -1,22 +1,24 @@
from common import * from common import * # isort:skip
import ustruct
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from storage.cache_thp import BROADCAST_CHANNEL_ID
import trezor.wire.thp
from trezor.wire.thp import alternating_bit_protocol as ABP
from trezor.wire.thp.writer import PACKET_LENGTH
from ubinascii import hexlify from ubinascii import hexlify
import ustruct
import trezor.wire.thp
from storage.cache_thp import BROADCAST_CHANNEL_ID
from trezor import io, log, utils from trezor import io, log, utils
from trezor.loop import wait from trezor.loop import wait
from trezor.utils import chunks from trezor.utils import chunks
from trezor.wire import thp_main
from trezor.wire.protocol_common import Message from trezor.wire.protocol_common import Message
from trezor.wire.thp import checksum
from trezor.wire.thp.checksum import CHECKSUM_LENGTH
# Disable log.debug for the test if utils.USE_THP:
log.debug = lambda name, msg, *args: None from trezor.wire import thp_main
from trezor.wire.thp import alternating_bit_protocol as ABP
from trezor.wire.thp import checksum
from trezor.wire.thp.checksum import CHECKSUM_LENGTH
from trezor.wire.thp.writer import PACKET_LENGTH
if __debug__:
# Disable log.debug for the test
log.debug = lambda name, msg, *args: None
if TYPE_CHECKING: if TYPE_CHECKING:
from trezorio import WireInterface from trezorio import WireInterface
@ -47,7 +49,8 @@ CONT = 0x80
HEADER_INIT_LENGTH = 5 HEADER_INIT_LENGTH = 5
HEADER_CONT_LENGTH = 3 HEADER_CONT_LENGTH = 3
INIT_MESSAGE_DATA_LENGTH = PACKET_LENGTH - HEADER_INIT_LENGTH - _MESSAGE_TYPE_LEN if utils.USE_THP:
INIT_MESSAGE_DATA_LENGTH = PACKET_LENGTH - HEADER_INIT_LENGTH - _MESSAGE_TYPE_LEN
def make_header(ctrl_byte, cid, length): def make_header(ctrl_byte, cid, length):
@ -89,6 +92,7 @@ async def deprecated_write_message(
# This test suite is an adaptation of test_trezor.wire.codec_v1 # This test suite is an adaptation of test_trezor.wire.codec_v1
@unittest.skipUnless(utils.USE_THP, "only needed for THP")
class TestWireTrezorHostProtocolV1(unittest.TestCase): class TestWireTrezorHostProtocolV1(unittest.TestCase):
def setUp(self): def setUp(self):
self.interface = MockHID(0xDEADBEEF) self.interface = MockHID(0xDEADBEEF)

Loading…
Cancel
Save