You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
gns3-server/tests/controller/test_project.py

706 lines
24 KiB

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import sys
import pytest
import aiohttp
import zipstream
from unittest.mock import MagicMock
from tests.utils import AsyncioMagicMock, asyncio_patch
from unittest.mock import patch
from uuid import uuid4
from gns3server.controller.project import Project
from gns3server.controller.appliance import Appliance
from gns3server.controller.ports.ethernet_port import EthernetPort
from gns3server.config import Config
@pytest.fixture
def project(controller):
    """
    Provide a project named "Test" attached to the controller fixture
    """
    test_project = Project(controller=controller, name="Test")
    return test_project
@pytest.fixture
def node(controller, project, async_run):
    """
    Provide a VPCS node added to the project fixture through a mocked
    compute whose POST answers carry a console port
    """
    fake_response = MagicMock()
    fake_response.json = {"console": 2048}

    fake_compute = MagicMock()
    fake_compute.id = "local"
    fake_compute.post = AsyncioMagicMock(return_value=fake_response)

    return async_run(project.add_node(fake_compute, "test", None, node_type="vpcs", properties={"startup_config": "test.cfg"}))
def test_affect_uuid():
    """
    A project receives a 36 character id unless one is supplied
    """
    generated = Project(name="Test")
    assert len(generated.id) == 36

    fixed_id = '00010203-0405-0607-0809-0a0b0c0d0e0f'
    explicit = Project(project_id=fixed_id, name="Test 2")
    assert explicit.id == fixed_id
def test_json(tmpdir):
    """
    Check the JSON representation of a newly created project
    """
    proj = Project(name="Test")
    expected = {
        "name": "Test",
        "project_id": proj.id,
        "path": proj.path,
        "status": "opened",
        "filename": "Test.gns3",
        "auto_start": False,
        "auto_close": True,
        "auto_open": False,
        "scene_width": 2000,
        "scene_height": 1000,
        "zoom": 100,
        "show_grid": False,
        "show_interface_labels": False,
        "show_layers": False,
        "snap_to_grid": False,
        "grid_size": 0,
        "supplier": None,
        "variables": None,
    }
    assert proj.__json__() == expected
def test_update(controller, async_run):
    """
    Renaming a project changes its name and emits "project.updated"
    """
    renamed = Project(controller=controller, name="Hello")
    controller._notification = MagicMock()
    assert renamed.name == "Hello"

    async_run(renamed.update(name="World"))

    assert renamed.name == "World"
    emit = controller.notification.emit
    emit.assert_any_call("project.updated", renamed.__json__())
def test_update_on_compute(controller, async_run):
    """
    Updating the variables must issue a PUT on the compute registered
    in _project_created_on_compute
    """
    new_variables = [{"name": "TEST", "value": "VAL1"}]

    fake_compute = MagicMock()
    fake_compute.id = "local"

    target = Project(controller=controller, name="Test")
    target._project_created_on_compute = [fake_compute]
    controller._notification = MagicMock()

    async_run(target.update(variables=new_variables))

    expected_url = '/projects/{}'.format(target.id)
    fake_compute.put.assert_any_call(expected_url, {"variables": new_variables})
def test_path(tmpdir):
    """
    Without an explicit path the project is stored in a directory named
    after its id inside the default project directory
    """
    projects_dir = Config.instance().get_section_config("Server").get("projects_path")
    with patch("gns3server.utils.path.get_default_project_directory", return_value=projects_dir):
        created = Project(project_id=str(uuid4()), name="Test")
        expected_path = os.path.join(projects_dir, created.id)
        assert created.path == expected_path
        assert os.path.exists(expected_path)
def test_path_exist(tmpdir):
    """
    Should raise an error when you try to overwrite
    an existing project
    """
    existing_dir = str(tmpdir / "demo")
    os.makedirs(existing_dir)
    with pytest.raises(aiohttp.web.HTTPForbidden):
        Project(name="Test", path=existing_dir)
