#!/usr/bin/env python
#
# Copyright (C) 2016 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import os
import uuid
import json
import pytest
import socket
import aiohttp
from unittest.mock import MagicMock
from tests.utils import AsyncioMagicMock, asyncio_patch

from gns3server.controller import Controller
from gns3server.controller.compute import Compute
from gns3server.controller.project import Project
from gns3server.config import Config
from gns3server.version import __version__


def test_save(controller, controller_config_path):
    """
    Saving the controller writes a JSON config file with the computes,
    version, settings and gns3vm sections
    """

    controller.save()
    assert os.path.exists(controller_config_path)
    with open(controller_config_path) as f:
        data = json.load(f)
    assert data["computes"] == []
    assert data["version"] == __version__
    assert data["settings"] == {}
    assert data["gns3vm"] == controller.gns3vm.__json__()


def test_load(controller, controller_config_path, async_run):
    """
    Loading the controller restores the computes, settings and gns3vm
    settings stored in the config file
    """

    # Start from a freshly saved config, then inject the data to reload
    controller.save()
    with open(controller_config_path) as f:
        data = json.load(f)
    data["computes"] = [
        {
            "host": "localhost",
            "port": 8000,
            "protocol": "http",
            "user": "admin",
            "password": "root",
            "compute_id": "test1"
        }
    ]
    data["settings"] = {"IOU": True}
    data["gns3vm"] = {"vmname": "Test VM"}
    with open(controller_config_path, "w+") as f:
        json.dump(data, f)

    async_run(controller.load())
    assert controller.settings["IOU"]
    # The password is stored in the config but is absent from the JSON output
    assert controller.computes["test1"].__json__() == {
        "compute_id": "test1",
        "connected": False,
        "host": "localhost",
        "port": 8000,
        "protocol": "http",
        "user": "admin",
        "name": "http://admin@localhost:8000",
        "cpu_usage_percent": None,
        "memory_usage_percent": None,
        "capabilities": {
            "version": None,
            "node_types": []
        }
    }
    assert controller.gns3vm.settings["vmname"] == "Test VM"


def test_import_computes(controller, controller_config_path, async_run):
    """
    At first start the server should import the computes from the gns3_gui
    """

    gns3_gui_conf = {
        "Servers": {
            "remote_servers": [
                {
                    "host": "127.0.0.1",
                    "password": "",
                    "port": 3081,
                    "protocol": "http",
                    "url": "http://127.0.0.1:3081",
                    "user": ""
                }
            ]
        }
    }
    config_dir = os.path.dirname(controller_config_path)
    os.makedirs(config_dir, exist_ok=True)
    with open(os.path.join(config_dir, "gns3_gui.conf"), "w+") as f:
        json.dump(gns3_gui_conf, f)

    async_run(controller.load())
    # NOTE(review): this loop passes vacuously if no remote compute was
    # imported -- confirm whether a count check should be added.
    for compute in controller.computes.values():
        if compute.id != "local":
            assert compute.host == "127.0.0.1"
            assert compute.port == 3081
            assert compute.protocol == "http"
            assert compute.name == "http://127.0.0.1:3081"
            # Empty user and password strings are expected to become None
            assert compute.user is None
            assert compute.password is None


def test_settings(controller):
    """
    Assigning the settings emits a settings.updated notification
    """

    controller._notification = MagicMock()
    controller.settings = {"a": 1}
    controller._notification.emit.assert_called_with("settings.updated", {"a": 1})


def test_load_projects(controller, projects_dir, async_run):
    """
    load_projects calls load_project with load=False for a .gns3 file
    found in the projects directory
    """

    controller.save()

    os.makedirs(os.path.join(projects_dir, "project1"))
    topology_path = os.path.join(projects_dir, "project1", "project1.gns3")
    with open(topology_path, "w+") as f:
        f.write("")

    with asyncio_patch("gns3server.controller.Controller.load_project") as mock_load_project:
        async_run(controller.load_projects())
    mock_load_project.assert_called_with(topology_path, load=False)


