2015-02-11 13:31:21 +00:00
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import pytest
import aiohttp
import asyncio
import os
import stat
2015-03-16 10:52:22 +00:00
import socket
2015-04-27 13:09:42 +00:00
import sys
2015-10-21 12:28:39 +00:00
import uuid
import shutil
2015-02-11 13:31:21 +00:00
from tests . utils import asyncio_patch
2015-03-26 16:43:00 +00:00
from unittest . mock import patch , MagicMock , PropertyMock
2015-04-27 13:09:42 +00:00
# Module-wide pytest marker: skip the tests when the platform check matches.
pytestmark = pytest.mark.skipif(
    sys.platform.startswith(" win "),
    reason=" Not supported on Windows "
)

# The IOU modules are imported only when the same platform check is false.
if not sys.platform.startswith(" win "):
    from gns3server.modules.iou.iou_vm import IOUVM
    from gns3server.modules.iou.iou_error import IOUError
    from gns3server.modules.iou import IOU
2015-03-19 14:36:06 +00:00
from gns3server . config import Config
2015-02-11 13:31:21 +00:00
2015-03-19 16:42:43 +00:00
2015-02-11 13:31:21 +00:00
@pytest.fixture(scope=" module ")
def manager(port_manager):
    """Return the object given by ``IOU.instance()`` with its port manager set.

    :param port_manager: fixture assigned to the manager's ``port_manager``
    :returns: the IOU manager instance
    """
    iou_manager = IOU.instance()
    iou_manager.port_manager = port_manager
    return iou_manager
@pytest.fixture(scope=" function ")
def vm(project, manager, tmpdir, fake_iou_bin, iourc_file):
    """Build an IOUVM whose "IOU" config section points at stub files.

    A placeholder iouyap file is written under ``tmpdir``; its path and the
    ``iourc_file`` fixture path are stored in the manager's "IOU" config
    section before the VM image path is assigned.

    :returns: the configured IOUVM instance
    """
    # Placeholder file standing in for the iouyap executable.
    iouyap_stub = str(tmpdir / " iouyap ")
    with open(iouyap_stub, " w+ ") as handle:
        handle.write(" 1 ")

    iou_vm = IOUVM(" test ", str(uuid.uuid4()), project, manager)

    # Read the section, override the two paths, then write it back.
    iou_section = manager.config.get_section_config(" IOU ")
    iou_section[" iouyap_path "] = iouyap_stub
    iou_section[" iourc_path "] = iourc_file
    manager.config.set_section_config(" IOU ", iou_section)

    iou_vm.path = " iou.bin "
    return iou_vm
2015-03-16 10:52:22 +00:00
@pytest.fixture
def iourc_file(tmpdir):
    """Write an iourc file keyed on the local hostname and return its path.

    :param tmpdir: directory in which the file is created
    :returns: path of the written file, as a string
    """
    iourc_path = str(tmpdir / " iourc ")
    with open(iourc_path, " w+ ") as handle:
        licence_text = " [license] \n {} = aaaaaaaaaaaaaaaa; ".format(socket.gethostname())
        handle.write(licence_text)
    return iourc_path
2015-02-11 13:31:21 +00:00
@pytest.fixture
def fake_iou_bin(tmpdir):
    """Create a fake IOU image on disk and return its path.

    The file is written inside an "IOU" sub-directory of ``tmpdir`` and its
    mode is then set to owner read + execute.

    :returns: path of the fake image, as a string
    """
    image_dir = tmpdir / " IOU "
    os.makedirs(str(image_dir), exist_ok=True)

    image_path = str(image_dir / " iou.bin ")
    with open(image_path, " w+ ") as handle:
        handle.write(' \x7f ELF \x01 \x01 \x01 ')

    os.chmod(image_path, stat.S_IREAD | stat.S_IEXEC)
    return image_path
def test_vm(project, manager):
    """A new IOUVM reports the name and id it was constructed with."""
    expected_name = " test "
    expected_id = " 00010203-0405-0607-0809-0a0b0c0d0e0f "

    iou_vm = IOUVM(expected_name, expected_id, project, manager)

    assert iou_vm.name == expected_name
    assert iou_vm.id == expected_id