def test_init_path(tmpdir):
    """
    A path handed to the constructor becomes the project path
    """
    wanted_path = str(tmpdir)
    created = Project(path=wanted_path, project_id=str(uuid4()), name="Test")
    assert created.path == wanted_path
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
def test_changing_path_with_quote_not_allowed(tmpdir):
    """
    Assigning a project path containing a double quote must be refused
    with HTTPForbidden
    """
    # Build the project outside the raises block so that only the path
    # assignment can satisfy the expected exception; an HTTPForbidden
    # raised by the constructor must make the test fail.
    p = Project(project_id=str(uuid4()), name="Test")
    with pytest.raises(aiohttp.web.HTTPForbidden):
        p.path = str(tmpdir / "project\"53")
def test_captures_directory(tmpdir):
    """
    The captures directory is project-files/captures below the project
    path and exists on disk once it has been requested
    """
    root = tmpdir / "capturestest"
    created = Project(path=str(root), name="Test")
    expected = str(root / "project-files" / "captures")
    assert created.captures_directory == expected
    assert os.path.exists(created.captures_directory)
def test_add_node_local(async_run, controller):
    """
    For a local server we send the project path
    """
    reply = MagicMock()
    reply.json = {"console": 2048}
    local_compute = MagicMock()
    local_compute.id = "local"
    local_compute.post = AsyncioMagicMock(return_value=reply)

    proj = Project(controller=controller, name="Test")
    controller._notification = MagicMock()

    created = async_run(proj.add_node(local_compute, "test", None, node_type="vpcs", properties={"startup_script": "test.cfg"}))
    assert created.id in proj._nodes

    # The project itself must have been created on the compute, path included
    project_payload = {
        "name": proj._name,
        "project_id": proj._id,
        "path": proj._path,
        "variables": None,
    }
    local_compute.post.assert_any_call('/projects', data=project_payload)

    node_payload = {
        'node_id': created.id,
        'startup_script': 'test.cfg',
        'name': 'test',
    }
    local_compute.post.assert_any_call('/projects/{}/vpcs/nodes'.format(proj.id), data=node_payload, timeout=1200)

    assert local_compute in proj._project_created_on_compute
    controller.notification.emit.assert_any_call("node.created", created.__json__())
def test_add_node_non_local(async_run, controller):
    """
    For a non local server we do not send the project path
    """
    reply = MagicMock()
    reply.json = {"console": 2048}
    remote_compute = MagicMock()
    remote_compute.id = "remote"
    remote_compute.post = AsyncioMagicMock(return_value=reply)

    proj = Project(controller=controller, name="Test")
    controller._notification = MagicMock()

    created = async_run(proj.add_node(remote_compute, "test", None, node_type="vpcs", properties={"startup_script": "test.cfg"}))

    # No "path" key is expected in the project creation payload
    project_payload = {
        "name": proj._name,
        "project_id": proj._id,
        "variables": None,
    }
    remote_compute.post.assert_any_call('/projects', data=project_payload)

    node_payload = {
        'node_id': created.id,
        'startup_script': 'test.cfg',
        'name': 'test',
    }
    remote_compute.post.assert_any_call('/projects/{}/vpcs/nodes'.format(proj.id), data=node_payload, timeout=1200)

    assert remote_compute in proj._project_created_on_compute
    controller.notification.emit.assert_any_call("node.created", created.__json__())
