mirror of
https://github.com/trezor/trezor-firmware.git
synced 2025-04-28 04:59:01 +00:00
chore(python): test fallback in handshake
This commit is contained in:
parent
9e7c842f5b
commit
793f60eee9
@ -171,9 +171,12 @@ class ProtocolV2Channel(Channel):
|
|||||||
header, payload = self._read_until_valid_crc_check()
|
header, payload = self._read_until_valid_crc_check()
|
||||||
self._send_ack_0()
|
self._send_ack_0()
|
||||||
|
|
||||||
if header.ctrl_byte == 0x42:
|
if control_byte.is_error(header.ctrl_byte):
|
||||||
if payload == b"\x05":
|
if payload == b"\x05":
|
||||||
raise exceptions.DeviceLockedException()
|
raise exceptions.DeviceLockedException()
|
||||||
|
else:
|
||||||
|
err = _get_error_from_int(payload[0])
|
||||||
|
raise Exception("Received ThpError: " + err)
|
||||||
|
|
||||||
if not header.is_handshake_init_response():
|
if not header.is_handshake_init_response():
|
||||||
LOG.error("Received message is not a valid handshake init response message")
|
LOG.error("Received message is not a valid handshake init response message")
|
||||||
@ -214,6 +217,9 @@ class ProtocolV2Channel(Channel):
|
|||||||
header, data = self._read_until_valid_crc_check()
|
header, data = self._read_until_valid_crc_check()
|
||||||
if not header.is_handshake_comp_response():
|
if not header.is_handshake_comp_response():
|
||||||
LOG.error("Received message is not a valid handshake completion response")
|
LOG.error("Received message is not a valid handshake completion response")
|
||||||
|
if control_byte.is_error(header.ctrl_byte):
|
||||||
|
err = _get_error_from_int(data[0])
|
||||||
|
raise Exception("Received ThpError: " + err)
|
||||||
trezor_state = self._noise.decrypt(bytes(data))
|
trezor_state = self._noise.decrypt(bytes(data))
|
||||||
# TODO handle trezor_state
|
# TODO handle trezor_state
|
||||||
print("trezor state:", trezor_state)
|
print("trezor state:", trezor_state)
|
||||||
@ -243,6 +249,9 @@ class ProtocolV2Channel(Channel):
|
|||||||
header, payload = self._read_until_valid_crc_check()
|
header, payload = self._read_until_valid_crc_check()
|
||||||
if not header.is_ack() or len(payload) > 0:
|
if not header.is_ack() or len(payload) > 0:
|
||||||
LOG.error("Received message is not a valid ACK")
|
LOG.error("Received message is not a valid ACK")
|
||||||
|
if control_byte.is_error(header.ctrl_byte):
|
||||||
|
err = _get_error_from_int(payload[0])
|
||||||
|
raise Exception("Received ThpError: " + err)
|
||||||
|
|
||||||
def _send_ack_0(self):
|
def _send_ack_0(self):
|
||||||
LOG.debug("sending ack 0")
|
LOG.debug("sending ack 0")
|
||||||
|
@ -13,6 +13,7 @@ def test_multiple_hosts(client: Client) -> None:
|
|||||||
assert isinstance(client.protocol, ProtocolV2Channel)
|
assert isinstance(client.protocol, ProtocolV2Channel)
|
||||||
protocol_1 = client.protocol
|
protocol_1 = client.protocol
|
||||||
protocol_2 = ProtocolV2Channel(protocol_1.transport, protocol_1.mapping)
|
protocol_2 = ProtocolV2Channel(protocol_1.transport, protocol_1.mapping)
|
||||||
|
protocol_2._reset_sync_bits()
|
||||||
|
|
||||||
nonce_1 = os.urandom(8)
|
nonce_1 = os.urandom(8)
|
||||||
nonce_2 = os.urandom(8)
|
nonce_2 = os.urandom(8)
|
||||||
@ -34,7 +35,29 @@ def test_multiple_hosts(client: Client) -> None:
|
|||||||
protocol_1._read_ack()
|
protocol_1._read_ack()
|
||||||
protocol_1._read_handshake_init_response()
|
protocol_1._read_handshake_init_response()
|
||||||
|
|
||||||
|
protocol_2._send_handshake_init_request()
|
||||||
|
|
||||||
|
with pytest.raises(Exception) as e:
|
||||||
|
protocol_2._read_ack()
|
||||||
|
assert e.value.args[0] == "Received ThpError: TRANSPORT BUSY"
|
||||||
|
|
||||||
time.sleep(0.2) # To pass LOCK_TIME
|
time.sleep(0.2) # To pass LOCK_TIME
|
||||||
|
protocol_2._init_noise()
|
||||||
protocol_2._send_handshake_init_request()
|
protocol_2._send_handshake_init_request()
|
||||||
protocol_2._read_ack()
|
protocol_2._read_ack()
|
||||||
# protocol_2._read_handshake_init_response()
|
protocol_2._read_handshake_init_response()
|
||||||
|
|
||||||
|
protocol_2._send_handshake_completion_request()
|
||||||
|
protocol_2._read_ack()
|
||||||
|
protocol_2._read_handshake_completion_response()
|
||||||
|
|
||||||
|
protocol_2._do_pairing(helper_debug=client.debug)
|
||||||
|
|
||||||
|
protocol_1._send_handshake_completion_request()
|
||||||
|
protocol_1._read_ack()
|
||||||
|
|
||||||
|
with pytest.raises(Exception) as e:
|
||||||
|
protocol_1._read_handshake_completion_response()
|
||||||
|
assert e.value.args[0] == "Received ThpError: UNALLOCATED CHANNEL"
|
||||||
|
|
||||||
|
# TODO - test ACK fallback, test standard encrypted message fallback
|
||||||
|
Loading…
Reference in New Issue
Block a user