2015-06-06 21:15:03 +00:00
def test_vm_startup_config_content(project, manager):
    """Check name, id and the value read back from ``startup_config_content``."""
    vm_id = " 00010203-0405-0607-0808-0a0b0c0d0e0f "
    iou_vm = IOUVM(" test ", vm_id, project, manager)

    iou_vm.startup_config_content = " hostname % h "

    assert iou_vm.name == " test "
    assert iou_vm.startup_config_content == " hostname test "
    assert iou_vm.id == vm_id
2015-02-13 21:16:43 +00:00
2015-02-11 13:31:21 +00:00
@patch(" gns3server.config.Config.get_section_config ", return_value={" iouyap_path ": " /bin/test_fake "})
def test_vm_invalid_iouyap_path(project, manager, loop, fake_iou_bin):
    """Expect IOUError when a VM is created and started with a fake iouyap path.

    ``Config.get_section_config`` is patched to return a section whose
    iouyap path is a fake location.
    """
    # NOTE(review): @patch without ``new`` passes the created mock to the test
    # as an extra positional argument -- confirm which parameter receives it.
    with pytest.raises(IOUError):
        vm = IOUVM(" test ", " 00010203-0405-0607-0809-0a0b0c0d0e0e ", project, manager)
        vm.path = " iou.bin "
        # The coroutine goes straight to run_until_complete(); the former
        # asyncio.async() wrapper is a syntax error on Python 3.7+.
        loop.run_until_complete(vm.start())
2015-02-12 14:20:47 +00:00
def test_start(loop, vm, monkeypatch):
    """Start the VM with its helpers patched out and check the resulting state.

    After ``vm.start()`` the VM must report running, and ``vm.command_line``
    must equal the positional arguments given to the patched
    ``asyncio.create_subprocess_exec`` joined with spaces.
    """
    mock_process = MagicMock()
    with patch(" gns3server.modules.iou.iou_vm.IOUVM._check_requirements ", return_value=True):
        with asyncio_patch(" gns3server.modules.iou.iou_vm.IOUVM._check_iou_licence ", return_value=True):
            with asyncio_patch(" gns3server.modules.iou.iou_vm.IOUVM._start_ioucon ", return_value=True):
                with asyncio_patch(" gns3server.modules.iou.iou_vm.IOUVM._start_iouyap ", return_value=True):
                    with asyncio_patch(" asyncio.create_subprocess_exec ", return_value=mock_process) as mock_exec:
                        mock_process.returncode = None
                        # The coroutine goes straight to run_until_complete();
                        # asyncio.async() is a syntax error on Python 3.7+.
                        loop.run_until_complete(vm.start())
                        assert vm.is_running()
                        assert vm.command_line == ' '.join(mock_exec.call_args[0])
2015-02-11 13:31:21 +00:00
2016-03-21 15:53:11 +00:00
2015-02-17 15:40:45 +00:00
def test_start_with_iourc(loop, vm, monkeypatch, tmpdir):
    """Start the VM and check the IOURC variable given to the subprocess.

    ``Config.get_section_config`` is patched to return a section naming an
    iourc file written by this test; the ``env`` keyword argument of the
    patched ``asyncio.create_subprocess_exec`` must carry that path under
    the IOURC key.
    """
    fake_file = str(tmpdir / " iourc ")
    with open(fake_file, " w+ ") as f:
        f.write(" 1 ")

    mock_process = MagicMock()
    # vm.iouyap_path is read here, before get_section_config is patched.
    section = {" iourc_path ": fake_file, " iouyap_path ": vm.iouyap_path}
    with patch(" gns3server.config.Config.get_section_config ", return_value=section):
        with asyncio_patch(" gns3server.modules.iou.iou_vm.IOUVM._check_requirements ", return_value=True):
            with asyncio_patch(" gns3server.modules.iou.iou_vm.IOUVM._check_iou_licence ", return_value=True):
                with asyncio_patch(" gns3server.modules.iou.iou_vm.IOUVM._start_ioucon ", return_value=True):
                    with asyncio_patch(" gns3server.modules.iou.iou_vm.IOUVM._start_iouyap ", return_value=True):
                        with asyncio_patch(" asyncio.create_subprocess_exec ", return_value=mock_process) as exec_mock:
                            mock_process.returncode = None
                            # The coroutine goes straight to run_until_complete();
                            # asyncio.async() is a syntax error on Python 3.7+.
                            loop.run_until_complete(vm.start())
                            assert vm.is_running()
                            # Only the keyword arguments are inspected.
                            _, kwargs = exec_mock.call_args
                            assert kwargs[" env "][" IOURC "] == fake_file