def test_add_node_from_appliance(async_run, controller):
    """
    A node created from an appliance takes its node type, properties and
    name format from the appliance; a second creation gets the next name
    """
    local_compute = MagicMock()
    local_compute.id = "local"
    proj = Project(controller=controller, name="Test")
    controller._notification = MagicMock()

    appliance_settings = {
        "server": "local",
        "name": "Test",
        "default_name_format": "{name}-{0}",
        "node_type": "vpcs",
        "properties": {
            "a": 1
        }
    }
    controller._appliances["fakeid"] = Appliance("fakeid", appliance_settings)
    controller._computes["local"] = local_compute

    reply = MagicMock()
    reply.json = {"console": 2048}
    local_compute.post = AsyncioMagicMock(return_value=reply)

    nodes_url = '/projects/{}/vpcs/nodes'.format(proj.id)

    first = async_run(proj.add_node_from_appliance("fakeid", x=23, y=12))
    project_payload = {
        "name": proj._name,
        "project_id": proj._id,
        "path": proj._path,
        "variables": None,
    }
    local_compute.post.assert_any_call('/projects', data=project_payload)
    local_compute.post.assert_any_call(nodes_url, data={'node_id': first.id, 'name': 'Test-1', 'a': 1}, timeout=1200)
    assert local_compute in proj._project_created_on_compute
    controller.notification.emit.assert_any_call("node.created", first.__json__())

    # Make sure we can call twice the node creation
    second = async_run(proj.add_node_from_appliance("fakeid", x=13, y=12))
    local_compute.post.assert_any_call(nodes_url, data={'node_id': second.id, 'name': 'Test-2', 'a': 1}, timeout=1200)
def test_delete_node(async_run, controller):
    """
    Deleting a node removes it from the project, sends a DELETE to the
    compute and emits "node.deleted"
    """
    reply = MagicMock()
    reply.json = {"console": 2048}
    fake_compute = MagicMock()
    fake_compute.post = AsyncioMagicMock(return_value=reply)

    proj = Project(controller=controller, name="Test")
    controller._notification = MagicMock()

    victim = async_run(proj.add_node(fake_compute, "test", None, node_type="vpcs", properties={"startup_config": "test.cfg"}))
    assert victim.id in proj._nodes

    async_run(proj.delete_node(victim.id))

    assert victim.id not in proj._nodes
    node_url = '/projects/{}/vpcs/nodes/{}'.format(proj.id, victim.id)
    fake_compute.delete.assert_any_call(node_url)
    controller.notification.emit.assert_any_call("node.deleted", victim.__json__())
def test_delete_node_delete_link(async_run, controller):
    """
    Deleting a node also deletes the links connected to it
    """
    reply = MagicMock()
    reply.json = {"console": 2048}
    fake_compute = MagicMock()
    fake_compute.post = AsyncioMagicMock(return_value=reply)

    proj = Project(controller=controller, name="Test")
    controller._notification = MagicMock()

    victim = async_run(proj.add_node(fake_compute, "test", None, node_type="vpcs", properties={"startup_config": "test.cfg"}))
    attached_link = async_run(proj.add_link())
    async_run(attached_link.add_node(victim, 0, 0))

    async_run(proj.delete_node(victim.id))

    assert victim.id not in proj._nodes
    assert attached_link.id not in proj._links

    node_url = '/projects/{}/vpcs/nodes/{}'.format(proj.id, victim.id)
    fake_compute.delete.assert_any_call(node_url)
    emit = controller.notification.emit
    emit.assert_any_call("node.deleted", victim.__json__())
    emit.assert_any_call("link.deleted", attached_link.__json__())
def test_get_node(async_run, controller):
    """
    get_node returns a known node, raises HTTPNotFound for an unknown id
    and HTTPForbidden once the project is closed
    """
    reply = MagicMock()
    reply.json = {"console": 2048}
    fake_compute = MagicMock()
    fake_compute.post = AsyncioMagicMock(return_value=reply)

    proj = Project(controller=controller, name="Test")
    added = async_run(proj.add_node(fake_compute, "test", None, node_type="vpcs", properties={"startup_config": "test.cfg"}))
    assert proj.get_node(added.id) == added

    with pytest.raises(aiohttp.web_exceptions.HTTPNotFound):
        proj.get_node("test")

    # Raise an error if the project is not opened
    async_run(proj.close())
    with pytest.raises(aiohttp.web.HTTPForbidden):
        proj.get_node(added.id)
