diff --git a/tests/emulators.py b/tests/emulators.py index 71ee6cf734..c1df8cb120 100644 --- a/tests/emulators.py +++ b/tests/emulators.py @@ -15,23 +15,22 @@ # If not, see . import gzip -import os -import subprocess import tempfile -import time from collections import defaultdict +from pathlib import Path -from trezorlib.debuglink import TrezorClientDebugLink -from trezorlib.transport.udp import UdpTransport +from trezorlib._internal.emulator import CoreEmulator, LegacyEmulator + +ROOT = Path(__file__).parent.parent.resolve() +BINDIR = ROOT / "tests" / "emulators" -ROOT = os.path.abspath(os.path.dirname(__file__) + "/..") -BINDIR = ROOT + "/tests/emulators" LOCAL_BUILD_PATHS = { - "core": ROOT + "/core/build/unix/micropython", - "legacy": ROOT + "/legacy/firmware/trezor.elf", + "core": ROOT / "core" / "build" / "unix" / "micropython", + "legacy": ROOT / "legacy" / "firmware" / "trezor.elf", } -SD_CARD_GZ = ROOT + "/tests/trezor.sdcard.gz" +CORE_SRC_DIR = ROOT / "core" / "src" +SD_CARD_GZ = ROOT / "core" / "trezor.sdcard.gz" ENV = {"SDL_VIDEODRIVER": "dummy"} @@ -44,11 +43,11 @@ def check_version(tag, version_tuple): def filename_from_tag(gen, tag): - return f"{BINDIR}/trezor-emu-{gen}-{tag}" + return BINDIR / f"trezor-emu-{gen}-{tag}" def get_tags(): - files = os.listdir(BINDIR) + files = list(BINDIR.iterdir()) if not files: raise ValueError( "No files found. Use download_emulators.sh to download emulators." @@ -58,7 +57,7 @@ def get_tags(): for f in sorted(files): try: # example: "trezor-emu-core-v2.1.1" - _, _, gen, tag = f.split("-", maxsplit=3) + _, _, gen, tag = f.name.split("-", maxsplit=3) result[gen].append(tag) except ValueError: pass @@ -69,116 +68,40 @@ ALL_TAGS = get_tags() class EmulatorWrapper: - def __init__(self, gen, tag=None, executable=None, storage=None): - self.gen = gen - self.tag = tag - - if executable is not None: - self.executable = executable - elif tag is not None: - self.executable = filename_from_tag(gen, tag) + def __init__(self, gen, tag=None, storage=None): + if tag is not None: + executable = filename_from_tag(gen, tag) else: - self.executable = LOCAL_BUILD_PATHS[gen] + executable = LOCAL_BUILD_PATHS[gen] - if not os.path.exists(self.executable): - raise ValueError(f"emulator executable not found: {self.executable}") + if not executable.exists(): + raise ValueError(f"emulator executable not found: {executable}") - self.workdir = tempfile.TemporaryDirectory() - if storage: - open(self._storage_file(), "wb").write(storage) - - with gzip.open(SD_CARD_GZ, "rb") as gz: - with open(self.workdir.name + "/trezor.sdcard", "wb") as sd: - sd.write(gz.read()) - - self.client = None - - def _get_params_core(self): - env = ENV.copy() - args = [self.executable, "-m", "main"] - # for firmware 2.1.2 and newer - env["TREZOR_PROFILE_DIR"] = self.workdir.name - # for firmware 2.1.1 and older - env["TREZOR_PROFILE"] = self.workdir.name - - if self.executable == LOCAL_BUILD_PATHS["core"]: - cwd = ROOT + "/core/src" + self.profile_dir = tempfile.TemporaryDirectory() + if executable == LOCAL_BUILD_PATHS["core"]: + workdir = CORE_SRC_DIR else: - cwd = self.workdir.name + workdir = None - return env, args, cwd - - def _get_params_legacy(self): - env = ENV.copy() - args = [self.executable] - cwd = self.workdir.name - return env, args, cwd - - def _get_params(self): - if self.gen == "core": - return self._get_params_core() - elif self.gen == "legacy": - return self._get_params_legacy() - else: - raise ValueError("Unknown gen") - - def start(self): - env, args, cwd = self._get_params() - self.process = subprocess.Popen( - args, cwd=cwd, env=env, stdout=open(os.devnull, "w") - ) - - # wait until emulator is listening - transport = UdpTransport("127.0.0.1:21324") - transport.open() - for _ in range(300): - if transport._ping(): - break - if self.process.poll() is not None: - self._cleanup() - raise RuntimeError("Emulator proces died") - time.sleep(0.1) - else: - # could not connect after 300 attempts * 0.1s = 30s of waiting - self._cleanup() - raise RuntimeError("Can't connect to emulator") - transport.close() - - self.client = TrezorClientDebugLink(transport) - self.client.open() - check_version(self.tag, self.client.version) - - def stop(self): - if self.client: - self.client.close() - self.process.terminate() - try: - self.process.wait(1) - except subprocess.TimeoutExpired: - self.process.kill() - - def restart(self): - self.stop() - self.start() + if gen == "legacy": + self.emulator = LegacyEmulator( + executable, self.profile_dir.name, storage=storage, headless=True, + ) + elif gen == "core": + with gzip.open(SD_CARD_GZ, "rb") as gz: + self.emulator = CoreEmulator( + executable, + self.profile_dir.name, + storage=storage, + workdir=workdir, + sdcard=gz.read(), + headless=True, + ) def __enter__(self): - self.start() - return self + self.emulator.start() + return self.emulator def __exit__(self, exc_type, exc_value, traceback): - self._cleanup() - - def _cleanup(self): - self.stop() - self.workdir.cleanup() - - def _storage_file(self): - if self.gen == "legacy": - return self.workdir.name + "/emulator.img" - elif self.gen == "core": - return self.workdir.name + "/trezor.flash" - else: - raise ValueError("Unknown gen") - - def storage(self): - return open(self._storage_file(), "rb").read() + self.emulator.stop() + self.profile_dir.cleanup() diff --git a/tests/persistence_tests/test_shamir_persistence.py b/tests/persistence_tests/test_shamir_persistence.py index 33a6f8e4db..7118ebc7c4 100644 --- a/tests/persistence_tests/test_shamir_persistence.py +++ b/tests/persistence_tests/test_shamir_persistence.py @@ -28,12 +28,11 @@ from ..upgrade_tests import core_only @pytest.fixture def emulator(): - emu = EmulatorWrapper("core") - with emu: + with EmulatorWrapper("core") as emu: yield emu -def _restart(device_handler: BackgroundDeviceHandler, emulator: EmulatorWrapper): +def _restart(device_handler, emulator): device_handler.restart(emulator) return device_handler.debuglink() diff --git a/tests/upgrade_tests/__init__.py b/tests/upgrade_tests/__init__.py index efeb28409d..781b5720a7 100644 --- a/tests/upgrade_tests/__init__.py +++ b/tests/upgrade_tests/__init__.py @@ -2,7 +2,7 @@ import os import pytest -from ..emulators import EmulatorWrapper +from ..emulators import LOCAL_BUILD_PATHS SELECTED_GENS = [ gen.strip() for gen in os.environ.get("TREZOR_UPGRADE_TEST", "").split(",") if gen @@ -15,17 +15,8 @@ if SELECTED_GENS: else: # if no selection was provided, select those for which we have emulators - try: - EmulatorWrapper("legacy") - LEGACY_ENABLED = True - except Exception: - LEGACY_ENABLED = False - - try: - EmulatorWrapper("core") - CORE_ENABLED = True - except Exception: - CORE_ENABLED = False + LEGACY_ENABLED = LOCAL_BUILD_PATHS["legacy"].exists() + CORE_ENABLED = LOCAL_BUILD_PATHS["core"].exists() legacy_only = pytest.mark.skipif( diff --git a/tests/upgrade_tests/test_firmware_upgrades.py b/tests/upgrade_tests/test_firmware_upgrades.py index ff7ede5985..bfd1f94574 100644 --- a/tests/upgrade_tests/test_firmware_upgrades.py +++ b/tests/upgrade_tests/test_firmware_upgrades.py @@ -96,7 +96,7 @@ def test_upgrade_load(gen, from_tag, to_tag): ) device_id = emu.client.features.device_id asserts(from_tag, emu.client) - storage = emu.storage() + storage = emu.get_storage() with EmulatorWrapper(gen, to_tag, storage=storage) as emu: assert device_id == emu.client.features.device_id @@ -128,7 +128,7 @@ def test_upgrade_reset(gen, from_tag, to_tag): device_id = emu.client.features.device_id asserts(from_tag, emu.client) address = btc.get_address(emu.client, "Bitcoin", PATH) - storage = emu.storage() + storage = emu.get_storage() with EmulatorWrapper(gen, to_tag, storage=storage) as emu: assert device_id == emu.client.features.device_id @@ -162,7 +162,7 @@ def test_upgrade_reset_skip_backup(gen, from_tag, to_tag): device_id = emu.client.features.device_id asserts(from_tag, emu.client) address = btc.get_address(emu.client, "Bitcoin", PATH) - storage = emu.storage() + storage = emu.get_storage() with EmulatorWrapper(gen, to_tag, storage=storage) as emu: assert device_id == emu.client.features.device_id @@ -196,7 +196,7 @@ def test_upgrade_reset_no_backup(gen, from_tag, to_tag): device_id = emu.client.features.device_id asserts(from_tag, emu.client) address = btc.get_address(emu.client, "Bitcoin", PATH) - storage = emu.storage() + storage = emu.get_storage() with EmulatorWrapper(gen, to_tag, storage=storage) as emu: assert device_id == emu.client.features.device_id @@ -222,7 +222,7 @@ def test_upgrade_shamir_recovery(gen, from_tag, to_tag): assert "2 more shares" in layout.text device_id = emu.client.features.device_id - storage = emu.storage() + storage = emu.get_storage() device_handler.check_finalize() with EmulatorWrapper(gen, to_tag, storage=storage) as emu, BackgroundDeviceHandler( @@ -258,7 +258,7 @@ def test_upgrade_u2f(gen, from_tag, to_tag): counter = fido.get_next_counter(emu.client) assert counter == 11 - storage = emu.storage() + storage = emu.get_storage() with EmulatorWrapper(gen, to_tag, storage=storage) as emu: counter = fido.get_next_counter(emu.client)