@ -14,6 +14,7 @@
# You should have received a copy of the License along with this library.
# If not, see <https://www.gnu.org/licenses/lgpl-3.0.html>.
import gzip
import os
import subprocess
import tempfile
@ -21,15 +22,17 @@ import time
from collections import defaultdict
from trezorlib . debuglink import TrezorClientDebugLink
from trezorlib . transport import TransportException , get_t ransport
from trezorlib . transport . udp import UdpT ransport
BINDI R = os . path . dirname( os . path . abspath ( __file__ ) ) + " /emulators "
ROOT = os . path . dirname ( os . path . abspath ( __file__ ) ) + " /../ "
ROOT = os . path . abspath( os . path . dirname ( __file__ ) + " /.. " )
BINDIR = ROOT + " /tests/emulators "
LOCAL_BUILD_PATHS = {
" core " : ROOT + " core/build/unix/micropython" ,
" legacy " : ROOT + " legacy/firmware/trezor.elf" ,
" core " : ROOT + " / core/build/unix/micropython" ,
" legacy " : ROOT + " / legacy/firmware/trezor.elf" ,
}
SD_CARD_GZ = ROOT + " /tests/trezor.sdcard.gz "
ENV = { " SDL_VIDEODRIVER " : " dummy " }
@ -84,46 +87,68 @@ class EmulatorWrapper:
if storage :
open ( self . _storage_file ( ) , " wb " ) . write ( storage )
with gzip . open ( SD_CARD_GZ , " rb " ) as gz :
with open ( self . workdir . name + " /trezor.sdcard " , " wb " ) as sd :
sd . write ( gz . read ( ) )
self . client = None
def __enter__ ( self ) :
def _get_params_core ( self ) :
env = ENV . copy ( )
args = [ self . executable , " -m " , " main " ]
# for firmware 2.1.2 and newer
env [ " TREZOR_PROFILE_DIR " ] = self . workdir . name
# for firmware 2.1.1 and older
env [ " TREZOR_PROFILE " ] = self . workdir . name
if self . executable == LOCAL_BUILD_PATHS [ " core " ] :
cwd = ROOT + " /core/src "
else :
cwd = self . workdir . name
return env , args , cwd
def _get_params_legacy ( self ) :
env = ENV . copy ( )
args = [ self . executable ]
env = ENV
cwd = self . workdir . name
return env , args , cwd
def _get_params ( self ) :
if self . gen == " core " :
args + = [ " -m " , " main " ]
# for firmware 2.1.2 and newer
env [ " TREZOR_PROFILE_DIR " ] = self . workdir . name
# for firmware 2.1.1 and older
env [ " TREZOR_PROFILE " ] = self . workdir . name
return self . _get_params_core ( )
elif self . gen == " legacy " :
return self . _get_params_legacy ( )
else :
raise ValueError ( " Unknown gen " )
def start ( self ) :
env , args , cwd = self . _get_params ( )
self . process = subprocess . Popen (
args , cwd = self . workdir . name , env = env , stdout = open ( os . devnull , " w " )
args , cwd = c wd, env = env , stdout = open ( os . devnull , " w " )
)
# wait until emulator is listening
transport = UdpTransport ( " 127.0.0.1:21324 " )
transport . open ( )
for _ in range ( 300 ) :
try :
time . sleep ( 0.1 )
transport = get_transport ( " udp:127.0.0.1:21324 " )
if transport . _ping ( ) :
break
except TransportException :
pass
if self . process . poll ( ) is not None :
self . _cleanup ( )
raise RuntimeError ( " Emulator proces died " )
time . sleep ( 0.1 )
else :
# could not connect after 300 attempts * 0.1s = 30s of waiting
self . _cleanup ( )
raise RuntimeError ( " Can ' t connect to emulator " )
transport . close ( )
self . client = TrezorClientDebugLink ( transport )
self . client . open ( )
check_version ( self . tag , self . client . version )
return self
def __exit__ ( self , exc_type , exc_value , traceback ) :
self . _cleanup ( )
return False
def _cleanup ( self ) :
def stop ( self ) :
if self . client :
self . client . close ( )
self . process . terminate ( )
@ -131,6 +156,20 @@ class EmulatorWrapper:
self . process . wait ( 1 )
except subprocess . TimeoutExpired :
self . process . kill ( )
def restart ( self ) :
self . stop ( )
self . start ( )
def __enter__ ( self ) :
self . start ( )
return self
def __exit__ ( self , exc_type , exc_value , traceback ) :
self . _cleanup ( )
def _cleanup ( self ) :
self . stop ( )
self . workdir . cleanup ( )
def _storage_file ( self ) :