def test_list_nodes(async_run, controller):
    """
    The nodes property is a dict that still lists the node after the
    project has been closed
    """
    reply = MagicMock()
    reply.json = {"console": 2048}
    fake_compute = MagicMock()
    fake_compute.post = AsyncioMagicMock(return_value=reply)

    proj = Project(controller=controller, name="Test")
    async_run(proj.add_node(fake_compute, "test", None, node_type="vpcs", properties={"startup_config": "test.cfg"}))
    assert len(proj.nodes) == 1
    assert isinstance(proj.nodes, dict)

    async_run(proj.close())
    assert len(proj.nodes) == 1
    assert isinstance(proj.nodes, dict)
def test_add_link(async_run, project, controller):
    """
    Attaching a second node to a link creates the UDP link and emits
    "link.created"
    """
    reply = MagicMock()
    reply.json = {"console": 2048}
    fake_compute = MagicMock()
    fake_compute.post = AsyncioMagicMock(return_value=reply)

    first_vm = async_run(project.add_node(fake_compute, "test1", None, node_type="vpcs", properties={"startup_config": "test.cfg"}))
    first_vm._ports = [EthernetPort("E0", 0, 3, 1)]
    second_vm = async_run(project.add_node(fake_compute, "test2", None, node_type="vpcs", properties={"startup_config": "test.cfg"}))
    second_vm._ports = [EthernetPort("E0", 0, 4, 2)]
    controller._notification = MagicMock()

    new_link = async_run(project.add_link())
    async_run(new_link.add_node(first_vm, 3, 1))
    with asyncio_patch("gns3server.controller.udp_link.UDPLink.create") as mock_udp_create:
        async_run(new_link.add_node(second_vm, 4, 2))

    assert mock_udp_create.called
    assert len(new_link._nodes) == 2
    controller.notification.emit.assert_any_call("link.created", new_link.__json__())
def test_list_links(async_run, project):
    """
    The links property still lists the link after the project has been
    closed
    """
    # No compute is involved in creating a bare link, so no mocks are needed
    async_run(project.add_link())
    assert len(project.links) == 1
    async_run(project.close())
    assert len(project.links) == 1
def test_get_link(async_run, project):
    """
    get_link returns a known link and raises HTTPNotFound for an
    unknown id
    """
    # No compute is involved in creating a bare link, so no mocks are needed
    link = async_run(project.add_link())
    assert project.get_link(link.id) == link

    with pytest.raises(aiohttp.web_exceptions.HTTPNotFound):
        project.get_link("test")
def test_delete_link(async_run, project, controller):
    """
    Deleting a link removes it from the project and emits "link.deleted"
    """
    # No compute is involved in creating a bare link, so no mocks are needed
    assert len(project._links) == 0
    link = async_run(project.add_link())
    assert len(project._links) == 1

    controller._notification = MagicMock()
    async_run(project.delete_link(link.id))

    controller.notification.emit.assert_any_call("link.deleted", link.__json__())
    assert len(project._links) == 0
def test_add_drawing(async_run, project, controller):
    """
    Adding a drawing stores it in the project and emits "drawing.created"
    """
    controller.notification.emit = MagicMock()

    created = async_run(project.add_drawing(None, svg="<svg></svg>"))

    assert len(project._drawings) == 1
    emit = controller.notification.emit
    emit.assert_any_call("drawing.created", created.__json__())
def test_get_drawing(async_run, project):
    """
    get_drawing returns a known drawing and raises HTTPNotFound for an
    unknown id
    """
    created = async_run(project.add_drawing(None))
    found = project.get_drawing(created.id)
    assert found == created

    with pytest.raises(aiohttp.web_exceptions.HTTPNotFound):
        project.get_drawing("test")
