|
|
|
@ -3,7 +3,6 @@ import ustruct
|
|
|
|
|
from typing import TYPE_CHECKING
|
|
|
|
|
from ubinascii import hexlify
|
|
|
|
|
|
|
|
|
|
import trezor.wire.thp
|
|
|
|
|
from storage.cache_thp import BROADCAST_CHANNEL_ID
|
|
|
|
|
from trezor import io, log, utils
|
|
|
|
|
from trezor.loop import wait
|
|
|
|
@ -11,6 +10,7 @@ from trezor.utils import chunks
|
|
|
|
|
from trezor.wire.protocol_common import Message
|
|
|
|
|
|
|
|
|
|
if utils.USE_THP:
|
|
|
|
|
import trezor.wire.thp
|
|
|
|
|
from trezor.wire import thp_main
|
|
|
|
|
from trezor.wire.thp import alternating_bit_protocol as ABP
|
|
|
|
|
from trezor.wire.thp import checksum
|
|
|
|
@ -69,10 +69,6 @@ def makeCidRequest(header, message_data):
|
|
|
|
|
return header + message_data
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def printBytes(a):
|
|
|
|
|
print(hexlify(a).decode("utf-8"))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def getPlaintext() -> bytes:
|
|
|
|
|
if ABP.get_expected_receive_seq_bit(THP.get_active_session()) == 1:
|
|
|
|
|
return PLAINTEXT_1
|
|
|
|
@ -96,8 +92,6 @@ async def deprecated_write_message(
|
|
|
|
|
class TestWireTrezorHostProtocolV1(unittest.TestCase):
|
|
|
|
|
def setUp(self):
|
|
|
|
|
self.interface = MockHID(0xDEADBEEF)
|
|
|
|
|
if not utils.USE_THP:
|
|
|
|
|
import storage.cache_thp # noqa: F401
|
|
|
|
|
|
|
|
|
|
def _simple(self):
|
|
|
|
|
cid_req_header = make_header(
|
|
|
|
@ -252,7 +246,7 @@ class TestWireTrezorHostProtocolV1(unittest.TestCase):
|
|
|
|
|
query = gen.send(None)
|
|
|
|
|
for packet in self.interface.data:
|
|
|
|
|
self.assertObjectEqual(query, self.interface.wait_object(io.POLL_READ))
|
|
|
|
|
printBytes(packet)
|
|
|
|
|
print(utils.get_bytes_as_str(packet))
|
|
|
|
|
query = gen.send(packet)
|
|
|
|
|
|
|
|
|
|
with self.assertRaises(StopIteration) as e:
|
|
|
|
|