2015-02-17 15:40:45 +00:00
2015-02-13 19:57:20 +00:00
def test_rename_nvram_file(loop, vm, monkeypatch):
    """
    It should rename the nvram file to the correct name before launching the VM

    Files named after ``application_id + 1`` are created first; after
    ``_rename_nvram_file()`` files named after ``application_id`` must exist.
    """
    name_templates = (" nvram_0000 {} ", " vlan.dat-0000 {} ")

    for template in name_templates:
        stale_name = template.format(vm.application_id + 1)
        with open(os.path.join(vm.working_dir, stale_name), ' w+ ') as handle:
            handle.write(" 1 ")

    vm._rename_nvram_file()

    for template in name_templates:
        renamed = os.path.join(vm.working_dir, template.format(vm.application_id))
        assert os.path.exists(renamed)
2015-02-13 19:57:20 +00:00
2015-02-11 13:31:21 +00:00
def test_stop(loop, vm):
    """Start then stop the VM; it must stop running and terminate its process."""
    process = MagicMock()

    # Wait process kill success: process.wait() returns an already-resolved future.
    future = asyncio.Future()
    future.set_result(True)
    process.wait.return_value = future

    with asyncio_patch(" gns3server.modules.iou.iou_vm.IOUVM._check_requirements ", return_value=True):
        with asyncio_patch(" gns3server.modules.iou.iou_vm.IOUVM._start_ioucon ", return_value=True):
            with asyncio_patch(" gns3server.modules.iou.iou_vm.IOUVM._start_iouyap ", return_value=True):
                with asyncio_patch(" asyncio.create_subprocess_exec ", return_value=process):
                    with asyncio_patch(" gns3server.utils.asyncio.wait_for_process_termination "):
                        # Coroutines go straight to run_until_complete();
                        # asyncio.async() is a syntax error on Python 3.7+.
                        loop.run_until_complete(vm.start())
                        process.returncode = None
                        assert vm.is_running()
                        loop.run_until_complete(vm.stop())
                        assert vm.is_running() is False
                        process.terminate.assert_called_with()
2015-02-11 13:31:21 +00:00
def test_reload(loop, vm, fake_iou_bin):
    """Start then reload the VM; it must still run and the process be terminated."""
    process = MagicMock()

    # Wait process kill success: process.wait() returns an already-resolved future.
    future = asyncio.Future()
    future.set_result(True)
    process.wait.return_value = future
    process.returncode = None

    with asyncio_patch(" gns3server.modules.iou.iou_vm.IOUVM._check_requirements ", return_value=True):
        with asyncio_patch(" gns3server.modules.iou.iou_vm.IOUVM._start_ioucon ", return_value=True):
            with asyncio_patch(" gns3server.modules.iou.iou_vm.IOUVM._start_iouyap ", return_value=True):
                with asyncio_patch(" asyncio.create_subprocess_exec ", return_value=process):
                    with asyncio_patch(" gns3server.utils.asyncio.wait_for_process_termination "):
                        # Coroutines go straight to run_until_complete();
                        # asyncio.async() is a syntax error on Python 3.7+.
                        loop.run_until_complete(vm.start())
                        assert vm.is_running()
                        loop.run_until_complete(vm.reload())
                        assert vm.is_running() is True
                        process.terminate.assert_called_with()
2015-02-11 13:31:21 +00:00
2015-02-16 18:14:45 +00:00
def test_close(vm, port_manager, loop):
    """Close the VM, then check its console port can be reserved again."""
    with asyncio_patch(" gns3server.modules.iou.iou_vm.IOUVM._check_requirements ", return_value=True):
        with asyncio_patch(" asyncio.create_subprocess_exec ", return_value=MagicMock()):
            # NOTE(review): start() is called but its result is never run on
            # the loop -- confirm whether the VM is meant to be started here.
            vm.start()
            port = vm.console
            # The coroutine goes straight to run_until_complete();
            # asyncio.async() is a syntax error on Python 3.7+.
            loop.run_until_complete(vm.close())
            # Raise an exception if the port is not free
            port_manager.reserve_tcp_port(port, vm.project)
            assert vm.is_running() is False