def test_list_drawing(async_run, project):
    """
    The drawings property still lists the drawing after the project has
    been closed
    """
    async_run(project.add_drawing(None))
    assert len(project.drawings) == 1

    async_run(project.close())
    assert len(project.drawings) == 1
def test_delete_drawing(async_run, project, controller):
    """
    Deleting a drawing removes it from the project and emits
    "drawing.deleted"
    """
    assert len(project._drawings) == 0
    doomed = async_run(project.add_drawing())
    assert len(project._drawings) == 1

    controller._notification = MagicMock()
    async_run(project.delete_drawing(doomed.id))

    emit = controller.notification.emit
    emit.assert_any_call("drawing.deleted", doomed.__json__())
    assert len(project._drawings) == 0
def test_clean_pictures(async_run, project, controller):
    """
    When a project is closed, pictures not used by a drawing should be
    removed
    """
    drawing = async_run(project.add_drawing())
    drawing._svg = "test.png"

    used_picture = os.path.join(project.pictures_directory, "test.png")
    stale_picture = os.path.join(project.pictures_directory, "test2.png")
    # Create both pictures as empty files
    for picture in (used_picture, stale_picture):
        open(picture, "w+").close()

    async_run(project.close())

    assert os.path.exists(used_picture)
    assert not os.path.exists(stale_picture)
def test_clean_pictures_and_keep_supplier_logo(async_run, project, controller):
    """
    When a project is closed, unused pictures should be removed but the
    supplier logo must be kept
    """
    project.supplier = {'logo': 'logo.png'}

    drawing = async_run(project.add_drawing())
    drawing._svg = "test.png"

    used_picture = os.path.join(project.pictures_directory, "test.png")
    stale_picture = os.path.join(project.pictures_directory, "test2.png")
    logo_picture = os.path.join(project.pictures_directory, "logo.png")
    # Create the three pictures as empty files
    for picture in (used_picture, stale_picture, logo_picture):
        open(picture, "w+").close()

    async_run(project.close())

    assert os.path.exists(used_picture)
    assert not os.path.exists(stale_picture)
    assert os.path.exists(logo_picture)
def test_delete(async_run, project, controller):
    """
    Deleting a project removes its directory from disk
    """
    present_before = os.path.exists(project.path)
    assert present_before

    async_run(project.delete())

    present_after = os.path.exists(project.path)
    assert not present_after
def test_dump():
    """
    dump() writes a Test.gns3 file containing the project id
    """
    fixed_id = '00010203-0405-0607-0809-0a0b0c0d0e0f'
    projects_dir = Config.instance().get_section_config("Server").get("projects_path")
    with patch("gns3server.utils.path.get_default_project_directory", return_value=projects_dir):
        dumped = Project(project_id=fixed_id, name="Test")
        dumped.dump()
        topology_file = os.path.join(projects_dir, dumped.id, "Test.gns3")
        with open(topology_file) as f:
            content = f.read()
            assert "00010203-0405-0607-0809-0a0b0c0d0e0f" in content
def test_open_close(async_run, controller):
    """
    Reopening a closed project without auto start does not call
    start_all, and closing it emits "project.closed"
    """
    proj = Project(controller=controller, name="Test")
    assert proj.status == "opened"

    # Close then reopen with start_all replaced by a mock
    async_run(proj.close())
    proj.start_all = AsyncioMagicMock()
    async_run(proj.open())
    assert not proj.start_all.called
    assert proj.status == "opened"

    controller._notification = MagicMock()
    async_run(proj.close())
    assert proj.status == "closed"
    emit = controller.notification.emit
    emit.assert_any_call("project.closed", proj.__json__())
def test_open_auto_start(async_run, controller):
    """
    Opening a project created with auto_start=True calls start_all
    """
    proj = Project(controller=controller, name="Test", auto_start=True)
    assert proj.status == "opened"

    async_run(proj.close())
    start_all_mock = AsyncioMagicMock()
    proj.start_all = start_all_mock
    async_run(proj.open())

    assert proj.start_all.called
