mirror of
https://github.com/trezor/trezor-firmware.git
synced 2024-12-04 13:38:28 +00:00
fix(python): test_session_id_and_passphrase device test
[no changelog]
This commit is contained in:
parent
1609fa990d
commit
8cd9b03468
@ -21,6 +21,7 @@ import pytest
|
|||||||
from trezorlib import device, exceptions, messages
|
from trezorlib import device, exceptions, messages
|
||||||
from trezorlib.debuglink import LayoutType
|
from trezorlib.debuglink import LayoutType
|
||||||
from trezorlib.debuglink import TrezorClientDebugLink as Client
|
from trezorlib.debuglink import TrezorClientDebugLink as Client
|
||||||
|
from trezorlib.debuglink import SessionDebugWrapper as Session
|
||||||
from trezorlib.exceptions import TrezorFailure
|
from trezorlib.exceptions import TrezorFailure
|
||||||
from trezorlib.messages import FailureType, SafetyCheckLevel
|
from trezorlib.messages import FailureType, SafetyCheckLevel
|
||||||
from trezorlib.tools import parse_path
|
from trezorlib.tools import parse_path
|
||||||
@ -49,19 +50,9 @@ XPUB_REQUEST = messages.GetPublicKey(address_n=ADDRESS_N, coin_name="Bitcoin")
|
|||||||
SESSIONS_STORED = 10
|
SESSIONS_STORED = 10
|
||||||
|
|
||||||
|
|
||||||
def _init_session(client: Client, session_id=None, derive_cardano=False):
|
def _get_xpub(session: Session, expected_passphrase_req: bool = False):
|
||||||
"""Call Initialize, check and return the session ID."""
|
|
||||||
response = client.call(
|
|
||||||
messages.Initialize(session_id=session_id, derive_cardano=derive_cardano)
|
|
||||||
)
|
|
||||||
assert isinstance(response, messages.Features)
|
|
||||||
assert len(response.session_id) == 32
|
|
||||||
return response.session_id
|
|
||||||
|
|
||||||
|
|
||||||
def _get_xpub(client: Client, passphrase=None):
|
|
||||||
"""Get XPUB and check that the appropriate passphrase flow has happened."""
|
"""Get XPUB and check that the appropriate passphrase flow has happened."""
|
||||||
if passphrase is not None:
|
if expected_passphrase_req:
|
||||||
expected_responses = [
|
expected_responses = [
|
||||||
messages.PassphraseRequest,
|
messages.PassphraseRequest,
|
||||||
messages.ButtonRequest,
|
messages.ButtonRequest,
|
||||||
@ -71,110 +62,114 @@ def _get_xpub(client: Client, passphrase=None):
|
|||||||
else:
|
else:
|
||||||
expected_responses = [messages.PublicKey]
|
expected_responses = [messages.PublicKey]
|
||||||
|
|
||||||
with client:
|
with session:
|
||||||
client.use_passphrase(passphrase or "")
|
session.set_expected_responses(expected_responses)
|
||||||
client.set_expected_responses(expected_responses)
|
result = session.call(XPUB_REQUEST)
|
||||||
result = client.call(XPUB_REQUEST)
|
|
||||||
return result.xpub
|
return result.xpub
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.setup_client(passphrase=True)
|
@pytest.mark.setup_client(passphrase=True)
|
||||||
def test_session_with_passphrase(client: Client):
|
def test_session_with_passphrase(client: Client):
|
||||||
# Let's start the communication by calling Initialize.
|
|
||||||
session_id = _init_session(client)
|
|
||||||
|
|
||||||
|
session = Session(client.get_session(passphrase="A"))
|
||||||
|
session_id = session.id
|
||||||
# GetPublicKey requires passphrase and since it is not cached,
|
# GetPublicKey requires passphrase and since it is not cached,
|
||||||
# Trezor will prompt for it.
|
# Trezor will prompt for it.
|
||||||
assert _get_xpub(client, passphrase="A") == XPUB_PASSPHRASES["A"]
|
assert _get_xpub(session, expected_passphrase_req=True) == XPUB_PASSPHRASES["A"]
|
||||||
|
|
||||||
# Call Initialize again, this time with the received session id and then call
|
# Call Initialize again, this time with the received session id and then call
|
||||||
# GetPublicKey. The passphrase should be cached now so Trezor must
|
# GetPublicKey. The passphrase should be cached now so Trezor must
|
||||||
# not ask for it again, whilst returning the same xpub.
|
# not ask for it again, whilst returning the same xpub.
|
||||||
new_session_id = _init_session(client, session_id=session_id)
|
session2 = Session(client.resume_session(session))
|
||||||
assert new_session_id == session_id
|
assert session2.id == session_id
|
||||||
assert _get_xpub(client, passphrase=None) == XPUB_PASSPHRASES["A"]
|
assert _get_xpub(session2) == XPUB_PASSPHRASES["A"]
|
||||||
|
|
||||||
# If we set session id in Initialize to None, the cache will be cleared
|
# If we set session id in Initialize to None, the cache will be cleared
|
||||||
# and Trezor will ask for the passphrase again.
|
# and Trezor will ask for the passphrase again.
|
||||||
new_session_id = _init_session(client)
|
session3 = Session(client.get_session(passphrase="A"))
|
||||||
assert new_session_id != session_id
|
assert session3 != session_id
|
||||||
assert _get_xpub(client, passphrase="A") == XPUB_PASSPHRASES["A"]
|
assert _get_xpub(session3, expected_passphrase_req=True) == XPUB_PASSPHRASES["A"]
|
||||||
|
|
||||||
# Unknown session id is the same as setting it to None.
|
# Unknown session id is the same as setting it to None.
|
||||||
_init_session(client, session_id=b"X" * 32)
|
# _init_session(client, session_id=b"X" * 32)
|
||||||
assert _get_xpub(client, passphrase="A") == XPUB_PASSPHRASES["A"]
|
# assert _get_xpub(passphrase="A") == XPUB_PASSPHRASES["A"]
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.setup_client(passphrase=True)
|
@pytest.mark.setup_client(passphrase=True)
|
||||||
def test_multiple_sessions(client: Client):
|
def test_multiple_sessions(client: Client):
|
||||||
# start SESSIONS_STORED sessions
|
# start SESSIONS_STORED sessions
|
||||||
session_ids = []
|
session_ids = []
|
||||||
|
sessions = []
|
||||||
for _ in range(SESSIONS_STORED):
|
for _ in range(SESSIONS_STORED):
|
||||||
session_ids.append(_init_session(client))
|
session = client.get_session()
|
||||||
|
sessions.append(session)
|
||||||
|
session_ids.append(session.id)
|
||||||
|
|
||||||
# Resume each session
|
# Resume each session
|
||||||
for session_id in session_ids:
|
for i in range(SESSIONS_STORED):
|
||||||
new_session_id = _init_session(client, session_id)
|
resumed_session = client.resume_session(sessions[i])
|
||||||
assert session_id == new_session_id
|
assert session_ids[i] == resumed_session.id
|
||||||
|
|
||||||
# Creating a new session replaces the least-recently-used session
|
# Creating a new session replaces the least-recently-used session
|
||||||
_init_session(client)
|
client.get_session()
|
||||||
|
|
||||||
# Resuming session 1 through SESSIONS_STORED will still work
|
# Resuming session 1 through SESSIONS_STORED will still work
|
||||||
for session_id in session_ids[1:]:
|
for i in range(1, SESSIONS_STORED):
|
||||||
new_session_id = _init_session(client, session_id)
|
resumed_session = client.resume_session(sessions[i])
|
||||||
assert session_id == new_session_id
|
assert session_ids[i] == resumed_session.id
|
||||||
|
|
||||||
# Resuming session 0 will not work
|
# Resuming session 0 will not work
|
||||||
new_session_id = _init_session(client, session_ids[0])
|
resumed_session = client.resume_session(sessions[0])
|
||||||
assert new_session_id != session_ids[0]
|
assert session_ids[0] != resumed_session.id
|
||||||
|
|
||||||
# New session bumped out the least-recently-used anonymous session.
|
# New session bumped out the least-recently-used anonymous session.
|
||||||
# Resuming session 1 through SESSIONS_STORED will still work
|
# Resuming session 1 through SESSIONS_STORED will still work
|
||||||
for session_id in session_ids[1:]:
|
for i in range(1, SESSIONS_STORED):
|
||||||
new_session_id = _init_session(client, session_id)
|
resumed_session = client.resume_session(sessions[i])
|
||||||
assert session_id == new_session_id
|
assert session_ids[i] == resumed_session.id
|
||||||
|
|
||||||
# Creating a new session replaces session_ids[0] again
|
# Creating a new session replaces session_ids[0] again
|
||||||
_init_session(client)
|
client.get_session()
|
||||||
|
|
||||||
# Resuming all sessions one by one will in turn bump out the previous session.
|
# Resuming all sessions one by one will in turn bump out the previous session.
|
||||||
for session_id in session_ids:
|
for i in range(SESSIONS_STORED):
|
||||||
new_session_id = _init_session(client, session_id)
|
resumed_session = client.resume_session(sessions[i])
|
||||||
assert session_id != new_session_id
|
assert session_ids[i] != resumed_session.id
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.setup_client(passphrase=True)
|
@pytest.mark.setup_client(passphrase=True)
|
||||||
def test_multiple_passphrases(client: Client):
|
def test_multiple_passphrases(client: Client):
|
||||||
# start a session
|
# start a session
|
||||||
session_a = _init_session(client)
|
session_a = Session(client.get_session(passphrase="A"))
|
||||||
assert _get_xpub(client, passphrase="A") == XPUB_PASSPHRASES["A"]
|
session_a_id = session_a.id
|
||||||
|
assert _get_xpub(session_a, expected_passphrase_req=True) == XPUB_PASSPHRASES["A"]
|
||||||
# start it again wit the same session id
|
# start it again wit the same session id
|
||||||
new_session_id = _init_session(client, session_id=session_a)
|
session_a_resumed = Session(client.resume_session(session_a))
|
||||||
# session is the same
|
# session is the same
|
||||||
assert new_session_id == session_a
|
assert session_a_resumed.id == session_a_id
|
||||||
# passphrase is not prompted
|
# passphrase is not prompted
|
||||||
assert _get_xpub(client, passphrase=None) == XPUB_PASSPHRASES["A"]
|
assert _get_xpub(session_a_resumed) == XPUB_PASSPHRASES["A"]
|
||||||
|
|
||||||
# start a second session
|
# start a second session
|
||||||
session_b = _init_session(client)
|
session_b = Session(client.get_session(passphrase="B"))
|
||||||
|
session_b_id = session_b.id
|
||||||
# new session -> new session id and passphrase prompt
|
# new session -> new session id and passphrase prompt
|
||||||
assert _get_xpub(client, passphrase="B") == XPUB_PASSPHRASES["B"]
|
assert _get_xpub(session_b, expected_passphrase_req=True) == XPUB_PASSPHRASES["B"]
|
||||||
|
|
||||||
# provide the same session id -> must not ask for passphrase again.
|
# provide the same session id -> must not ask for passphrase again.
|
||||||
new_session_id = _init_session(client, session_id=session_b)
|
session_b_resumed = Session(client.resume_session(session_b))
|
||||||
assert new_session_id == session_b
|
assert session_b_resumed.id == session_b_id
|
||||||
assert _get_xpub(client, passphrase=None) == XPUB_PASSPHRASES["B"]
|
assert _get_xpub(session_b_resumed) == XPUB_PASSPHRASES["B"]
|
||||||
|
|
||||||
# provide the first session id -> must not ask for passphrase again and return the same result.
|
# provide the first session id -> must not ask for passphrase again and return the same result.
|
||||||
new_session_id = _init_session(client, session_id=session_a)
|
session_a_resumed_again = Session(client.resume_session(session_a))
|
||||||
assert new_session_id == session_a
|
assert session_a_resumed_again.id == session_a_id
|
||||||
assert _get_xpub(client, passphrase=None) == XPUB_PASSPHRASES["A"]
|
assert _get_xpub(session_a_resumed_again) == XPUB_PASSPHRASES["A"]
|
||||||
|
|
||||||
# provide the second session id -> must not ask for passphrase again and return the same result.
|
# provide the second session id -> must not ask for passphrase again and return the same result.
|
||||||
new_session_id = _init_session(client, session_id=session_b)
|
session_b_resumed_again = Session(client.resume_session(session_b))
|
||||||
assert new_session_id == session_b
|
assert session_b_resumed_again.id == session_b_id
|
||||||
assert _get_xpub(client, passphrase=None) == XPUB_PASSPHRASES["B"]
|
assert _get_xpub(session_b_resumed_again) == XPUB_PASSPHRASES["B"]
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.slow
|
@pytest.mark.slow
|
||||||
@ -185,12 +180,20 @@ def test_max_sessions_with_passphrases(client: Client):
|
|||||||
|
|
||||||
# start as many sessions as the limit is
|
# start as many sessions as the limit is
|
||||||
session_ids = {}
|
session_ids = {}
|
||||||
|
sessions = {}
|
||||||
for passphrase, xpub in XPUB_PASSPHRASES.items():
|
for passphrase, xpub in XPUB_PASSPHRASES.items():
|
||||||
session_id = _init_session(client)
|
session = Session(client.get_session(passphrase=passphrase))
|
||||||
assert session_id not in session_ids.values()
|
assert session.id not in session_ids.values()
|
||||||
session_ids[passphrase] = session_id
|
session_ids[passphrase] = session.id
|
||||||
assert _get_xpub(client, passphrase=passphrase) == xpub
|
sessions[passphrase] = session
|
||||||
|
assert _get_xpub(session, expected_passphrase_req=True) == xpub
|
||||||
|
|
||||||
|
for passphrase, xpub in XPUB_PASSPHRASES.items():
|
||||||
|
session = Session(client.get_session(passphrase=passphrase))
|
||||||
|
assert session.id not in session_ids.values()
|
||||||
|
session_ids[passphrase] = session.id
|
||||||
|
sessions[passphrase] = session
|
||||||
|
assert _get_xpub(session, expected_passphrase_req=True) == xpub
|
||||||
# passphrase is not prompted for the started the sessions, regardless the order
|
# passphrase is not prompted for the started the sessions, regardless the order
|
||||||
# let's try 20 different orderings
|
# let's try 20 different orderings
|
||||||
passphrases = list(XPUB_PASSPHRASES.keys())
|
passphrases = list(XPUB_PASSPHRASES.keys())
|
||||||
@ -198,85 +201,88 @@ def test_max_sessions_with_passphrases(client: Client):
|
|||||||
for _ in range(20):
|
for _ in range(20):
|
||||||
random.shuffle(shuffling)
|
random.shuffle(shuffling)
|
||||||
for passphrase in shuffling:
|
for passphrase in shuffling:
|
||||||
session_id = _init_session(client, session_id=session_ids[passphrase])
|
resumed_session = Session(client.resume_session(sessions[passphrase]))
|
||||||
assert session_id == session_ids[passphrase]
|
assert resumed_session.id == session_ids[passphrase]
|
||||||
assert _get_xpub(client, passphrase=None) == XPUB_PASSPHRASES[passphrase]
|
assert _get_xpub(resumed_session) == XPUB_PASSPHRASES[passphrase]
|
||||||
|
|
||||||
# make sure the usage order is the reverse of the creation order
|
# make sure the usage order is the reverse of the creation order
|
||||||
for passphrase in reversed(passphrases):
|
for passphrase in reversed(passphrases):
|
||||||
session_id = _init_session(client, session_id=session_ids[passphrase])
|
resumed_session = Session(client.resume_session(sessions[passphrase]))
|
||||||
assert session_id == session_ids[passphrase]
|
assert resumed_session.id == session_ids[passphrase]
|
||||||
assert _get_xpub(client, passphrase=None) == XPUB_PASSPHRASES[passphrase]
|
assert _get_xpub(resumed_session) == XPUB_PASSPHRASES[passphrase]
|
||||||
|
|
||||||
# creating one more session will exceed the limit
|
# creating one more session will exceed the limit
|
||||||
_init_session(client)
|
new_session = Session(client.get_session(passphrase="XX"))
|
||||||
# new session asks for passphrase
|
# new session asks for passphrase
|
||||||
_get_xpub(client, passphrase="XX")
|
_get_xpub(new_session, expected_passphrase_req=True)
|
||||||
|
|
||||||
# restoring the sessions in reverse will evict the next-up session
|
# restoring the sessions in reverse will evict the next-up session
|
||||||
for passphrase in reversed(passphrases):
|
for passphrase in reversed(passphrases):
|
||||||
_init_session(client, session_id=session_ids[passphrase])
|
resumed_session = Session(client.resume_session(sessions[passphrase]))
|
||||||
_get_xpub(client, passphrase="whatever") # passphrase is prompted
|
_get_xpub(
|
||||||
|
resumed_session, expected_passphrase_req=True
|
||||||
|
) # passphrase is prompted
|
||||||
|
|
||||||
|
|
||||||
def test_session_enable_passphrase(client: Client):
|
def test_session_enable_passphrase(client: Client):
|
||||||
# Let's start the communication by calling Initialize.
|
# Let's start the communication by calling Initialize.
|
||||||
session_id = _init_session(client)
|
session = Session(client.get_session(passphrase=""))
|
||||||
|
|
||||||
# Trezor will not prompt for passphrase because it is turned off.
|
# Trezor will not prompt for passphrase because it is turned off.
|
||||||
assert _get_xpub(client, passphrase=None) == XPUB_PASSPHRASE_NONE
|
assert _get_xpub(session, expected_passphrase_req=False) == XPUB_PASSPHRASE_NONE
|
||||||
|
|
||||||
# Turn on passphrase.
|
# Turn on passphrase.
|
||||||
# Emit the call explicitly to avoid ClearSession done by the library function
|
# Emit the call explicitly to avoid ClearSession done by the library function
|
||||||
response = client.call(messages.ApplySettings(use_passphrase=True))
|
response = session.call(messages.ApplySettings(use_passphrase=True))
|
||||||
assert isinstance(response, messages.Success)
|
assert isinstance(response, messages.Success)
|
||||||
|
|
||||||
# The session id is unchanged, therefore we do not prompt for the passphrase.
|
# The session id is unchanged, therefore we do not prompt for the passphrase.
|
||||||
new_session_id = _init_session(client, session_id=session_id)
|
session_id = session.id
|
||||||
assert session_id == new_session_id
|
resumed_session = Session(client.resume_session(session))
|
||||||
assert _get_xpub(client, passphrase=None) == XPUB_PASSPHRASE_NONE
|
assert session_id == resumed_session.id
|
||||||
|
assert _get_xpub(resumed_session) == XPUB_PASSPHRASE_NONE
|
||||||
|
|
||||||
# We clear the session id now, so the passphrase should be asked.
|
# We clear the session id now, so the passphrase should be asked.
|
||||||
new_session_id = _init_session(client)
|
new_session = Session(client.get_session(passphrase="A"))
|
||||||
assert session_id != new_session_id
|
assert session_id != new_session.id
|
||||||
assert _get_xpub(client, passphrase="A") == XPUB_PASSPHRASES["A"]
|
assert _get_xpub(new_session, expected_passphrase_req=True) == XPUB_PASSPHRASES["A"]
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.models("core")
|
@pytest.mark.models("core")
|
||||||
@pytest.mark.setup_client(passphrase=True)
|
@pytest.mark.setup_client(passphrase=True)
|
||||||
def test_passphrase_on_device(client: Client):
|
def test_passphrase_on_device(client: Client):
|
||||||
_init_session(client)
|
# _init_session(client)
|
||||||
|
session = client.get_session(passphrase="A")
|
||||||
# try to get xpub with passphrase on host:
|
# try to get xpub with passphrase on host:
|
||||||
response = client.call_raw(XPUB_REQUEST)
|
response = session.call_raw(XPUB_REQUEST)
|
||||||
assert isinstance(response, messages.PassphraseRequest)
|
assert isinstance(response, messages.PassphraseRequest)
|
||||||
# using `client.call` to auto-skip subsequent ButtonRequests for "show passphrase"
|
# using `client.call` to auto-skip subsequent ButtonRequests for "show passphrase"
|
||||||
response = client.call(messages.PassphraseAck(passphrase="A", on_device=False))
|
response = session.call(messages.PassphraseAck(passphrase="A", on_device=False))
|
||||||
|
|
||||||
assert isinstance(response, messages.PublicKey)
|
assert isinstance(response, messages.PublicKey)
|
||||||
assert response.xpub == XPUB_PASSPHRASES["A"]
|
assert response.xpub == XPUB_PASSPHRASES["A"]
|
||||||
|
|
||||||
# try to get xpub again, passphrase should be cached
|
# try to get xpub again, passphrase should be cached
|
||||||
response = client.call_raw(XPUB_REQUEST)
|
response = session.call_raw(XPUB_REQUEST)
|
||||||
assert isinstance(response, messages.PublicKey)
|
assert isinstance(response, messages.PublicKey)
|
||||||
assert response.xpub == XPUB_PASSPHRASES["A"]
|
assert response.xpub == XPUB_PASSPHRASES["A"]
|
||||||
|
|
||||||
# make a new session
|
# make a new session
|
||||||
_init_session(client)
|
session2 = session.client.get_session(passphrase="A")
|
||||||
|
|
||||||
# try to get xpub with passphrase on device:
|
# try to get xpub with passphrase on device:
|
||||||
response = client.call_raw(XPUB_REQUEST)
|
response = session2.call_raw(XPUB_REQUEST)
|
||||||
assert isinstance(response, messages.PassphraseRequest)
|
assert isinstance(response, messages.PassphraseRequest)
|
||||||
response = client.call_raw(messages.PassphraseAck(on_device=True))
|
response = session2.call_raw(messages.PassphraseAck(on_device=True))
|
||||||
# no "show passphrase" here
|
# no "show passphrase" here
|
||||||
assert isinstance(response, messages.ButtonRequest)
|
assert isinstance(response, messages.ButtonRequest)
|
||||||
client.debug.input("A")
|
client.debug.input("A")
|
||||||
response = client.call_raw(messages.ButtonAck())
|
response = session2.call_raw(messages.ButtonAck())
|
||||||
assert isinstance(response, messages.PublicKey)
|
assert isinstance(response, messages.PublicKey)
|
||||||
assert response.xpub == XPUB_PASSPHRASES["A"]
|
assert response.xpub == XPUB_PASSPHRASES["A"]
|
||||||
|
|
||||||
# try to get xpub again, passphrase should be cached
|
# try to get xpub again, passphrase should be cached
|
||||||
response = client.call_raw(XPUB_REQUEST)
|
response = session2.call_raw(XPUB_REQUEST)
|
||||||
assert isinstance(response, messages.PublicKey)
|
assert isinstance(response, messages.PublicKey)
|
||||||
assert response.xpub == XPUB_PASSPHRASES["A"]
|
assert response.xpub == XPUB_PASSPHRASES["A"]
|
||||||
|
|
||||||
@ -285,32 +291,33 @@ def test_passphrase_on_device(client: Client):
|
|||||||
@pytest.mark.setup_client(passphrase=True)
|
@pytest.mark.setup_client(passphrase=True)
|
||||||
def test_passphrase_always_on_device(client: Client):
|
def test_passphrase_always_on_device(client: Client):
|
||||||
# Let's start the communication by calling Initialize.
|
# Let's start the communication by calling Initialize.
|
||||||
session_id = _init_session(client)
|
session = client.get_session()
|
||||||
|
# session_id = _init_session(client)
|
||||||
|
|
||||||
# Force passphrase entry on Trezor.
|
# Force passphrase entry on Trezor.
|
||||||
response = client.call(messages.ApplySettings(passphrase_always_on_device=True))
|
response = session.call(messages.ApplySettings(passphrase_always_on_device=True))
|
||||||
assert isinstance(response, messages.Success)
|
assert isinstance(response, messages.Success)
|
||||||
|
|
||||||
# Since we enabled the always_on_device setting, Trezor will send ButtonRequests and ask for it on the device.
|
# Since we enabled the always_on_device setting, Trezor will send ButtonRequests and ask for it on the device.
|
||||||
response = client.call_raw(XPUB_REQUEST)
|
response = session.call_raw(XPUB_REQUEST)
|
||||||
assert isinstance(response, messages.ButtonRequest)
|
assert isinstance(response, messages.ButtonRequest)
|
||||||
client.debug.input("") # Input empty passphrase.
|
client.debug.input("") # Input empty passphrase.
|
||||||
response = client.call_raw(messages.ButtonAck())
|
response = session.call_raw(messages.ButtonAck())
|
||||||
assert isinstance(response, messages.PublicKey)
|
assert isinstance(response, messages.PublicKey)
|
||||||
assert response.xpub == XPUB_PASSPHRASE_NONE
|
assert response.xpub == XPUB_PASSPHRASE_NONE
|
||||||
|
|
||||||
# Passphrase will not be prompted. The session id stays the same and the passphrase is cached.
|
# Passphrase will not be prompted. The session id stays the same and the passphrase is cached.
|
||||||
_init_session(client, session_id=session_id)
|
resumed_session = client.resume_session(session)
|
||||||
response = client.call_raw(XPUB_REQUEST)
|
response = resumed_session.call_raw(XPUB_REQUEST)
|
||||||
assert isinstance(response, messages.PublicKey)
|
assert isinstance(response, messages.PublicKey)
|
||||||
assert response.xpub == XPUB_PASSPHRASE_NONE
|
assert response.xpub == XPUB_PASSPHRASE_NONE
|
||||||
|
|
||||||
# In case we want to add a new passphrase we need to send session_id = None.
|
# In case we want to add a new passphrase we need to send session_id = None.
|
||||||
_init_session(client)
|
new_session = client.get_session(passphrase="A")
|
||||||
response = client.call_raw(XPUB_REQUEST)
|
response = new_session.call_raw(XPUB_REQUEST)
|
||||||
assert isinstance(response, messages.ButtonRequest)
|
assert isinstance(response, messages.ButtonRequest)
|
||||||
client.debug.input("A") # Input non-empty passphrase.
|
client.debug.input("A") # Input non-empty passphrase.
|
||||||
response = client.call_raw(messages.ButtonAck())
|
response = new_session.call_raw(messages.ButtonAck())
|
||||||
assert isinstance(response, messages.PublicKey)
|
assert isinstance(response, messages.PublicKey)
|
||||||
assert response.xpub == XPUB_PASSPHRASES["A"]
|
assert response.xpub == XPUB_PASSPHRASES["A"]
|
||||||
|
|
||||||
@ -332,25 +339,27 @@ def test_passphrase_on_device_not_possible_on_t1(client: Client):
|
|||||||
|
|
||||||
|
|
||||||
@pytest.mark.setup_client(passphrase=True)
|
@pytest.mark.setup_client(passphrase=True)
|
||||||
def test_passphrase_ack_mismatch(client: Client):
|
def test_passphrase_ack_mismatch(session: Session):
|
||||||
response = client.call_raw(XPUB_REQUEST)
|
response = session.call_raw(XPUB_REQUEST)
|
||||||
assert isinstance(response, messages.PassphraseRequest)
|
assert isinstance(response, messages.PassphraseRequest)
|
||||||
response = client.call_raw(messages.PassphraseAck(passphrase="A", on_device=True))
|
response = session.call_raw(messages.PassphraseAck(passphrase="A", on_device=True))
|
||||||
assert isinstance(response, messages.Failure)
|
assert isinstance(response, messages.Failure)
|
||||||
assert response.code == FailureType.DataError
|
assert response.code == FailureType.DataError
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.setup_client(passphrase="")
|
@pytest.mark.setup_client(passphrase="")
|
||||||
def test_passphrase_missing(client: Client):
|
def test_passphrase_missing(session: Session):
|
||||||
response = client.call_raw(XPUB_REQUEST)
|
response = session.call_raw(XPUB_REQUEST)
|
||||||
assert isinstance(response, messages.PassphraseRequest)
|
assert isinstance(response, messages.PassphraseRequest)
|
||||||
response = client.call_raw(messages.PassphraseAck(passphrase=None))
|
response = session.call_raw(messages.PassphraseAck(passphrase=None))
|
||||||
assert isinstance(response, messages.Failure)
|
assert isinstance(response, messages.Failure)
|
||||||
assert response.code == FailureType.DataError
|
assert response.code == FailureType.DataError
|
||||||
|
|
||||||
response = client.call_raw(XPUB_REQUEST)
|
response = session.call_raw(XPUB_REQUEST)
|
||||||
assert isinstance(response, messages.PassphraseRequest)
|
assert isinstance(response, messages.PassphraseRequest)
|
||||||
response = client.call_raw(messages.PassphraseAck(passphrase=None, on_device=False))
|
response = session.call_raw(
|
||||||
|
messages.PassphraseAck(passphrase=None, on_device=False)
|
||||||
|
)
|
||||||
assert isinstance(response, messages.Failure)
|
assert isinstance(response, messages.Failure)
|
||||||
assert response.code == FailureType.DataError
|
assert response.code == FailureType.DataError
|
||||||
|
|
||||||
@ -358,11 +367,11 @@ def test_passphrase_missing(client: Client):
|
|||||||
@pytest.mark.setup_client(passphrase=True)
|
@pytest.mark.setup_client(passphrase=True)
|
||||||
def test_passphrase_length(client: Client):
|
def test_passphrase_length(client: Client):
|
||||||
def call(passphrase: str, expected_result: bool):
|
def call(passphrase: str, expected_result: bool):
|
||||||
_init_session(client)
|
session = client.get_session(passphrase=passphrase)
|
||||||
response = client.call_raw(XPUB_REQUEST)
|
response = session.call_raw(XPUB_REQUEST)
|
||||||
assert isinstance(response, messages.PassphraseRequest)
|
assert isinstance(response, messages.PassphraseRequest)
|
||||||
try:
|
try:
|
||||||
response = client.call(messages.PassphraseAck(passphrase=passphrase))
|
response = session.call(messages.PassphraseAck(passphrase=passphrase))
|
||||||
assert expected_result is True, "Call should have failed"
|
assert expected_result is True, "Call should have failed"
|
||||||
assert isinstance(response, messages.PublicKey)
|
assert isinstance(response, messages.PublicKey)
|
||||||
except exceptions.TrezorFailure as e:
|
except exceptions.TrezorFailure as e:
|
||||||
@ -383,17 +392,18 @@ def test_passphrase_length(client: Client):
|
|||||||
@pytest.mark.setup_client(passphrase=True)
|
@pytest.mark.setup_client(passphrase=True)
|
||||||
def test_hide_passphrase_from_host(client: Client):
|
def test_hide_passphrase_from_host(client: Client):
|
||||||
# Without safety checks, turning it on fails
|
# Without safety checks, turning it on fails
|
||||||
|
session = client.get_management_session()
|
||||||
with pytest.raises(TrezorFailure, match="Safety checks are strict"), client:
|
with pytest.raises(TrezorFailure, match="Safety checks are strict"), client:
|
||||||
device.apply_settings(client, hide_passphrase_from_host=True)
|
device.apply_settings(session, hide_passphrase_from_host=True)
|
||||||
|
|
||||||
device.apply_settings(client, safety_checks=SafetyCheckLevel.PromptTemporarily)
|
device.apply_settings(session, safety_checks=SafetyCheckLevel.PromptTemporarily)
|
||||||
|
|
||||||
# Turning it on
|
# Turning it on
|
||||||
device.apply_settings(client, hide_passphrase_from_host=True)
|
device.apply_settings(session, hide_passphrase_from_host=True)
|
||||||
|
|
||||||
passphrase = "abc"
|
passphrase = "abc"
|
||||||
|
session = Session(client.get_session(passphrase=passphrase))
|
||||||
with client:
|
with client, session:
|
||||||
|
|
||||||
def input_flow():
|
def input_flow():
|
||||||
yield
|
yield
|
||||||
@ -410,7 +420,7 @@ def test_hide_passphrase_from_host(client: Client):
|
|||||||
|
|
||||||
client.watch_layout()
|
client.watch_layout()
|
||||||
client.set_input_flow(input_flow)
|
client.set_input_flow(input_flow)
|
||||||
client.set_expected_responses(
|
session.set_expected_responses(
|
||||||
[
|
[
|
||||||
messages.PassphraseRequest,
|
messages.PassphraseRequest,
|
||||||
messages.ButtonRequest,
|
messages.ButtonRequest,
|
||||||
@ -418,17 +428,17 @@ def test_hide_passphrase_from_host(client: Client):
|
|||||||
]
|
]
|
||||||
)
|
)
|
||||||
client.use_passphrase(passphrase)
|
client.use_passphrase(passphrase)
|
||||||
result = client.call(XPUB_REQUEST)
|
result = session.call(XPUB_REQUEST)
|
||||||
assert isinstance(result, messages.PublicKey)
|
assert isinstance(result, messages.PublicKey)
|
||||||
xpub_hidden_passphrase = result.xpub
|
xpub_hidden_passphrase = result.xpub
|
||||||
|
|
||||||
# Turning it off
|
# Turning it off
|
||||||
device.apply_settings(client, hide_passphrase_from_host=False)
|
device.apply_settings(session, hide_passphrase_from_host=False)
|
||||||
|
|
||||||
# Starting new session, otherwise the passphrase would be cached
|
# Starting new session, otherwise the passphrase would be cached
|
||||||
_init_session(client)
|
session = Session(client.get_session(passphrase=passphrase))
|
||||||
|
|
||||||
with client:
|
with client, session:
|
||||||
|
|
||||||
def input_flow():
|
def input_flow():
|
||||||
yield
|
yield
|
||||||
@ -445,7 +455,7 @@ def test_hide_passphrase_from_host(client: Client):
|
|||||||
|
|
||||||
client.watch_layout()
|
client.watch_layout()
|
||||||
client.set_input_flow(input_flow)
|
client.set_input_flow(input_flow)
|
||||||
client.set_expected_responses(
|
session.set_expected_responses(
|
||||||
[
|
[
|
||||||
messages.PassphraseRequest,
|
messages.PassphraseRequest,
|
||||||
messages.ButtonRequest,
|
messages.ButtonRequest,
|
||||||
@ -454,22 +464,22 @@ def test_hide_passphrase_from_host(client: Client):
|
|||||||
]
|
]
|
||||||
)
|
)
|
||||||
client.use_passphrase(passphrase)
|
client.use_passphrase(passphrase)
|
||||||
result = client.call(XPUB_REQUEST)
|
result = session.call(XPUB_REQUEST)
|
||||||
assert isinstance(result, messages.PublicKey)
|
assert isinstance(result, messages.PublicKey)
|
||||||
xpub_shown_passphrase = result.xpub
|
xpub_shown_passphrase = result.xpub
|
||||||
|
|
||||||
assert xpub_hidden_passphrase == xpub_shown_passphrase
|
assert xpub_hidden_passphrase == xpub_shown_passphrase
|
||||||
|
|
||||||
|
|
||||||
def _get_xpub_cardano(client: Client, passphrase):
|
def _get_xpub_cardano(session: Session, expected_passphrase_req: bool = False):
|
||||||
msg = messages.CardanoGetPublicKey(
|
msg = messages.CardanoGetPublicKey(
|
||||||
address_n=parse_path("m/44h/1815h/0h/0/0"),
|
address_n=parse_path("m/44h/1815h/0h/0/0"),
|
||||||
derivation_type=messages.CardanoDerivationType.ICARUS,
|
derivation_type=messages.CardanoDerivationType.ICARUS,
|
||||||
)
|
)
|
||||||
response = client.call_raw(msg)
|
response = session.call_raw(msg)
|
||||||
if passphrase is not None:
|
if expected_passphrase_req:
|
||||||
assert isinstance(response, messages.PassphraseRequest)
|
assert isinstance(response, messages.PassphraseRequest)
|
||||||
response = client.call(messages.PassphraseAck(passphrase=passphrase))
|
response = session.call(messages.PassphraseAck(passphrase=session.passphrase))
|
||||||
assert isinstance(response, messages.CardanoPublicKey)
|
assert isinstance(response, messages.CardanoPublicKey)
|
||||||
return response.xpub
|
return response.xpub
|
||||||
|
|
||||||
@ -482,31 +492,37 @@ def test_cardano_passphrase(client: Client):
|
|||||||
# of the passphrase.
|
# of the passphrase.
|
||||||
# Historically, Cardano calls would ask for passphrase again. Now, they should not.
|
# Historically, Cardano calls would ask for passphrase again. Now, they should not.
|
||||||
|
|
||||||
session_id = _init_session(client, derive_cardano=True)
|
# session_id = _init_session(client, derive_cardano=True)
|
||||||
|
|
||||||
# GetPublicKey requires passphrase and since it is not cached,
|
# GetPublicKey requires passphrase and since it is not cached,
|
||||||
# Trezor will prompt for it.
|
# Trezor will prompt for it.
|
||||||
assert _get_xpub(client, passphrase="B") == XPUB_PASSPHRASES["B"]
|
session = Session(client.get_session(passphrase="B", derive_cardano=True))
|
||||||
|
assert _get_xpub(session, expected_passphrase_req=True) == XPUB_PASSPHRASES["B"]
|
||||||
|
|
||||||
# The passphrase is now cached for non-Cardano coins.
|
# The passphrase is now cached for non-Cardano coins.
|
||||||
assert _get_xpub(client, passphrase=None) == XPUB_PASSPHRASES["B"]
|
assert _get_xpub(session) == XPUB_PASSPHRASES["B"]
|
||||||
|
|
||||||
# The passphrase should be cached for Cardano as well
|
# The passphrase should be cached for Cardano as well
|
||||||
assert _get_xpub_cardano(client, passphrase=None) == XPUB_CARDANO_PASSPHRASE_B
|
assert _get_xpub_cardano(session) == XPUB_CARDANO_PASSPHRASE_B
|
||||||
|
|
||||||
# Initialize with the session id does not destroy the state
|
# Initialize with the session id does not destroy the state
|
||||||
_init_session(client, session_id=session_id, derive_cardano=True)
|
resumed_session = Session(client.resume_session(session))
|
||||||
assert _get_xpub(client, passphrase=None) == XPUB_PASSPHRASES["B"]
|
# _init_session(client, session_id=session_id, derive_cardano=True)
|
||||||
assert _get_xpub_cardano(client, passphrase=None) == XPUB_CARDANO_PASSPHRASE_B
|
assert _get_xpub(resumed_session) == XPUB_PASSPHRASES["B"]
|
||||||
|
assert _get_xpub_cardano(resumed_session) == XPUB_CARDANO_PASSPHRASE_B
|
||||||
|
|
||||||
# New session will destroy the state
|
# New session will destroy the state
|
||||||
_init_session(client, derive_cardano=True)
|
new_session = Session(client.get_session(passphrase="A", derive_cardano=True))
|
||||||
|
# _init_session(client, derive_cardano=True)
|
||||||
|
|
||||||
# Cardano must ask for passphrase again
|
# Cardano must ask for passphrase again
|
||||||
assert _get_xpub_cardano(client, passphrase="A") == XPUB_CARDANO_PASSPHRASE_A
|
assert (
|
||||||
|
_get_xpub_cardano(new_session, expected_passphrase_req=True)
|
||||||
|
== XPUB_CARDANO_PASSPHRASE_A
|
||||||
|
)
|
||||||
|
|
||||||
# Passphrase is now cached for Cardano
|
# Passphrase is now cached for Cardano
|
||||||
assert _get_xpub_cardano(client, passphrase=None) == XPUB_CARDANO_PASSPHRASE_A
|
assert _get_xpub_cardano(new_session) == XPUB_CARDANO_PASSPHRASE_A
|
||||||
|
|
||||||
# Passphrase is cached for non-Cardano coins too
|
# Passphrase is cached for non-Cardano coins too
|
||||||
assert _get_xpub(client, passphrase=None) == XPUB_PASSPHRASES["A"]
|
assert _get_xpub(new_session) == XPUB_PASSPHRASES["A"]
|
||||||
|
Loading…
Reference in New Issue
Block a user