2015-02-12 14:20:47 +00:00
def test_path(vm, fake_iou_bin):
    """Assigning the fake image path to ``vm.path`` reads back the same path."""
    local_section = {" local ": True}
    with patch(" gns3server.config.Config.get_section_config ", return_value=local_section):
        vm.path = fake_iou_bin
        assert vm.path == fake_iou_bin
2015-02-11 13:31:21 +00:00
2015-04-07 13:25:53 +00:00
def test_path_12_location(vm, fake_iou_bin):
    """A path with the IOU directory component replaced reads back as the fixture path."""
    # In 1.2 users uploaded images to the images roots
    # after the migration their images are inside images/IOU
    # but old topologies use old path
    local_section = {" local ": True}
    with patch(" gns3server.config.Config.get_section_config ", return_value=local_section):
        legacy_path = fake_iou_bin.replace(" /IOU ", " ")
        vm.path = legacy_path
        assert vm.path == fake_iou_bin
2015-04-07 13:25:53 +00:00
2015-02-25 15:26:17 +00:00
def test_path_relative(vm, fake_iou_bin, tmpdir):
    """With images_path set to ``tmpdir``, a bare file name reads back as the fixture path."""
    server_config = Config.instance()
    server_config.set(" Server ", " images_path ", str(tmpdir))

    vm.path = " iou.bin "

    assert vm.path == fake_iou_bin
2015-02-11 13:31:21 +00:00
def test_path_invalid_bin(vm, tmpdir):
    """Expect IOUError for an image path that is missing, then holds junk content."""
    local_section = {" local ": True}
    with patch(" gns3server.config.Config.get_section_config ", return_value=local_section):
        bogus_image = str(tmpdir / " test.bin ")

        # First attempt: nothing has been written at the path yet.
        with pytest.raises(IOUError):
            vm.path = bogus_image
            vm._check_requirements()

        with open(bogus_image, " w+ ") as handle:
            handle.write(" BUG ")

        # Second attempt: the file now exists with the content written above.
        with pytest.raises(IOUError):
            vm.path = bogus_image
            vm._check_requirements()
2015-02-11 13:31:21 +00:00
def test_create_netmap_config(vm):
    """The generated NETMAP file must contain the two expected mapping entries."""
    vm._create_netmap_config()

    with open(os.path.join(vm.working_dir, " NETMAP ")) as handle:
        netmap_text = handle.read()

    for expected_entry in (" 513:0/0 1:0/0 ", " 513:15/3 1:15/3 "):
        assert expected_entry in netmap_text
2015-02-16 16:20:07 +00:00
def test_build_command(vm, loop):
    """``_build_command()`` must yield the image path, the -L flag and the application id."""
    # The coroutine goes straight to run_until_complete();
    # asyncio.async() is a syntax error on Python 3.7+.
    command = loop.run_until_complete(vm._build_command())
    assert command == [vm.path, " -L ", str(vm.application_id)]
2015-02-13 21:16:43 +00:00
2015-06-19 14:35:19 +00:00
2015-06-06 21:15:03 +00:00
def test_get_startup_config(vm):
    """A value assigned to ``vm.startup_config`` is read back unchanged."""
    config_text = " service timestamps debug datetime msec \n service timestamps log datetime msec \n no service password-encryption "
    vm.startup_config = config_text
    assert vm.startup_config == config_text
2015-02-13 21:16:43 +00:00
2015-06-06 21:15:03 +00:00
def test_update_startup_config(vm):
    """Setting ``startup_config_content`` writes it to the startup config file."""
    config_text = " service timestamps debug datetime msec \n service timestamps log datetime msec \n no service password-encryption "
    vm.startup_config_content = config_text

    config_file = os.path.join(vm.working_dir, " startup-config.cfg ")
    assert os.path.exists(config_file)

    with open(config_file) as handle:
        written = handle.read()
        assert written == config_text
2015-06-06 21:15:03 +00:00
def test_update_startup_config_empty(vm):
    """A second assignment to ``startup_config_content`` must leave the file unchanged."""
    config_text = " service timestamps debug datetime msec \n service timestamps log datetime msec \n no service password-encryption "
    vm.startup_config_content = config_text

    config_file = os.path.join(vm.working_dir, " startup-config.cfg ")
    assert os.path.exists(config_file)
    with open(config_file) as handle:
        before = handle.read()
        assert before == config_text

    # After the second assignment the file must still hold the first content.
    vm.startup_config_content = " "
    with open(config_file) as handle:
        after = handle.read()
        assert after == config_text