def test_isEnabled(controller):
    """
    is_enabled follows the "controller" option of the Server config section
    """

    Config.instance().set("Server", "controller", False)
    assert not controller.is_enabled()
    Config.instance().set("Server", "controller", True)
    assert controller.is_enabled()


def test_add_compute(controller, controller_config_path, async_run):
    """
    Adding a compute emits compute.created; adding the same compute id
    again emits compute.updated and does not create a second compute
    """

    controller._notification = MagicMock()
    c = async_run(controller.add_compute(compute_id="test1", connect=False))
    controller._notification.emit.assert_called_with("compute.created", c.__json__())
    assert len(controller.computes) == 1

    async_run(controller.add_compute(compute_id="test1", connect=False))
    controller._notification.emit.assert_called_with("compute.updated", c.__json__())
    assert len(controller.computes) == 1

    async_run(controller.add_compute(compute_id="test2", connect=False))
    assert len(controller.computes) == 2


def test_addDuplicateCompute(controller, controller_config_path, async_run):
    """
    Adding a compute with a different id but an already used name
    raises HTTPConflict
    """

    controller._notification = MagicMock()
    async_run(controller.add_compute(compute_id="test1", name="Test", connect=False))
    assert len(controller.computes) == 1
    with pytest.raises(aiohttp.web.HTTPConflict):
        async_run(controller.add_compute(compute_id="test2", name="Test", connect=False))


def test_deleteCompute(controller, controller_config_path, async_run):
    """
    Deleting a compute removes it from the controller and from the config
    file, emits compute.deleted and leaves the compute disconnected
    """

    c = async_run(controller.add_compute(compute_id="test1", connect=False))
    assert len(controller.computes) == 1

    controller._notification = MagicMock()
    # Mark the compute as connected so the final check is meaningful
    c._connected = True
    async_run(controller.delete_compute("test1"))

    assert len(controller.computes) == 0
    controller._notification.emit.assert_called_with("compute.deleted", c.__json__())
    with open(controller_config_path) as f:
        data = json.load(f)
    assert len(data["computes"]) == 0
    assert c.connected is False


def test_deleteComputeProjectOpened(controller, controller_config_path, async_run):
    """
    When you delete a compute, the projects using it are closed
    """

    c = async_run(controller.add_compute(compute_id="test1", connect=False))
    c.post = AsyncioMagicMock()
    assert len(controller.computes) == 1

    project1 = async_run(controller.add_project(name="Test1"))
    async_run(project1.open())
    # We simulate that the project uses this compute
    project1._project_created_on_compute.add(c)

    project2 = async_run(controller.add_project(name="Test2"))
    async_run(project2.open())

    controller._notification = MagicMock()
    c._connected = True
    async_run(controller.delete_compute("test1"))

    assert len(controller.computes) == 0
    controller._notification.emit.assert_called_with("compute.deleted", c.__json__())
    with open(controller_config_path) as f:
        data = json.load(f)
    assert len(data["computes"]) == 0
    assert c.connected is False

    # Project 1 uses this compute, so it should be closed when the compute
    # is deleted; project 2 does not use it and stays opened
    assert project1.status == "closed"
    assert project2.status == "opened"


def test_addComputeConfigFile(controller, controller_config_path, async_run):
    """
    Adding a compute writes it to the controller config file
    """

    async_run(controller.add_compute(compute_id="test1", name="Test", connect=False))
    assert len(controller.computes) == 1
    with open(controller_config_path) as f:
        data = json.load(f)
    assert data["computes"] == [
        {
            'compute_id': 'test1',
            'name': 'Test',
            'host': 'localhost',
            'port': 3080,
            'protocol': 'http',
            'user': None,
            'password': None
        }
    ]


