#!/usr/bin/env python # # Copyright (C) 2020 GNS3 Technologies Inc. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import pytest import os import uuid import json import zipfile from pathlib import Path from tests.utils import asyncio_patch, AsyncioMagicMock from unittest.mock import patch, MagicMock from gns3server.utils.asyncio import aiozipstream from gns3server.controller.project import Project from gns3server.controller.export_project import export_project from gns3server.controller.import_project import import_project, _move_files_to_compute from gns3server.version import __version__ @pytest.mark.asyncio async def test_import_project(tmpdir, controller): project_id = str(uuid.uuid4()) topology = { "project_id": str(uuid.uuid4()), "name": "test", "auto_open": True, "auto_start": True, "topology": { }, "version": "2.0.0" } with open(str(tmpdir / "project.gns3"), 'w+') as f: json.dump(topology, f) with open(str(tmpdir / "b.png"), 'w+') as f: f.write("B") zip_path = str(tmpdir / "project.zip") with zipfile.ZipFile(zip_path, 'w') as myzip: myzip.write(str(tmpdir / "project.gns3"), "project.gns3") myzip.write(str(tmpdir / "b.png"), "b.png") myzip.write(str(tmpdir / "b.png"), "project-files/dynamips/test") myzip.write(str(tmpdir / "b.png"), "project-files/qemu/test") with open(zip_path, "rb") as f: project = await import_project(controller, project_id, f) assert project.name == "test" assert project.id == project_id assert os.path.exists(os.path.join(project.path, "b.png")) assert not os.path.exists(os.path.join(project.path, "project.gns3")) assert os.path.exists(os.path.join(project.path, "test.gns3")) assert os.path.exists(os.path.join(project.path, "project-files/dynamips/test")) assert os.path.exists(os.path.join(project.path, "project-files/qemu/test")) # A new project name is generated when you import twice the same name with open(zip_path, "rb") as f: project = await import_project(controller, str(uuid.uuid4()), f) assert project.auto_open is False assert project.auto_start is False assert project.name != "test" @pytest.mark.asyncio async def test_import_project_override(projects_dir, controller): """ In the case of snapshot we will import a project for override the previous keeping the same project id & location """ tmpdir = Path(projects_dir) project_id = str(uuid.uuid4()) topology = { "project_id": project_id, "name": "test", "topology": { }, "version": "2.0.0" } with open(str(tmpdir / "project.gns3"), 'w+') as f: json.dump(topology, f) zip_path = str(tmpdir / "project.zip") with zipfile.ZipFile(zip_path, 'w') as myzip: myzip.write(str(tmpdir / "project.gns3"), "project.gns3") with open(zip_path, "rb") as f: project = await import_project(controller, project_id, f, location=str(tmpdir)) assert project.name == "test" assert project.id == project_id # Overide the project with same project with open(zip_path, "rb") as f: project = await import_project(controller, project_id, f, location=str(tmpdir)) assert project.id == project_id assert project.name == "test" async def write_file(path, z): with open(path, 'wb') as f: async for chunk in z: f.write(chunk) @pytest.mark.asyncio async def test_import_project_containing_symlink(tmpdir, controller): project = Project(controller=controller, name="test") project.dump = MagicMock() path = project.path project_id = str(uuid.uuid4()) topology = { "project_id": str(uuid.uuid4()), "name": "test", "auto_open": True, "auto_start": True, "topology": { }, "version": "2.0.0" } with open(os.path.join(path, "project.gns3"), 'w+') as f: json.dump(topology, f) os.makedirs(os.path.join(path, "vm1", "dynamips")) symlink_path = os.path.join(project.path, "vm1", "dynamips", "symlink") symlink_target = "/tmp/anywhere" os.symlink(symlink_target, symlink_path) zip_path = str(tmpdir / "project.zip") with aiozipstream.ZipFile() as z: with patch("gns3server.compute.Dynamips.get_images_directory", return_value=str(tmpdir / "IOS"),): await export_project(z, project, str(tmpdir), include_images=False) await write_file(zip_path, z) with open(zip_path, "rb") as f: project = await import_project(controller, project_id, f) assert project.name == "test" assert project.id == project_id symlink_path = os.path.join(project.path, "vm1", "dynamips", "symlink") assert os.path.islink(symlink_path) assert os.readlink(symlink_path) == symlink_target @pytest.mark.asyncio async def test_import_upgrade(tmpdir, controller): """ Topology made for previous GNS3 version are upgraded during the process """ project_id = str(uuid.uuid4()) topology = { "project_id": str(uuid.uuid4()), "name": "test", "topology": { }, "version": "1.4.2" } with open(str(tmpdir / "project.gns3"), 'w+') as f: json.dump(topology, f) zip_path = str(tmpdir / "project.zip") with zipfile.ZipFile(zip_path, 'w') as myzip: myzip.write(str(tmpdir / "project.gns3"), "project.gns3") with open(zip_path, "rb") as f: project = await import_project(controller, project_id, f) with open(os.path.join(project.path, "test.gns3")) as f: topo = json.load(f) assert topo["version"] == __version__ @pytest.mark.asyncio async def test_import_with_images(config, tmpdir, controller): project_id = str(uuid.uuid4()) topology = { "project_id": str(uuid.uuid4()), "name": "test", "topology": { }, "version": "2.0.0" } with open(str(tmpdir / "project.gns3"), 'w+') as f: json.dump(topology, f) with open(str(tmpdir / "test.image"), 'w+') as f: f.write("B") zip_path = str(tmpdir / "project.zip") with zipfile.ZipFile(zip_path, 'w') as myzip: myzip.write(str(tmpdir / "project.gns3"), "project.gns3") myzip.write(str(tmpdir / "test.image"), "images/IOS/test.image") with open(zip_path, "rb") as f: project = await import_project(controller, project_id, f) assert not os.path.exists(os.path.join(project.path, "images/IOS/test.image")) path = os.path.join(config.settings.Server.images_path, "IOS", "test.image") assert os.path.exists(path), path @pytest.mark.asyncio async def test_import_iou_linux_no_vm(linux_platform, tmpdir, controller): """ On non linux host IOU should be local if we don't have a GNS3 VM """ project_id = str(uuid.uuid4()) controller._computes["local"] = AsyncioMagicMock() topology = { "project_id": str(uuid.uuid4()), "name": "test", "type": "topology", "topology": { "nodes": [ { "compute_id": "local", "node_type": "iou", "name": "test", "properties": {} } ], "links": [], "computes": [], "drawings": [] }, "revision": 5, "version": "2.0.0" } with open(str(tmpdir / "project.gns3"), 'w+') as f: json.dump(topology, f) zip_path = str(tmpdir / "project.zip") with zipfile.ZipFile(zip_path, 'w') as myzip: myzip.write(str(tmpdir / "project.gns3"), "project.gns3") with open(zip_path, "rb") as f: project = await import_project(controller, project_id, f) with open(os.path.join(project.path, "test.gns3")) as f: topo = json.load(f) assert topo["topology"]["nodes"][0]["compute_id"] == "local" @pytest.mark.asyncio async def test_import_iou_linux_with_vm(linux_platform, tmpdir, controller): """ On non linux host IOU should be vm if we have a GNS3 VM configured """ project_id = str(uuid.uuid4()) controller._computes["vm"] = AsyncioMagicMock() topology = { "project_id": str(uuid.uuid4()), "name": "test", "type": "topology", "topology": { "nodes": [ { "compute_id": "local", "node_id": "0fd3dd4d-dc93-4a04-a9b9-7396a9e22e8b", "node_type": "iou", "name": "test", "properties": {} } ], "links": [], "computes": [], "drawings": [] }, "revision": 5, "version": "2.0.0" } with open(str(tmpdir / "project.gns3"), 'w+') as f: json.dump(topology, f) zip_path = str(tmpdir / "project.zip") with zipfile.ZipFile(zip_path, 'w') as myzip: myzip.write(str(tmpdir / "project.gns3"), "project.gns3") with open(zip_path, "rb") as f: project = await import_project(controller, project_id, f) with open(os.path.join(project.path, "test.gns3")) as f: topo = json.load(f) assert topo["topology"]["nodes"][0]["compute_id"] == "vm" @pytest.mark.asyncio async def test_import_nat_non_linux(windows_platform, tmpdir, controller): """ On non linux host NAT should be moved to the GNS3 VM """ project_id = str(uuid.uuid4()) controller._computes["vm"] = AsyncioMagicMock() topology = { "project_id": str(uuid.uuid4()), "name": "test", "type": "topology", "topology": { "nodes": [ { "compute_id": "local", "node_id": "0fd3dd4d-dc93-4a04-a9b9-7396a9e22e8b", "node_type": "nat", "name": "test", "properties": {} } ], "links": [], "computes": [], "drawings": [] }, "revision": 5, "version": "2.0.0" } with open(str(tmpdir / "project.gns3"), 'w+') as f: json.dump(topology, f) zip_path = str(tmpdir / "project.zip") with zipfile.ZipFile(zip_path, 'w') as myzip: myzip.write(str(tmpdir / "project.gns3"), "project.gns3") with open(zip_path, "rb") as f: project = await import_project(controller, project_id, f) with open(os.path.join(project.path, "test.gns3")) as f: topo = json.load(f) assert topo["topology"]["nodes"][0]["compute_id"] == "vm" @pytest.mark.asyncio async def test_import_iou_non_linux(windows_platform, tmpdir, controller): """ On non linux host IOU should be moved to the GNS3 VM """ project_id = str(uuid.uuid4()) controller._computes["vm"] = AsyncioMagicMock() topology = { "project_id": str(uuid.uuid4()), "name": "test", "type": "topology", "topology": { "nodes": [ { "compute_id": "local", "node_id": "0fd3dd4d-dc93-4a04-a9b9-7396a9e22e8b", "node_type": "iou", "name": "test", "properties": {} }, { "compute_id": "local", "node_type": "vpcs", "name": "test2", "properties": {} } ], "links": [], "computes": [], "drawings": [] }, "revision": 5, "version": "2.0.0" } with open(str(tmpdir / "project.gns3"), 'w+') as f: json.dump(topology, f) zip_path = str(tmpdir / "project.zip") with zipfile.ZipFile(zip_path, 'w') as myzip: myzip.write(str(tmpdir / "project.gns3"), "project.gns3") with open(zip_path, "rb") as f: with asyncio_patch("gns3server.controller.import_project._move_files_to_compute") as mock: project = await import_project(controller, project_id, f) controller._computes["vm"].post.assert_called_with('/projects', data={'name': 'test', 'project_id': project_id}) with open(os.path.join(project.path, "test.gns3")) as f: topo = json.load(f) assert topo["topology"]["nodes"][0]["compute_id"] == "vm" assert topo["topology"]["nodes"][1]["compute_id"] == "local" mock.assert_called_with(controller._computes["vm"], project_id, project.path, os.path.join('project-files', 'iou', topo["topology"]["nodes"][0]['node_id'])) @pytest.mark.asyncio async def test_import_node_id(linux_platform, tmpdir, controller): """ When importing a node, node_id should change """ project_id = str(uuid.uuid4()) controller._computes["local"] = AsyncioMagicMock() topology = { "project_id": str(uuid.uuid4()), "name": "test", "type": "topology", "topology": { "nodes": [ { "compute_id": "local", "node_id": "0fd3dd4d-dc93-4a04-a9b9-7396a9e22e8b", "node_type": "iou", "name": "test", "properties": {} }, { "compute_id": "local", "node_id": "c3ae286c-c81f-40d9-a2d0-5874b2f2478d", "node_type": "iou", "name": "test2", "properties": {} } ], "links": [ { "link_id": "b570a150-c09f-47d9-8d32-9ca5b03234d6", "nodes": [ { "adapter_number": 0, "node_id": "0fd3dd4d-dc93-4a04-a9b9-7396a9e22e8b", "port_number": 0 }, { "adapter_number": 0, "node_id": "c3ae286c-c81f-40d9-a2d0-5874b2f2478d", "port_number": 0 } ] } ], "computes": [], "drawings": [ { "drawing_id": "08d665ba-e982-4d54-82b4-aa0c4d5ba6a3", "rotation": 0, "x": -210, "y": -108, "z": 0 } ] }, "revision": 5, "version": "2.0.0" } with open(str(tmpdir / "project.gns3"), 'w+') as f: json.dump(topology, f) # Fake .gns3project zip_path = str(tmpdir / "project.zip") with zipfile.ZipFile(zip_path, 'w') as myzip: myzip.write(str(tmpdir / "project.gns3"), "project.gns3") myzip.writestr("project-files/iou/0fd3dd4d-dc93-4a04-a9b9-7396a9e22e8b/startup.cfg", "test") myzip.writestr("project-files/iou/c3ae286c-c81f-40d9-a2d0-5874b2f2478d/startup.cfg", "test") with open(zip_path, "rb") as f: project = await import_project(controller, project_id, f) with open(os.path.join(project.path, "test.gns3")) as f: topo = json.load(f) # Node id should have change assert topo["topology"]["nodes"][0]["node_id"] not in ["0fd3dd4d-dc93-4a04-a9b9-7396a9e22e8b", "c3ae286c-c81f-40d9-a2d0-5874b2f2478d"] # Link should have change link = topo["topology"]["links"][0] assert link["link_id"] != "b570a150-c09f-47d9-8d32-9ca5b03234d6" assert link["nodes"][0]["node_id"] not in ["0fd3dd4d-dc93-4a04-a9b9-7396a9e22e8b", "c3ae286c-c81f-40d9-a2d0-5874b2f2478d"] # Drawing id should change assert topo["topology"]["drawings"][0]["drawing_id"] != "08d665ba-e982-4d54-82b4-aa0c4d5ba6a3" # Node files should have moved to the new node id assert not os.path.exists(os.path.join(project.path, "project-files", "iou", "0fd3dd4d-dc93-4a04-a9b9-7396a9e22e8b", "startup.cfg")) assert not os.path.exists(os.path.join(project.path, "project-files", "iou", "c3ae286c-c81f-40d9-a2d0-5874b2f2478d", "startup.cfg")) assert os.path.exists(os.path.join(project.path, "project-files", "iou", topo["topology"]["nodes"][0]["node_id"], "startup.cfg")) @pytest.mark.asyncio async def test_import_keep_compute_ids(windows_platform, tmpdir, controller): """ On linux host IOU should be moved to the GNS3 VM """ project_id = str(uuid.uuid4()) controller._computes["vm"] = AsyncioMagicMock() topology = { "project_id": str(uuid.uuid4()), "name": "test", "type": "topology", "topology": { "nodes": [ { "compute_id": "local", "node_id": "0fd3dd4d-dc93-4a04-a9b9-7396a9e22e8b", "node_type": "iou", "name": "test", "properties": {} } ], "links": [], "computes": [], "drawings": [] }, "revision": 5, "version": "2.0.0" } with open(str(tmpdir / "project.gns3"), 'w+') as f: json.dump(topology, f) zip_path = str(tmpdir / "project.zip") with zipfile.ZipFile(zip_path, 'w') as myzip: myzip.write(str(tmpdir / "project.gns3"), "project.gns3") with open(zip_path, "rb") as f: project = await import_project(controller, project_id, f, keep_compute_ids=True) with open(os.path.join(project.path, "test.gns3")) as f: topo = json.load(f) assert topo["topology"]["nodes"][0]["compute_id"] == "local" @pytest.mark.asyncio async def test_move_files_to_compute(tmpdir): project_id = str(uuid.uuid4()) os.makedirs(str(tmpdir / "project-files" / "docker")) (tmpdir / "project-files" / "docker" / "test").open("w").close() (tmpdir / "project-files" / "docker" / "test2").open("w").close() with asyncio_patch("gns3server.controller.import_project._upload_file") as mock: await _move_files_to_compute(None, project_id, str(tmpdir), os.path.join("project-files", "docker")) mock.assert_any_call(None, project_id, str(tmpdir / "project-files" / "docker" / "test"), os.path.join("project-files", "docker", "test")) mock.assert_any_call(None, project_id, str(tmpdir / "project-files" / "docker" / "test2"), os.path.join("project-files", "docker", "test2")) assert not os.path.exists(str(tmpdir / "project-files" / "docker")) @pytest.mark.asyncio async def test_import_project_name_and_location(projects_dir, controller): """ Import a project with a different location and name """ tmpdir = Path(projects_dir) project_id = str(uuid.uuid4()) topology = { "project_id": str(uuid.uuid4()), "name": "test", "topology": { }, "version": "2.0.0" } with open(str(tmpdir / "project.gns3"), 'w+') as f: json.dump(topology, f) zip_path = str(tmpdir / "project.zip") with zipfile.ZipFile(zip_path, 'w') as myzip: myzip.write(str(tmpdir / "project.gns3"), "project.gns3") with open(zip_path, "rb") as f: project = await import_project(controller, project_id, f, name="hello", location=str(tmpdir / "hello")) assert project.name == "hello" assert os.path.exists(str(tmpdir / "hello" / "hello.gns3")) # A new project name is generated when you import twice the same name with open(zip_path, "rb") as f: project = await import_project(controller, str(uuid.uuid4()), f, name="hello", location=str(tmpdir / "test")) assert project.name == "hello-1"