mirror of
https://github.com/trezor/trezor-firmware.git
synced 2024-11-22 07:28:10 +00:00
tests: make use of new emulator code in emulator tests
This commit is contained in:
parent
0496e49507
commit
25910acdd1
@ -15,23 +15,22 @@
|
|||||||
# If not, see <https://www.gnu.org/licenses/lgpl-3.0.html>.
|
# If not, see <https://www.gnu.org/licenses/lgpl-3.0.html>.
|
||||||
|
|
||||||
import gzip
|
import gzip
|
||||||
import os
|
|
||||||
import subprocess
|
|
||||||
import tempfile
|
import tempfile
|
||||||
import time
|
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
from trezorlib.debuglink import TrezorClientDebugLink
|
from trezorlib._internal.emulator import CoreEmulator, LegacyEmulator
|
||||||
from trezorlib.transport.udp import UdpTransport
|
|
||||||
|
ROOT = Path(__file__).parent.parent.resolve()
|
||||||
|
BINDIR = ROOT / "tests" / "emulators"
|
||||||
|
|
||||||
ROOT = os.path.abspath(os.path.dirname(__file__) + "/..")
|
|
||||||
BINDIR = ROOT + "/tests/emulators"
|
|
||||||
LOCAL_BUILD_PATHS = {
|
LOCAL_BUILD_PATHS = {
|
||||||
"core": ROOT + "/core/build/unix/micropython",
|
"core": ROOT / "core" / "build" / "unix" / "micropython",
|
||||||
"legacy": ROOT + "/legacy/firmware/trezor.elf",
|
"legacy": ROOT / "legacy" / "firmware" / "trezor.elf",
|
||||||
}
|
}
|
||||||
|
|
||||||
SD_CARD_GZ = ROOT + "/tests/trezor.sdcard.gz"
|
CORE_SRC_DIR = ROOT / "core" / "src"
|
||||||
|
SD_CARD_GZ = ROOT / "core" / "trezor.sdcard.gz"
|
||||||
|
|
||||||
ENV = {"SDL_VIDEODRIVER": "dummy"}
|
ENV = {"SDL_VIDEODRIVER": "dummy"}
|
||||||
|
|
||||||
@ -44,11 +43,11 @@ def check_version(tag, version_tuple):
|
|||||||
|
|
||||||
|
|
||||||
def filename_from_tag(gen, tag):
|
def filename_from_tag(gen, tag):
|
||||||
return f"{BINDIR}/trezor-emu-{gen}-{tag}"
|
return BINDIR / f"trezor-emu-{gen}-{tag}"
|
||||||
|
|
||||||
|
|
||||||
def get_tags():
|
def get_tags():
|
||||||
files = os.listdir(BINDIR)
|
files = list(BINDIR.iterdir())
|
||||||
if not files:
|
if not files:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
"No files found. Use download_emulators.sh to download emulators."
|
"No files found. Use download_emulators.sh to download emulators."
|
||||||
@ -58,7 +57,7 @@ def get_tags():
|
|||||||
for f in sorted(files):
|
for f in sorted(files):
|
||||||
try:
|
try:
|
||||||
# example: "trezor-emu-core-v2.1.1"
|
# example: "trezor-emu-core-v2.1.1"
|
||||||
_, _, gen, tag = f.split("-", maxsplit=3)
|
_, _, gen, tag = f.name.split("-", maxsplit=3)
|
||||||
result[gen].append(tag)
|
result[gen].append(tag)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
pass
|
pass
|
||||||
@ -69,116 +68,40 @@ ALL_TAGS = get_tags()
|
|||||||
|
|
||||||
|
|
||||||
class EmulatorWrapper:
|
class EmulatorWrapper:
|
||||||
def __init__(self, gen, tag=None, executable=None, storage=None):
|
def __init__(self, gen, tag=None, storage=None):
|
||||||
self.gen = gen
|
if tag is not None:
|
||||||
self.tag = tag
|
executable = filename_from_tag(gen, tag)
|
||||||
|
|
||||||
if executable is not None:
|
|
||||||
self.executable = executable
|
|
||||||
elif tag is not None:
|
|
||||||
self.executable = filename_from_tag(gen, tag)
|
|
||||||
else:
|
else:
|
||||||
self.executable = LOCAL_BUILD_PATHS[gen]
|
executable = LOCAL_BUILD_PATHS[gen]
|
||||||
|
|
||||||
if not os.path.exists(self.executable):
|
if not executable.exists():
|
||||||
raise ValueError(f"emulator executable not found: {self.executable}")
|
raise ValueError(f"emulator executable not found: {executable}")
|
||||||
|
|
||||||
self.workdir = tempfile.TemporaryDirectory()
|
self.profile_dir = tempfile.TemporaryDirectory()
|
||||||
if storage:
|
if executable == LOCAL_BUILD_PATHS["core"]:
|
||||||
open(self._storage_file(), "wb").write(storage)
|
workdir = CORE_SRC_DIR
|
||||||
|
|
||||||
with gzip.open(SD_CARD_GZ, "rb") as gz:
|
|
||||||
with open(self.workdir.name + "/trezor.sdcard", "wb") as sd:
|
|
||||||
sd.write(gz.read())
|
|
||||||
|
|
||||||
self.client = None
|
|
||||||
|
|
||||||
def _get_params_core(self):
|
|
||||||
env = ENV.copy()
|
|
||||||
args = [self.executable, "-m", "main"]
|
|
||||||
# for firmware 2.1.2 and newer
|
|
||||||
env["TREZOR_PROFILE_DIR"] = self.workdir.name
|
|
||||||
# for firmware 2.1.1 and older
|
|
||||||
env["TREZOR_PROFILE"] = self.workdir.name
|
|
||||||
|
|
||||||
if self.executable == LOCAL_BUILD_PATHS["core"]:
|
|
||||||
cwd = ROOT + "/core/src"
|
|
||||||
else:
|
else:
|
||||||
cwd = self.workdir.name
|
workdir = None
|
||||||
|
|
||||||
return env, args, cwd
|
if gen == "legacy":
|
||||||
|
self.emulator = LegacyEmulator(
|
||||||
def _get_params_legacy(self):
|
executable, self.profile_dir.name, storage=storage, headless=True,
|
||||||
env = ENV.copy()
|
)
|
||||||
args = [self.executable]
|
elif gen == "core":
|
||||||
cwd = self.workdir.name
|
with gzip.open(SD_CARD_GZ, "rb") as gz:
|
||||||
return env, args, cwd
|
self.emulator = CoreEmulator(
|
||||||
|
executable,
|
||||||
def _get_params(self):
|
self.profile_dir.name,
|
||||||
if self.gen == "core":
|
storage=storage,
|
||||||
return self._get_params_core()
|
workdir=workdir,
|
||||||
elif self.gen == "legacy":
|
sdcard=gz.read(),
|
||||||
return self._get_params_legacy()
|
headless=True,
|
||||||
else:
|
)
|
||||||
raise ValueError("Unknown gen")
|
|
||||||
|
|
||||||
def start(self):
|
|
||||||
env, args, cwd = self._get_params()
|
|
||||||
self.process = subprocess.Popen(
|
|
||||||
args, cwd=cwd, env=env, stdout=open(os.devnull, "w")
|
|
||||||
)
|
|
||||||
|
|
||||||
# wait until emulator is listening
|
|
||||||
transport = UdpTransport("127.0.0.1:21324")
|
|
||||||
transport.open()
|
|
||||||
for _ in range(300):
|
|
||||||
if transport._ping():
|
|
||||||
break
|
|
||||||
if self.process.poll() is not None:
|
|
||||||
self._cleanup()
|
|
||||||
raise RuntimeError("Emulator proces died")
|
|
||||||
time.sleep(0.1)
|
|
||||||
else:
|
|
||||||
# could not connect after 300 attempts * 0.1s = 30s of waiting
|
|
||||||
self._cleanup()
|
|
||||||
raise RuntimeError("Can't connect to emulator")
|
|
||||||
transport.close()
|
|
||||||
|
|
||||||
self.client = TrezorClientDebugLink(transport)
|
|
||||||
self.client.open()
|
|
||||||
check_version(self.tag, self.client.version)
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
if self.client:
|
|
||||||
self.client.close()
|
|
||||||
self.process.terminate()
|
|
||||||
try:
|
|
||||||
self.process.wait(1)
|
|
||||||
except subprocess.TimeoutExpired:
|
|
||||||
self.process.kill()
|
|
||||||
|
|
||||||
def restart(self):
|
|
||||||
self.stop()
|
|
||||||
self.start()
|
|
||||||
|
|
||||||
def __enter__(self):
|
def __enter__(self):
|
||||||
self.start()
|
self.emulator.start()
|
||||||
return self
|
return self.emulator
|
||||||
|
|
||||||
def __exit__(self, exc_type, exc_value, traceback):
|
def __exit__(self, exc_type, exc_value, traceback):
|
||||||
self._cleanup()
|
self.emulator.stop()
|
||||||
|
self.profile_dir.cleanup()
|
||||||
def _cleanup(self):
|
|
||||||
self.stop()
|
|
||||||
self.workdir.cleanup()
|
|
||||||
|
|
||||||
def _storage_file(self):
|
|
||||||
if self.gen == "legacy":
|
|
||||||
return self.workdir.name + "/emulator.img"
|
|
||||||
elif self.gen == "core":
|
|
||||||
return self.workdir.name + "/trezor.flash"
|
|
||||||
else:
|
|
||||||
raise ValueError("Unknown gen")
|
|
||||||
|
|
||||||
def storage(self):
|
|
||||||
return open(self._storage_file(), "rb").read()
|
|
||||||
|
@ -28,12 +28,11 @@ from ..upgrade_tests import core_only
|
|||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def emulator():
|
def emulator():
|
||||||
emu = EmulatorWrapper("core")
|
with EmulatorWrapper("core") as emu:
|
||||||
with emu:
|
|
||||||
yield emu
|
yield emu
|
||||||
|
|
||||||
|
|
||||||
def _restart(device_handler: BackgroundDeviceHandler, emulator: EmulatorWrapper):
|
def _restart(device_handler, emulator):
|
||||||
device_handler.restart(emulator)
|
device_handler.restart(emulator)
|
||||||
return device_handler.debuglink()
|
return device_handler.debuglink()
|
||||||
|
|
||||||
|
@ -2,7 +2,7 @@ import os
|
|||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from ..emulators import EmulatorWrapper
|
from ..emulators import LOCAL_BUILD_PATHS
|
||||||
|
|
||||||
SELECTED_GENS = [
|
SELECTED_GENS = [
|
||||||
gen.strip() for gen in os.environ.get("TREZOR_UPGRADE_TEST", "").split(",") if gen
|
gen.strip() for gen in os.environ.get("TREZOR_UPGRADE_TEST", "").split(",") if gen
|
||||||
@ -15,17 +15,8 @@ if SELECTED_GENS:
|
|||||||
|
|
||||||
else:
|
else:
|
||||||
# if no selection was provided, select those for which we have emulators
|
# if no selection was provided, select those for which we have emulators
|
||||||
try:
|
LEGACY_ENABLED = LOCAL_BUILD_PATHS["legacy"].exists()
|
||||||
EmulatorWrapper("legacy")
|
CORE_ENABLED = LOCAL_BUILD_PATHS["core"].exists()
|
||||||
LEGACY_ENABLED = True
|
|
||||||
except Exception:
|
|
||||||
LEGACY_ENABLED = False
|
|
||||||
|
|
||||||
try:
|
|
||||||
EmulatorWrapper("core")
|
|
||||||
CORE_ENABLED = True
|
|
||||||
except Exception:
|
|
||||||
CORE_ENABLED = False
|
|
||||||
|
|
||||||
|
|
||||||
legacy_only = pytest.mark.skipif(
|
legacy_only = pytest.mark.skipif(
|
||||||
|
@ -96,7 +96,7 @@ def test_upgrade_load(gen, from_tag, to_tag):
|
|||||||
)
|
)
|
||||||
device_id = emu.client.features.device_id
|
device_id = emu.client.features.device_id
|
||||||
asserts(from_tag, emu.client)
|
asserts(from_tag, emu.client)
|
||||||
storage = emu.storage()
|
storage = emu.get_storage()
|
||||||
|
|
||||||
with EmulatorWrapper(gen, to_tag, storage=storage) as emu:
|
with EmulatorWrapper(gen, to_tag, storage=storage) as emu:
|
||||||
assert device_id == emu.client.features.device_id
|
assert device_id == emu.client.features.device_id
|
||||||
@ -128,7 +128,7 @@ def test_upgrade_reset(gen, from_tag, to_tag):
|
|||||||
device_id = emu.client.features.device_id
|
device_id = emu.client.features.device_id
|
||||||
asserts(from_tag, emu.client)
|
asserts(from_tag, emu.client)
|
||||||
address = btc.get_address(emu.client, "Bitcoin", PATH)
|
address = btc.get_address(emu.client, "Bitcoin", PATH)
|
||||||
storage = emu.storage()
|
storage = emu.get_storage()
|
||||||
|
|
||||||
with EmulatorWrapper(gen, to_tag, storage=storage) as emu:
|
with EmulatorWrapper(gen, to_tag, storage=storage) as emu:
|
||||||
assert device_id == emu.client.features.device_id
|
assert device_id == emu.client.features.device_id
|
||||||
@ -162,7 +162,7 @@ def test_upgrade_reset_skip_backup(gen, from_tag, to_tag):
|
|||||||
device_id = emu.client.features.device_id
|
device_id = emu.client.features.device_id
|
||||||
asserts(from_tag, emu.client)
|
asserts(from_tag, emu.client)
|
||||||
address = btc.get_address(emu.client, "Bitcoin", PATH)
|
address = btc.get_address(emu.client, "Bitcoin", PATH)
|
||||||
storage = emu.storage()
|
storage = emu.get_storage()
|
||||||
|
|
||||||
with EmulatorWrapper(gen, to_tag, storage=storage) as emu:
|
with EmulatorWrapper(gen, to_tag, storage=storage) as emu:
|
||||||
assert device_id == emu.client.features.device_id
|
assert device_id == emu.client.features.device_id
|
||||||
@ -196,7 +196,7 @@ def test_upgrade_reset_no_backup(gen, from_tag, to_tag):
|
|||||||
device_id = emu.client.features.device_id
|
device_id = emu.client.features.device_id
|
||||||
asserts(from_tag, emu.client)
|
asserts(from_tag, emu.client)
|
||||||
address = btc.get_address(emu.client, "Bitcoin", PATH)
|
address = btc.get_address(emu.client, "Bitcoin", PATH)
|
||||||
storage = emu.storage()
|
storage = emu.get_storage()
|
||||||
|
|
||||||
with EmulatorWrapper(gen, to_tag, storage=storage) as emu:
|
with EmulatorWrapper(gen, to_tag, storage=storage) as emu:
|
||||||
assert device_id == emu.client.features.device_id
|
assert device_id == emu.client.features.device_id
|
||||||
@ -222,7 +222,7 @@ def test_upgrade_shamir_recovery(gen, from_tag, to_tag):
|
|||||||
assert "2 more shares" in layout.text
|
assert "2 more shares" in layout.text
|
||||||
|
|
||||||
device_id = emu.client.features.device_id
|
device_id = emu.client.features.device_id
|
||||||
storage = emu.storage()
|
storage = emu.get_storage()
|
||||||
device_handler.check_finalize()
|
device_handler.check_finalize()
|
||||||
|
|
||||||
with EmulatorWrapper(gen, to_tag, storage=storage) as emu, BackgroundDeviceHandler(
|
with EmulatorWrapper(gen, to_tag, storage=storage) as emu, BackgroundDeviceHandler(
|
||||||
@ -258,7 +258,7 @@ def test_upgrade_u2f(gen, from_tag, to_tag):
|
|||||||
|
|
||||||
counter = fido.get_next_counter(emu.client)
|
counter = fido.get_next_counter(emu.client)
|
||||||
assert counter == 11
|
assert counter == 11
|
||||||
storage = emu.storage()
|
storage = emu.get_storage()
|
||||||
|
|
||||||
with EmulatorWrapper(gen, to_tag, storage=storage) as emu:
|
with EmulatorWrapper(gen, to_tag, storage=storage) as emu:
|
||||||
counter = fido.get_next_counter(emu.client)
|
counter = fido.get_next_counter(emu.client)
|
||||||
|
Loading…
Reference in New Issue
Block a user