2015-06-06 21:15:03 +00:00
def test_update_startup_config_content_hostname(vm):
    """The startup config file must hold the expected text with the VM name in it."""
    template_text = " hostname % h \n "
    vm.name = " pc1 "
    vm.startup_config_content = template_text

    with open(vm.startup_config_file) as handle:
        rendered = handle.read()
        assert rendered == " hostname pc1 \n "
def test_change_name(vm, tmpdir):
    """Renaming the VM must update its name and the hostname line in the startup config."""
    config_file = os.path.join(vm.working_dir, " startup-config.cfg ")

    vm.name = " world "
    with open(config_file, ' w+ ') as handle:
        handle.write(" hostname world ")

    vm.name = " hello "
    assert vm.name == " hello "

    with open(config_file) as handle:
        updated = handle.read()
        assert updated == " hostname hello "
2015-02-16 16:20:07 +00:00
def test_library_check(loop, vm):
    """``_library_check()`` must pass on blank output and raise on a "not found" line.

    ``subprocess_check_output`` is patched to supply the output in both cases.
    """
    # Coroutines go straight to run_until_complete(); asyncio.async() is a
    # syntax error on Python 3.7+. The patch objects are not bound to a name
    # because they are never inspected.
    with asyncio_patch(" gns3server.utils.asyncio.subprocess_check_output ", return_value=" "):
        loop.run_until_complete(vm._library_check())

    with asyncio_patch(" gns3server.utils.asyncio.subprocess_check_output ", return_value=" libssl => not found "):
        with pytest.raises(IOUError):
            loop.run_until_complete(vm._library_check())
def test_enable_l1_keepalives(loop, vm):
    """Check ``_enable_l1_keepalives()`` against two patched help outputs.

    When the output lists the -l option, the flag must be appended to the
    command. When it does not, IOUError must be raised and the command left
    untouched.
    """
    # Coroutines go straight to run_until_complete(); asyncio.async() is a
    # syntax error on Python 3.7+.
    with asyncio_patch(" gns3server.utils.asyncio.subprocess_check_output ", return_value=" *************************************************************** \n \n -l Enable Layer 1 keepalive messages \n -u <n> UDP port base for distributed networks \n "):
        command = [" test "]
        loop.run_until_complete(vm._enable_l1_keepalives(command))
        assert command == [" test ", " -l "]

    with asyncio_patch(" gns3server.utils.asyncio.subprocess_check_output ", return_value=" *************************************************************** \n \n -u <n> UDP port base for distributed networks \n "):
        command = [" test "]
        with pytest.raises(IOUError):
            loop.run_until_complete(vm._enable_l1_keepalives(command))
        # Checked after the raises block: a statement placed after the raising
        # call inside that block would never execute.
        assert command == [" test "]
2015-02-16 19:08:04 +00:00
2015-02-17 09:37:09 +00:00
def test_start_capture(vm, tmpdir, manager, free_console_port, loop):
    """After ``start_capture`` on adapter 0 / port 0 the bound NIO must be capturing."""
    output_file = str(tmpdir / " test.pcap ")
    nio = manager.create_nio(vm.iouyap_path, {" type ": " nio_udp ", " lport ": free_console_port, " rport ": free_console_port, " rhost ": " 127.0.0.1 "})

    # Coroutines go straight to run_until_complete(); asyncio.async() is a
    # syntax error on Python 3.7+.
    loop.run_until_complete(vm.adapter_add_nio_binding(0, 0, nio))
    loop.run_until_complete(vm.start_capture(0, 0, output_file))

    assert vm._adapters[0].get_nio(0).capturing
2015-02-17 09:37:09 +00:00
def test_stop_capture(vm, tmpdir, manager, free_console_port, loop):
    """The bound NIO must be capturing after ``start_capture`` and not after ``stop_capture``."""
    output_file = str(tmpdir / " test.pcap ")
    nio = manager.create_nio(vm.iouyap_path, {" type ": " nio_udp ", " lport ": free_console_port, " rport ": free_console_port, " rhost ": " 127.0.0.1 "})

    # Every coroutine goes straight to run_until_complete(), as the
    # start_capture call already did; asyncio.async() is a syntax error on
    # Python 3.7+.
    loop.run_until_complete(vm.adapter_add_nio_binding(0, 0, nio))
    loop.run_until_complete(vm.start_capture(0, 0, output_file))
    assert vm._adapters[0].get_nio(0).capturing

    loop.run_until_complete(vm.stop_capture(0, 0))
    assert vm._adapters[0].get_nio(0).capturing is False