def test_getCompute(controller, async_run):
    """
    get_compute returns the compute for a known id and raises
    HTTPNotFound for an unknown one
    """

    compute = async_run(controller.add_compute(compute_id="test1", connect=False))

    assert controller.get_compute("test1") == compute
    with pytest.raises(aiohttp.web.HTTPNotFound):
        # The call itself must raise; no assertion on the return value
        controller.get_compute("dsdssd")


def test_has_compute(controller, async_run):
    """
    has_compute is true only for a compute id that has been added
    """

    async_run(controller.add_compute(compute_id="test1", connect=False))

    assert controller.has_compute("test1")
    assert not controller.has_compute("test2")


def test_add_project(controller, async_run):
    """
    Adding a project twice with the same id does not create a second project
    """

    uuid1 = str(uuid.uuid4())
    uuid2 = str(uuid.uuid4())

    async_run(controller.add_project(project_id=uuid1, name="Test"))
    assert len(controller.projects) == 1
    async_run(controller.add_project(project_id=uuid1, name="Test"))
    assert len(controller.projects) == 1
    async_run(controller.add_project(project_id=uuid2, name="Test 2"))
    assert len(controller.projects) == 2


def test_addDuplicateProject(controller, async_run):
    """
    Adding a project with a different id but an already used name
    raises HTTPConflict
    """

    uuid1 = str(uuid.uuid4())
    uuid2 = str(uuid.uuid4())

    async_run(controller.add_project(project_id=uuid1, name="Test"))
    assert len(controller.projects) == 1
    with pytest.raises(aiohttp.web.HTTPConflict):
        async_run(controller.add_project(project_id=uuid2, name="Test"))


def test_remove_project(controller, async_run):
    """
    remove_project removes the project from the controller
    """

    uuid1 = str(uuid.uuid4())

    project1 = async_run(controller.add_project(project_id=uuid1, name="Test"))
    assert len(controller.projects) == 1

    controller.remove_project(project1)
    assert len(controller.projects) == 0


def test_addProject_with_compute(controller, async_run):
    """
    A project can be added while a compute is registered on the controller
    """

    uuid1 = str(uuid.uuid4())

    compute = Compute("test1", controller=MagicMock())
    compute.post = MagicMock()
    controller._computes = {"test1": compute}

    project1 = async_run(controller.add_project(project_id=uuid1, name="Test"))
    # Make sure the project was really registered on the controller
    assert controller.get_project(uuid1) == project1


def test_getProject(controller, async_run):
    """
    get_project returns the project for a known id and raises
    HTTPNotFound for an unknown one
    """

    uuid1 = str(uuid.uuid4())

    project = async_run(controller.add_project(project_id=uuid1, name="Test"))
    assert controller.get_project(uuid1) == project
    with pytest.raises(aiohttp.web.HTTPNotFound):
        # The call itself must raise; no assertion on the return value
        controller.get_project("dsdssd")


def test_start(controller, async_run):
    """
    Starting the controller creates the local compute, named after the host
    """

    async_run(controller.start())
    assert len(controller.computes) == 1  # Local compute is created
    assert controller.computes["local"].name == socket.gethostname()


def test_start_vm(controller, async_run):
    """
    Start the controller with a GNS3 VM
    """

    controller.gns3vm.settings = {
        "enable": True,
        "engine": "vmware"
    }
    with asyncio_patch("gns3server.controller.gns3vm.vmware_gns3_vm.VMwareGNS3VM.start") as mock:
        async_run(controller.start())
        assert mock.called
    assert "local" in controller.computes
    assert "vm" in controller.computes
    assert len(controller.computes) == 2  # Local compute and vm are created


def test_stop(controller, async_run):
    """
    Stopping the controller leaves a previously connected compute disconnected
    """

    c = async_run(controller.add_compute(compute_id="test1", connect=False))
    c._connected = True
    async_run(controller.stop())
    assert c.connected is False