def test_is_running(project, async_run, node):
    """
    The project reports running once one of its nodes is started
    """
    running_before = project.is_running()
    assert running_before is False

    node._status = "started"

    running_after = project.is_running()
    assert running_after is True
def test_duplicate(project, async_run, controller):
    """
    Duplicate a project, the node should remain on the remote server
    if they were on remote server
    """
    reply = MagicMock()
    reply.json = {"console": 2048}

    remote_compute = MagicMock()
    remote_compute.id = "remote"
    remote_compute.list_files = AsyncioMagicMock(return_value=[])
    remote_compute.post = AsyncioMagicMock(return_value=reply)
    controller._computes["remote"] = remote_compute

    async_run(project.add_node(remote_compute, "test", None, node_type="vpcs", properties={"startup_config": "test.cfg"}))
    # We allow node not allowed for standard import / export
    async_run(project.add_node(remote_compute, "test", None, node_type="vmware", properties={"startup_config": "test.cfg"}))

    copy = async_run(project.duplicate(name="Hello"))
    assert copy.id != project.id
    assert copy.name == "Hello"

    async_run(copy.open())

    assert list(copy.nodes.values())[0].compute.id == "remote"
    assert list(copy.nodes.values())[1].compute.id == "remote"
def test_duplicate_with_zipfile_encoding_issues(project, async_run, controller):
    """
    Duplicating raises HTTPConflict when the exported archive contains a
    file name with a lone surrogate character
    """
    broken_archive = zipstream.ZipFile()
    broken_archive.writestr('test\udcc3', "data")

    export_patch = asyncio_patch('gns3server.controller.project.export_project', return_value=broken_archive)
    with export_patch:
        with pytest.raises(aiohttp.web.HTTPConflict):
            async_run(project.duplicate(name="Hello"))
def test_snapshots(project):
    """
    List the snapshots
    """
    snapshots_dir = os.path.join(project.path, "snapshots")
    os.makedirs(snapshots_dir)
    snapshot_file = os.path.join(project.path, "snapshots", "test1_260716_103713.gns3project")
    open(snapshot_file, "w+").close()

    project.reset()

    assert len(project.snapshots) == 1
    first_snapshot = list(project.snapshots.values())[0]
    assert first_snapshot.name == "test1"
def test_get_snapshot(project):
    """
    get_snapshot returns a known snapshot and raises HTTPNotFound for an
    unknown id
    """
    snapshots_dir = os.path.join(project.path, "snapshots")
    os.makedirs(snapshots_dir)
    open(os.path.join(project.path, "snapshots", "test1.gns3project"), "w+").close()

    project.reset()

    known = list(project.snapshots.values())[0]
    assert project.get_snapshot(known.id) == known

    with pytest.raises(aiohttp.web_exceptions.HTTPNotFound):
        project.get_snapshot("BLU")
def test_delete_snapshot(project, async_run):
    """
    Deleting a snapshot makes it unknown to the project and removes its
    file from disk
    """
    snapshot_file = os.path.join(project.path, "snapshots", "test1_260716_103713.gns3project")
    os.makedirs(os.path.join(project.path, "snapshots"))
    open(snapshot_file, "w+").close()
    project.reset()

    snapshot = list(project.snapshots.values())[0]
    assert project.get_snapshot(snapshot.id) == snapshot

    async_run(project.delete_snapshot(snapshot.id))

    with pytest.raises(aiohttp.web_exceptions.HTTPNotFound):
        project.get_snapshot(snapshot.id)

    # Check the exact file created above; asserting on a different file
    # name would pass even if nothing had been removed.
    assert not os.path.exists(snapshot_file)