2015-02-25 15:35:13 +00:00
2015-02-26 01:55:35 +00:00
def test_get_legacy_vm_workdir():
    """``IOU.get_legacy_vm_workdir`` must return the expected directory for id 42."""
    legacy_dir = IOU.get_legacy_vm_workdir(42, " bla ")
    assert legacy_dir == " iou/device-42 "
2015-03-16 10:52:22 +00:00
def _assert_licence_rejected(loop, vm, iourc_file, content):
    """Overwrite the iourc file with ``content`` and expect the licence check to raise IOUError."""
    with open(iourc_file, " w+ ") as f:
        f.write(content)
    # Only the licence check sits inside the raises block, so an error from
    # the file write cannot be mistaken for the expected failure.
    with pytest.raises(IOUError):
        loop.run_until_complete(vm._check_iou_licence())


def test_invalid_iou_file(loop, vm, iourc_file):
    """``_check_iou_licence()`` must accept the fixture file and reject broken ones.

    The file written by the ``iourc_file`` fixture is checked first and must
    pass. Each later variant overwrites (or removes) that file and must make
    the check raise IOUError.
    """
    hostname = socket.gethostname()

    # Coroutines go straight to run_until_complete(); asyncio.async() is a
    # syntax error on Python 3.7+.
    loop.run_until_complete(vm._check_iou_licence())

    # Missing ;
    _assert_licence_rejected(loop, vm, iourc_file, " [license] \n {} = aaaaaaaaaaaaaaaa ".format(hostname))

    # Key too short
    _assert_licence_rejected(loop, vm, iourc_file, " [license] \n {} = aaaaaaaaaaaaaa; ".format(hostname))

    # Invalid hostname
    # NOTE(review): this key is as short as the "Key too short" one, so the
    # hostname check may not be what triggers the error -- confirm.
    _assert_licence_rejected(loop, vm, iourc_file, " [license] \n bla = aaaaaaaaaaaaaa; ")

    # Missing licence section (written without .format(), so the key is the literal "{}")
    _assert_licence_rejected(loop, vm, iourc_file, " [licensetest] \n {} = aaaaaaaaaaaaaaaa; ")

    # Broken config file
    _assert_licence_rejected(loop, vm, iourc_file, " [ ")

    # Missing file
    os.remove(iourc_file)
    with pytest.raises(IOUError):
        loop.run_until_complete(vm._check_iou_licence())
2015-03-17 15:31:45 +00:00
def test_iourc_content(vm):
    """Setting ``iourc_content`` writes it to the iourc file in the temporary directory."""
    vm.iourc_content = " test "

    iourc_path = os.path.join(vm.temporary_directory, " iourc ")
    with open(iourc_path) as handle:
        written = handle.read()
        assert written == " test "
2015-10-21 12:28:39 +00:00
2016-03-21 15:53:11 +00:00
def test_iourc_content_fix_carriage_return(vm):
    """Content assigned with a carriage return must be written without it."""
    vm.iourc_content = " test \r \n 12 "

    iourc_path = os.path.join(vm.temporary_directory, " iourc ")
    with open(iourc_path) as handle:
        written = handle.read()
        assert written == " test \n 12 "
2015-10-21 12:28:39 +00:00
def test_extract_configs(vm):
    """``extract_configs()`` yields (None, None) without usable nvram, else both configs.

    The resource file is copied from a path relative to the current working
    directory.
    """
    nothing_extracted = (None, None)

    # No nvram file has been written by this test yet.
    assert vm.extract_configs() == nothing_extracted

    # The corrupted-file check is performed twice, as in the original test.
    for _ in range(2):
        with open(os.path.join(vm.working_dir, " nvram_00001 "), " w+ ") as handle:
            handle.write(" CORRUPTED ")
        assert vm.extract_configs() == nothing_extracted

    shutil.copy(" tests/resources/nvram_iou ", os.path.join(vm.working_dir, " nvram_00001 "))
    startup_config, private_config = vm.extract_configs()
    assert len(startup_config) == 1392
    assert len(private_config) == 0