def test_stop_vm(controller, async_run):
    """
    Stop the controller with a GNS3 VM running
    """

    controller.gns3vm.settings = {
        "enable": True,
        "engine": "vmware",
        "auto_stop": True
    }
    controller.gns3vm._current_engine().running = True
    with asyncio_patch("gns3server.controller.gns3vm.vmware_gns3_vm.VMwareGNS3VM.stop") as mock:
        async_run(controller.stop())
        assert mock.called


def test_load_project(controller, async_run, tmpdir):
    """
    Loading a .gns3 topology file registers its compute and creates the
    project with its nodes and links
    """

    data = {
        "name": "Experience",
        "project_id": "c8d07a5a-134f-4c3f-8599-e35eac85eb17",
        "revision": 5,
        "type": "topology",
        "version": "2.0.0dev1",
        "topology": {
            "drawings": [],
            "computes": [
                {
                    "compute_id": "my_remote",
                    "host": "127.0.0.1",
                    "name": "My remote",
                    "port": 3080,
                    "protocol": "http",
                }
            ],
            "links": [
                {
                    "link_id": "c44331d2-2da4-490d-9aad-7f5c126ae271",
                    "nodes": [
                        {"node_id": "c067b922-7f77-4680-ac00-0226c6583598", "adapter_number": 0, "port_number": 0},
                        {"node_id": "50d66d7b-0dd7-4e9f-b720-6eb621ae6543", "adapter_number": 0, "port_number": 0},
                    ],
                }
            ],
            "nodes": [
                {
                    "compute_id": "my_remote",
                    "name": "PC2",
                    "node_id": "c067b922-7f77-4680-ac00-0226c6583598",
                    "node_type": "vpcs",
                    "properties": {
                        "startup_script": "set pcname PC2\n",
                        "startup_script_path": "startup.vpc"
                    },
                },
                {
                    "compute_id": "my_remote",
                    "name": "PC1",
                    "node_id": "50d66d7b-0dd7-4e9f-b720-6eb621ae6543",
                    "node_type": "vpcs",
                    "properties": {
                        "startup_script": "set pcname PC1\n",
                        "startup_script_path": "startup.vpc"
                    },
                }
            ]
        }
    }
    topology_path = str(tmpdir / "test.gns3")
    with open(topology_path, "w+") as f:
        json.dump(data, f)

    # Replace add_compute with a mock and pre-register a mocked compute
    # under the id the topology refers to
    controller.add_compute = AsyncioMagicMock()
    controller._computes["my_remote"] = MagicMock()

    # Node.create is patched out for the duration of the load
    with asyncio_patch("gns3server.controller.node.Node.create"):
        project = async_run(controller.load_project(topology_path))
    assert project._topology_file() == topology_path
    controller.add_compute.assert_called_with(
        compute_id='my_remote',
        host='127.0.0.1',
        name='My remote',
        port=3080,
        protocol='http'
    )

    project = controller.get_project('c8d07a5a-134f-4c3f-8599-e35eac85eb17')
    assert project.name == "Experience"
    assert project.path == str(tmpdir)

    link = project.get_link("c44331d2-2da4-490d-9aad-7f5c126ae271")
    assert len(link.nodes) == 2

    node1 = project.get_node("50d66d7b-0dd7-4e9f-b720-6eb621ae6543")
    assert node1.name == "PC1"


def test_get_free_project_name(controller, async_run):
    """
    get_free_project_name appends the first free numeric suffix to a name
    already in use and returns an unused name unchanged
    """

    async_run(controller.add_project(project_id=str(uuid.uuid4()), name="Test"))
    assert controller.get_free_project_name("Test") == "Test-1"
    async_run(controller.add_project(project_id=str(uuid.uuid4()), name="Test-1"))
    assert controller.get_free_project_name("Test") == "Test-2"
    assert controller.get_free_project_name("Hello") == "Hello"