def test_snapshot(project, async_run):
    """
    Create a snapshot
    """
    assert len(project.snapshots) == 0

    created = async_run(project.snapshot("test1"))
    assert created.name == "test1"

    assert len(project.snapshots) == 1
    listed = list(project.snapshots.values())
    assert listed[0].name == "test1"

    # Raise a conflict if name is already use
    with pytest.raises(aiohttp.web_exceptions.HTTPConflict):
        async_run(project.snapshot("test1"))
def test_start_all(project, async_run):
    """
    start_all issues one POST per node of the project
    """
    reply = MagicMock()
    reply.json = {"console": 2048}
    fake_compute = MagicMock()
    fake_compute.id = "local"
    fake_compute.post = AsyncioMagicMock(return_value=reply)

    for _ in range(10):
        async_run(project.add_node(fake_compute, "test", None, node_type="vpcs", properties={"startup_config": "test.cfg"}))

    # Fresh mock so that only the calls made by start_all are counted
    fake_compute.post = AsyncioMagicMock()
    async_run(project.start_all())
    assert len(fake_compute.post.call_args_list) == 10
def test_stop_all(project, async_run):
    """
    stop_all issues one POST per node of the project
    """
    reply = MagicMock()
    reply.json = {"console": 2048}
    fake_compute = MagicMock()
    fake_compute.id = "local"
    fake_compute.post = AsyncioMagicMock(return_value=reply)

    for _ in range(10):
        async_run(project.add_node(fake_compute, "test", None, node_type="vpcs", properties={"startup_config": "test.cfg"}))

    # Fresh mock so that only the calls made by stop_all are counted
    fake_compute.post = AsyncioMagicMock()
    async_run(project.stop_all())
    assert len(fake_compute.post.call_args_list) == 10
def test_suspend_all(project, async_run):
    """
    suspend_all issues one POST per node of the project
    """
    reply = MagicMock()
    reply.json = {"console": 2048}
    fake_compute = MagicMock()
    fake_compute.id = "local"
    fake_compute.post = AsyncioMagicMock(return_value=reply)

    for _ in range(10):
        async_run(project.add_node(fake_compute, "test", None, node_type="vpcs", properties={"startup_config": "test.cfg"}))

    # Fresh mock so that only the calls made by suspend_all are counted
    fake_compute.post = AsyncioMagicMock()
    async_run(project.suspend_all())
    assert len(fake_compute.post.call_args_list) == 10
def test_node_name(project, async_run):
    """
    Check the names given to nodes added one after the other, with and
    without a {0} placeholder in the requested name
    """
    reply = MagicMock()
    reply.json = {"console": 2048}
    fake_compute = MagicMock()
    fake_compute.id = "local"
    fake_compute.post = AsyncioMagicMock(return_value=reply)

    # (requested name, name the node must end up with), in creation order
    cases = [
        ("test-{0}", "test-1"),
        ("test-{0}", "test-2"),
        ("hello world-{0}", "helloworld-1"),
        ("hello world-{0}", "helloworld-2"),
        ("VPCS-1", "VPCS-1"),
        ("VPCS-1", "VPCS-2"),
        ("R3", "R3"),
    ]
    for requested_name, expected_name in cases:
        created = async_run(project.add_node(fake_compute, requested_name, None, node_type="vpcs", properties={"startup_config": "test.cfg"}))
        assert created.name == expected_name
def test_duplicate_node(project, async_run):
    """
    A duplicated node takes the x position passed to duplicate_node
    """
    reply = MagicMock()
    reply.json = {"console": 2048}
    fake_compute = MagicMock()
    fake_compute.id = "local"
    fake_compute.post = AsyncioMagicMock(return_value=reply)

    node_properties = {"startup_config": "test.cfg"}
    source_node = async_run(project.add_node(fake_compute, "test", None, node_type="vpcs", properties=node_properties))

    copy = async_run(project.duplicate_node(source_node, 42, 10, 11))
    assert copy.x == 42