diff --git a/README.rst b/README.rst index e3ebfde5..8d6fd28b 100644 --- a/README.rst +++ b/README.rst @@ -216,3 +216,8 @@ If you want test coverage: .. code:: bash py.test --cov-report term-missing --cov=gns3server + +Security issues +---------------- +Please contact us using contact informations available here: +http://docs.gns3.com/1ON9JBXSeR7Nt2-Qum2o3ZX0GU86BZwlmNSUgvmqNWGY/index.html diff --git a/gns3server/handlers/api/vpcs_handler.py b/gns3server/handlers/api/vpcs_handler.py new file mode 100644 index 00000000..6af82ad7 --- /dev/null +++ b/gns3server/handlers/api/vpcs_handler.py @@ -0,0 +1,230 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2015 GNS3 Technologies Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from aiohttp.web import HTTPConflict +from ...web.route import Route +from ...schemas.nio import NIO_SCHEMA +from ...schemas.vpcs import VPCS_CREATE_SCHEMA +from ...schemas.vpcs import VPCS_UPDATE_SCHEMA +from ...schemas.vpcs import VPCS_OBJECT_SCHEMA +from ...modules.vpcs import VPCS + + +class VPCSHandler: + + """ + API entry points for VPCS. + """ + + @classmethod + @Route.post( + r"/projects/{project_id}/vpcs/vms", + parameters={ + "project_id": "UUID for the project" + }, + status_codes={ + 201: "Instance created", + 400: "Invalid request", + 409: "Conflict" + }, + description="Create a new VPCS instance", + input=VPCS_CREATE_SCHEMA, + output=VPCS_OBJECT_SCHEMA) + def create(request, response): + + vpcs = VPCS.instance() + vm = yield from vpcs.create_vm(request.json["name"], + request.match_info["project_id"], + request.json.get("vm_id"), + console=request.json.get("console"), + startup_script=request.json.get("startup_script")) + response.set_status(201) + response.json(vm) + + @classmethod + @Route.get( + r"/projects/{project_id}/vpcs/vms/{vm_id}", + parameters={ + "project_id": "UUID for the project", + "vm_id": "UUID for the instance" + }, + status_codes={ + 200: "Success", + 400: "Invalid request", + 404: "Instance doesn't exist" + }, + description="Get a VPCS instance", + output=VPCS_OBJECT_SCHEMA) + def show(request, response): + + vpcs_manager = VPCS.instance() + vm = vpcs_manager.get_vm(request.match_info["vm_id"], project_id=request.match_info["project_id"]) + response.json(vm) + + @classmethod + @Route.put( + r"/projects/{project_id}/vpcs/vms/{vm_id}", + parameters={ + "project_id": "UUID for the project", + "vm_id": "UUID for the instance" + }, + status_codes={ + 200: "Instance updated", + 400: "Invalid request", + 404: "Instance doesn't exist", + 409: "Conflict" + }, + description="Update a VPCS instance", + input=VPCS_UPDATE_SCHEMA, + output=VPCS_OBJECT_SCHEMA) + def update(request, response): + + vpcs_manager = VPCS.instance() + vm = vpcs_manager.get_vm(request.match_info["vm_id"], project_id=request.match_info["project_id"]) + vm.name = request.json.get("name", vm.name) + vm.console = request.json.get("console", vm.console) + vm.startup_script = request.json.get("startup_script", vm.startup_script) + response.json(vm) + + @classmethod + @Route.delete( + r"/projects/{project_id}/vpcs/vms/{vm_id}", + parameters={ + "project_id": "UUID for the project", + "vm_id": "UUID for the instance" + }, + status_codes={ + 204: "Instance deleted", + 400: "Invalid request", + 404: "Instance doesn't exist" + }, + description="Delete a VPCS instance") + def delete(request, response): + + yield from VPCS.instance().delete_vm(request.match_info["vm_id"]) + response.set_status(204) + + @classmethod + @Route.post( + r"/projects/{project_id}/vpcs/vms/{vm_id}/start", + parameters={ + "project_id": "UUID for the project", + "vm_id": "UUID for the instance" + }, + status_codes={ + 204: "Instance started", + 400: "Invalid request", + 404: "Instance doesn't exist" + }, + description="Start a VPCS instance", + output=VPCS_OBJECT_SCHEMA) + def start(request, response): + + vpcs_manager = VPCS.instance() + vm = vpcs_manager.get_vm(request.match_info["vm_id"], project_id=request.match_info["project_id"]) + yield from vm.start() + response.json(vm) + + @classmethod + @Route.post( + r"/projects/{project_id}/vpcs/vms/{vm_id}/stop", + parameters={ + "project_id": "UUID for the project", + "vm_id": "UUID for the instance" + }, + status_codes={ + 204: "Instance stopped", + 400: "Invalid request", + 404: "Instance doesn't exist" + }, + description="Stop a VPCS instance") + def stop(request, response): + + vpcs_manager = VPCS.instance() + vm = vpcs_manager.get_vm(request.match_info["vm_id"], project_id=request.match_info["project_id"]) + yield from vm.stop() + response.set_status(204) + + @classmethod + @Route.post( + r"/projects/{project_id}/vpcs/vms/{vm_id}/reload", + parameters={ + "project_id": "UUID for the project", + "vm_id": "UUID for the instance", + }, + status_codes={ + 204: "Instance reloaded", + 400: "Invalid request", + 404: "Instance doesn't exist" + }, + description="Reload a VPCS instance") + def reload(request, response): + + vpcs_manager = VPCS.instance() + vm = vpcs_manager.get_vm(request.match_info["vm_id"], project_id=request.match_info["project_id"]) + yield from vm.reload() + response.set_status(204) + + @Route.post( + r"/projects/{project_id}/vpcs/vms/{vm_id}/adapters/{adapter_number:\d+}/ports/{port_number:\d+}/nio", + parameters={ + "project_id": "UUID for the project", + "vm_id": "UUID for the instance", + "adapter_number": "Network adapter where the nio is located", + "port_number": "Port where the nio should be added" + }, + status_codes={ + 201: "NIO created", + 400: "Invalid request", + 404: "Instance doesn't exist" + }, + description="Add a NIO to a VPCS instance", + input=NIO_SCHEMA, + output=NIO_SCHEMA) + def create_nio(request, response): + + vpcs_manager = VPCS.instance() + vm = vpcs_manager.get_vm(request.match_info["vm_id"], project_id=request.match_info["project_id"]) + nio_type = request.json["type"] + if nio_type not in ("nio_udp", "nio_tap"): + raise HTTPConflict(text="NIO of type {} is not supported".format(nio_type)) + nio = vpcs_manager.create_nio(vm.vpcs_path(), request.json) + vm.port_add_nio_binding(int(request.match_info["port_number"]), nio) + response.set_status(201) + response.json(nio) + + @classmethod + @Route.delete( + r"/projects/{project_id}/vpcs/vms/{vm_id}/adapters/{adapter_number:\d+}/ports/{port_number:\d+}/nio", + parameters={ + "project_id": "UUID for the project", + "vm_id": "UUID for the instance", + "adapter_number": "Network adapter where the nio is located", + "port_number": "Port from where the nio should be removed" + }, + status_codes={ + 204: "NIO deleted", + 400: "Invalid request", + 404: "Instance doesn't exist" + }, + description="Remove a NIO from a VPCS instance") + def delete_nio(request, response): + + vpcs_manager = VPCS.instance() + vm = vpcs_manager.get_vm(request.match_info["vm_id"], project_id=request.match_info["project_id"]) + vm.port_remove_nio_binding(int(request.match_info["port_number"])) + response.set_status(204) diff --git a/tests/modules/test_project.py b/tests/modules/test_project.py new file mode 100644 index 00000000..d6946770 --- /dev/null +++ b/tests/modules/test_project.py @@ -0,0 +1,493 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright (C) 2015 GNS3 Technologies Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import os +import uuid +import json +import asyncio +import pytest +import aiohttp +import zipfile +from uuid import uuid4 +from unittest.mock import patch + +from tests.utils import asyncio_patch +from gns3server.modules.project import Project +from gns3server.modules.vpcs import VPCS, VPCSVM + + +@pytest.fixture(scope="module") +def manager(port_manager): + m = VPCS.instance() + m.port_manager = port_manager + return m + + +@pytest.fixture(scope="function") +def vm(project, manager, loop): + vm = manager.create_vm("test", project.id, "00010203-0405-0607-0809-0a0b0c0d0e0f") + return loop.run_until_complete(asyncio.async(vm)) + + +def test_affect_uuid(): + p = Project() + assert len(p.id) == 36 + + p = Project(project_id='00010203-0405-0607-0809-0a0b0c0d0e0f') + assert p.id == '00010203-0405-0607-0809-0a0b0c0d0e0f' + + +def test_path(tmpdir): + with patch("gns3server.modules.project.Project.is_local", return_value=True): + p = Project(location=str(tmpdir)) + assert p.path == os.path.join(str(tmpdir), p.id) + assert os.path.exists(os.path.join(str(tmpdir), p.id)) + assert not os.path.exists(os.path.join(p.path, ".gns3_temporary")) + + +def test_init_path(tmpdir): + + with patch("gns3server.modules.project.Project.is_local", return_value=True): + p = Project(path=str(tmpdir)) + assert p.path == str(tmpdir) + + +def test_changing_path_temporary_flag(tmpdir): + + with patch("gns3server.modules.project.Project.is_local", return_value=True): + p = Project(temporary=True) + assert os.path.exists(p.path) + original_path = p.path + assert os.path.exists(os.path.join(p.path, ".gns3_temporary")) + + p.path = str(tmpdir) + + +def test_temporary_path(): + p = Project(temporary=True) + assert os.path.exists(p.path) + assert os.path.exists(os.path.join(p.path, ".gns3_temporary")) + + +def test_remove_temporary_flag(): + p = Project(temporary=True) + assert os.path.exists(p.path) + assert os.path.exists(os.path.join(p.path, ".gns3_temporary")) + p.temporary = False + assert not os.path.exists(os.path.join(p.path, ".gns3_temporary")) + + +def test_changing_location_not_allowed(tmpdir): + with patch("gns3server.modules.project.Project.is_local", return_value=False): + with pytest.raises(aiohttp.web.HTTPForbidden): + p = Project(location=str(tmpdir)) + + +def test_changing_path_not_allowed(tmpdir): + with patch("gns3server.modules.project.Project.is_local", return_value=False): + with pytest.raises(aiohttp.web.HTTPForbidden): + p = Project() + p.path = str(tmpdir) + + +def test_changing_path_with_quote_not_allowed(tmpdir): + with patch("gns3server.modules.project.Project.is_local", return_value=True): + with pytest.raises(aiohttp.web.HTTPForbidden): + p = Project() + p.path = str(tmpdir / "project\"53") + + +def test_json(tmpdir): + p = Project() + assert p.__json__() == {"name": p.name, "location": p.location, "path": p.path, "project_id": p.id, "temporary": False} + + +def test_vm_working_directory(tmpdir, vm): + with patch("gns3server.modules.project.Project.is_local", return_value=True): + p = Project(location=str(tmpdir)) + assert p.vm_working_directory(vm) == os.path.join(str(tmpdir), p.id, 'project-files', vm.module_name, vm.id) + assert os.path.exists(p.vm_working_directory(vm)) + + +def test_mark_vm_for_destruction(vm): + project = Project() + project.add_vm(vm) + project.mark_vm_for_destruction(vm) + assert len(project._vms_to_destroy) == 1 + assert len(project.vms) == 0 + + +def test_commit(manager, loop): + project = Project() + vm = VPCSVM("test", "00010203-0405-0607-0809-0a0b0c0d0e0f", project, manager) + project.add_vm(vm) + directory = project.vm_working_directory(vm) + project.mark_vm_for_destruction(vm) + assert len(project._vms_to_destroy) == 1 + assert os.path.exists(directory) + loop.run_until_complete(asyncio.async(project.commit())) + assert len(project._vms_to_destroy) == 0 + assert os.path.exists(directory) is False + assert len(project.vms) == 0 + + +def test_commit_permission_issue(manager, loop): + """ + GNS3 will fix the permission and continue to delete + """ + project = Project() + vm = VPCSVM("test", "00010203-0405-0607-0809-0a0b0c0d0e0f", project, manager) + project.add_vm(vm) + directory = project.vm_working_directory(vm) + project.mark_vm_for_destruction(vm) + assert len(project._vms_to_destroy) == 1 + assert os.path.exists(directory) + os.chmod(directory, 0) + loop.run_until_complete(asyncio.async(project.commit())) + + +def test_project_delete(loop): + project = Project() + directory = project.path + assert os.path.exists(directory) + loop.run_until_complete(asyncio.async(project.delete())) + assert os.path.exists(directory) is False + + +def test_project_delete_permission_issue(loop): + project = Project() + directory = project.path + assert os.path.exists(directory) + os.chmod(directory, 0) + with pytest.raises(aiohttp.web.HTTPInternalServerError): + loop.run_until_complete(asyncio.async(project.delete())) + os.chmod(directory, 700) + + +def test_project_add_vm(manager): + project = Project() + vm = VPCSVM("test", "00010203-0405-0607-0809-0a0b0c0d0e0f", project, manager) + project.add_vm(vm) + assert len(project.vms) == 1 + + +def test_project_close(loop, vm, project): + + with asyncio_patch("gns3server.modules.vpcs.vpcs_vm.VPCSVM.close") as mock: + loop.run_until_complete(asyncio.async(project.close())) + assert mock.called + assert vm.id not in vm.manager._vms + + +def test_project_close_temporary_project(loop, manager): + """A temporary project is deleted when closed""" + + project = Project(temporary=True) + directory = project.path + assert os.path.exists(directory) + loop.run_until_complete(asyncio.async(project.close())) + assert os.path.exists(directory) is False + + +def test_get_default_project_directory(monkeypatch): + + monkeypatch.undo() + project = Project() + path = os.path.normpath(os.path.expanduser("~/GNS3/projects")) + assert project._get_default_project_directory() == path + assert os.path.exists(path) + + +def test_clean_project_directory(tmpdir): + + # A non anonymous project with uuid. + project1 = tmpdir / str(uuid4()) + project1.mkdir() + + # A non anonymous project. + oldproject = tmpdir / str(uuid4()) + oldproject.mkdir() + + # an anonymous project + project2 = tmpdir / str(uuid4()) + project2.mkdir() + tmp = (project2 / ".gns3_temporary") + with open(str(tmp), 'w+') as f: + f.write("1") + + with patch("gns3server.config.Config.get_section_config", return_value={"project_directory": str(tmpdir)}): + Project.clean_project_directory() + + assert os.path.exists(str(project1)) + assert os.path.exists(str(oldproject)) + assert not os.path.exists(str(project2)) + + +def test_list_files(tmpdir, loop): + + with patch("gns3server.config.Config.get_section_config", return_value={"project_directory": str(tmpdir)}): + project = Project() + path = project.path + os.makedirs(os.path.join(path, "vm-1", "dynamips")) + with open(os.path.join(path, "vm-1", "dynamips", "test.bin"), "w+") as f: + f.write("test") + open(os.path.join(path, "vm-1", "dynamips", "test.ghost"), "w+").close() + with open(os.path.join(path, "test.txt"), "w+") as f: + f.write("test2") + + files = loop.run_until_complete(asyncio.async(project.list_files())) + + assert files == [ + { + "path": "test.txt", + "md5sum": "ad0234829205b9033196ba818f7a872b" + }, + { + "path": os.path.join("vm-1", "dynamips", "test.bin"), + "md5sum": "098f6bcd4621d373cade4e832627b4f6" + } + ] + + +def test_export(tmpdir): + project = Project() + path = project.path + os.makedirs(os.path.join(path, "vm-1", "dynamips")) + + # The .gns3 should be renamed project.gns3 in order to simplify import + with open(os.path.join(path, "test.gns3"), 'w+') as f: + f.write("{}") + + with open(os.path.join(path, "vm-1", "dynamips", "test"), 'w+') as f: + f.write("HELLO") + with open(os.path.join(path, "vm-1", "dynamips", "test_log.txt"), 'w+') as f: + f.write("LOG") + os.makedirs(os.path.join(path, "project-files", "snapshots")) + with open(os.path.join(path, "project-files", "snapshots", "test"), 'w+') as f: + f.write("WORLD") + + z = project.export() + + with open(str(tmpdir / 'zipfile.zip'), 'wb') as f: + for data in z: + f.write(data) + + with zipfile.ZipFile(str(tmpdir / 'zipfile.zip')) as myzip: + with myzip.open("vm-1/dynamips/test") as myfile: + content = myfile.read() + assert content == b"HELLO" + + assert 'test.gns3' not in myzip.namelist() + assert 'project.gns3' in myzip.namelist() + assert 'project-files/snapshots/test' not in myzip.namelist() + assert 'vm-1/dynamips/test_log.txt' not in myzip.namelist() + + +def test_export_fix_path(tmpdir): + """ + Fix absolute image path + """ + project = Project() + path = project.path + + topology = { + "topology": { + "nodes": [ + { + "properties": { + "image": "/tmp/c3725-adventerprisek9-mz.124-25d.image" + }, + "type": "C3725" + } + ] + } + } + + with open(os.path.join(path, "test.gns3"), 'w+') as f: + json.dump(topology, f) + + z = project.export() + with open(str(tmpdir / 'zipfile.zip'), 'wb') as f: + for data in z: + f.write(data) + + with zipfile.ZipFile(str(tmpdir / 'zipfile.zip')) as myzip: + with myzip.open("project.gns3") as myfile: + content = myfile.read().decode() + topology = json.loads(content) + assert topology["topology"]["nodes"][0]["properties"]["image"] == "c3725-adventerprisek9-mz.124-25d.image" + + +def test_export_with_images(tmpdir): + """ + Fix absolute image path + """ + project = Project() + path = project.path + + os.makedirs(str(tmpdir / "IOS")) + with open(str(tmpdir / "IOS" / "test.image"), "w+") as f: + f.write("AAA") + + topology = { + "topology": { + "nodes": [ + { + "properties": { + "image": "test.image" + }, + "type": "C3725" + } + ] + } + } + + with open(os.path.join(path, "test.gns3"), 'w+') as f: + json.dump(topology, f) + + with patch("gns3server.modules.Dynamips.get_images_directory", return_value=str(tmpdir / "IOS"),): + z = project.export(include_images=True) + with open(str(tmpdir / 'zipfile.zip'), 'wb') as f: + for data in z: + f.write(data) + + with zipfile.ZipFile(str(tmpdir / 'zipfile.zip')) as myzip: + myzip.getinfo("images/IOS/test.image") + + +def test_export_with_vm(tmpdir): + project = Project() + path = project.path + os.makedirs(os.path.join(path, "vm-1", "dynamips")) + + # The .gns3 should be renamed project.gns3 in order to simplify import + with open(os.path.join(path, "test.gns3"), 'w+') as f: + f.write("{}") + + with open(os.path.join(path, "vm-1", "dynamips", "test"), 'w+') as f: + f.write("HELLO") + with open(os.path.join(path, "vm-1", "dynamips", "test_log.txt"), 'w+') as f: + f.write("LOG") + os.makedirs(os.path.join(path, "project-files", "snapshots")) + with open(os.path.join(path, "project-files", "snapshots", "test"), 'w+') as f: + f.write("WORLD") + + os.makedirs(os.path.join(path, "servers", "vm", "project-files", "docker")) + with open(os.path.join(path, "servers", "vm", "project-files", "docker", "busybox"), 'w+') as f: + f.write("DOCKER") + + z = project.export() + + with open(str(tmpdir / 'zipfile.zip'), 'wb') as f: + for data in z: + f.write(data) + + with zipfile.ZipFile(str(tmpdir / 'zipfile.zip')) as myzip: + with myzip.open("vm-1/dynamips/test") as myfile: + content = myfile.read() + assert content == b"HELLO" + + assert 'test.gns3' not in myzip.namelist() + assert 'project.gns3' in myzip.namelist() + assert 'project-files/snapshots/test' not in myzip.namelist() + assert 'vm-1/dynamips/test_log.txt' not in myzip.namelist() + assert 'servers/vm/project-files/docker/busybox' not in myzip.namelist() + assert 'project-files/docker/busybox' in myzip.namelist() + + +def test_import(tmpdir): + + project_id = str(uuid.uuid4()) + project = Project(name="test", project_id=project_id) + + topology = { + "project_id": str(uuid.uuid4()), + "name": "testtest", + "topology": { + "nodes": [ + { + "server_id": 3, + "type": "VPCSDevice" + }, + { + "server_id": 3, + "type": "QemuVM" + } + ] + } + } + + with open(str(tmpdir / "project.gns3"), 'w+') as f: + json.dump(topology, f) + with open(str(tmpdir / "b.png"), 'w+') as f: + f.write("B") + + zip_path = str(tmpdir / "project.zip") + with zipfile.ZipFile(zip_path, 'w') as myzip: + myzip.write(str(tmpdir / "project.gns3"), "project.gns3") + myzip.write(str(tmpdir / "b.png"), "b.png") + myzip.write(str(tmpdir / "b.png"), "project-files/dynamips/test") + myzip.write(str(tmpdir / "b.png"), "project-files/qemu/test") + + with open(zip_path, "rb") as f: + project.import_zip(f) + + assert os.path.exists(os.path.join(project.path, "b.png")) + assert os.path.exists(os.path.join(project.path, "test.gns3")) + assert os.path.exists(os.path.join(project.path, "project-files/dynamips/test")) + assert os.path.exists(os.path.join(project.path, "servers/vm/project-files/qemu/test")) + + with open(os.path.join(project.path, "test.gns3")) as f: + content = json.load(f) + + assert content["name"] == "test" + assert content["project_id"] == project_id + assert content["topology"]["servers"] == [ + { + "id": 1, + "local": True, + "vm": False + }, + { + "id": 2, + "local": False, + "vm": True + }, + ] + assert content["topology"]["nodes"][0]["server_id"] == 1 + assert content["topology"]["nodes"][1]["server_id"] == 2 + + +def test_import_with_images(tmpdir): + + project_id = str(uuid.uuid4()) + project = Project(name="test", project_id=project_id) + + with open(str(tmpdir / "test.image"), 'w+') as f: + f.write("B") + + zip_path = str(tmpdir / "project.zip") + with zipfile.ZipFile(zip_path, 'w') as myzip: + myzip.write(str(tmpdir / "test.image"), "images/IOS/test.image") + + with open(zip_path, "rb") as f: + project.import_zip(f) + + # TEST import images + path = os.path.join(project._config().get("images_path"), "IOS", "test.image") + assert os.path.exists(path), path diff --git a/tests/modules/vpcs/test_vpcs_vm.py b/tests/modules/vpcs/test_vpcs_vm.py new file mode 100644 index 00000000..949bdf74 --- /dev/null +++ b/tests/modules/vpcs/test_vpcs_vm.py @@ -0,0 +1,284 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2015 GNS3 Technologies Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import pytest +import aiohttp +import asyncio +import os +import sys + +from tests.utils import asyncio_patch +from gns3server.utils import parse_version +from unittest.mock import patch, MagicMock + +from gns3server.modules.vpcs.vpcs_vm import VPCSVM +from gns3server.modules.vpcs.vpcs_error import VPCSError +from gns3server.modules.vpcs import VPCS + + +@pytest.fixture(scope="module") +def manager(port_manager): + m = VPCS.instance() + m.port_manager = port_manager + return m + + +@pytest.fixture(scope="function") +def vm(project, manager): + vm = VPCSVM("test", "00010203-0405-0607-0809-0a0b0c0d0e0f", project, manager) + vm._vpcs_version = parse_version("0.9") + return vm + + +def test_vm(project, manager): + vm = VPCSVM("test", "00010203-0405-0607-0809-0a0b0c0d0e0f", project, manager) + assert vm.name == "test" + assert vm.id == "00010203-0405-0607-0809-0a0b0c0d0e0f" + + +def test_vm_check_vpcs_version(loop, vm, manager): + with asyncio_patch("gns3server.modules.vpcs.vpcs_vm.subprocess_check_output", return_value="Welcome to Virtual PC Simulator, version 0.9"): + loop.run_until_complete(asyncio.async(vm._check_vpcs_version())) + assert vm._vpcs_version == parse_version("0.9") + + +def test_vm_check_vpcs_version_0_6_1(loop, vm, manager): + with asyncio_patch("gns3server.modules.vpcs.vpcs_vm.subprocess_check_output", return_value="Welcome to Virtual PC Simulator, version 0.6.1"): + loop.run_until_complete(asyncio.async(vm._check_vpcs_version())) + assert vm._vpcs_version == parse_version("0.6.1") + + +def test_vm_invalid_vpcs_version(loop, manager, vm): + with asyncio_patch("gns3server.modules.vpcs.vpcs_vm.subprocess_check_output", return_value="Welcome to Virtual PC Simulator, version 0.1"): + with pytest.raises(VPCSError): + nio = manager.create_nio(vm.vpcs_path(), {"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"}) + vm.port_add_nio_binding(0, nio) + loop.run_until_complete(asyncio.async(vm._check_vpcs_version())) + assert vm.name == "test" + assert vm.id == "00010203-0405-0607-0809-0a0b0c0d0e0f" + + +def test_vm_invalid_vpcs_path(vm, manager, loop): + with patch("gns3server.modules.vpcs.vpcs_vm.VPCSVM.vpcs_path", return_value="/tmp/fake/path/vpcs"): + with pytest.raises(VPCSError): + nio = manager.create_nio(vm.vpcs_path(), {"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"}) + vm.port_add_nio_binding(0, nio) + loop.run_until_complete(asyncio.async(vm.start())) + assert vm.name == "test" + assert vm.id == "00010203-0405-0607-0809-0a0b0c0d0e0e" + + +def test_start(loop, vm): + process = MagicMock() + process.returncode = None + queue = vm.project.get_listen_queue() + + with asyncio_patch("gns3server.modules.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True): + with asyncio_patch("asyncio.create_subprocess_exec", return_value=process) as mock_exec: + nio = VPCS.instance().create_nio(vm.vpcs_path(), {"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"}) + vm.port_add_nio_binding(0, nio) + loop.run_until_complete(asyncio.async(vm.start())) + assert mock_exec.call_args[0] == (vm.vpcs_path(), + '-p', + str(vm.console), + '-m', '1', + '-i', + '1', + '-F', + '-R', + '-s', + '4242', + '-c', + '4243', + '-t', + '127.0.0.1') + assert vm.is_running() + assert vm.command_line == ' '.join(mock_exec.call_args[0]) + (action, event) = queue.get_nowait() + assert action == "vm.started" + assert event == vm + + +def test_start_0_6_1(loop, vm): + """ + Version 0.6.1 doesn't have the -R options. It's not require + because GNS3 provide a patch for this. + """ + process = MagicMock() + process.returncode = None + queue = vm.project.get_listen_queue() + vm._vpcs_version = parse_version("0.6.1") + + with asyncio_patch("gns3server.modules.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True): + with asyncio_patch("asyncio.create_subprocess_exec", return_value=process) as mock_exec: + nio = VPCS.instance().create_nio(vm.vpcs_path(), {"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"}) + vm.port_add_nio_binding(0, nio) + loop.run_until_complete(asyncio.async(vm.start())) + assert mock_exec.call_args[0] == (vm.vpcs_path(), + '-p', + str(vm.console), + '-m', '1', + '-i', + '1', + '-F', + '-s', + '4242', + '-c', + '4243', + '-t', + '127.0.0.1') + assert vm.is_running() + (action, event) = queue.get_nowait() + assert action == "vm.started" + assert event == vm + + +def test_stop(loop, vm): + process = MagicMock() + + # Wait process kill success + future = asyncio.Future() + future.set_result(True) + process.wait.return_value = future + process.returncode = None + + with asyncio_patch("gns3server.modules.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True): + with asyncio_patch("asyncio.create_subprocess_exec", return_value=process): + nio = VPCS.instance().create_nio(vm.vpcs_path(), {"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"}) + vm.port_add_nio_binding(0, nio) + + loop.run_until_complete(asyncio.async(vm.start())) + assert vm.is_running() + + queue = vm.project.get_listen_queue() + + with asyncio_patch("gns3server.utils.asyncio.wait_for_process_termination"): + loop.run_until_complete(asyncio.async(vm.stop())) + assert vm.is_running() is False + + if sys.platform.startswith("win"): + process.send_signal.assert_called_with(1) + else: + process.terminate.assert_called_with() + + (action, event) = queue.get_nowait() + assert action == "vm.stopped" + assert event == vm + + +def test_reload(loop, vm): + process = MagicMock() + + # Wait process kill success + future = asyncio.Future() + future.set_result(True) + process.wait.return_value = future + process.returncode = None + + with asyncio_patch("gns3server.modules.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True): + with asyncio_patch("asyncio.create_subprocess_exec", return_value=process): + nio = VPCS.instance().create_nio(vm.vpcs_path(), {"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"}) + vm.port_add_nio_binding(0, nio) + loop.run_until_complete(asyncio.async(vm.start())) + assert vm.is_running() + + with asyncio_patch("gns3server.utils.asyncio.wait_for_process_termination"): + loop.run_until_complete(asyncio.async(vm.reload())) + assert vm.is_running() is True + + if sys.platform.startswith("win"): + process.send_signal.assert_called_with(1) + else: + process.terminate.assert_called_with() + + +def test_add_nio_binding_udp(vm): + nio = VPCS.instance().create_nio(vm.vpcs_path(), {"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"}) + vm.port_add_nio_binding(0, nio) + assert nio.lport == 4242 + + +@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows") +def test_add_nio_binding_tap(vm, ethernet_device): + with patch("gns3server.modules.base_manager.BaseManager.has_privileged_access", return_value=True): + nio = VPCS.instance().create_nio(vm.vpcs_path(), {"type": "nio_tap", "tap_device": ethernet_device}) + vm.port_add_nio_binding(0, nio) + assert nio.tap_device == ethernet_device + + +def test_port_remove_nio_binding(vm): + nio = VPCS.instance().create_nio(vm.vpcs_path(), {"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"}) + vm.port_add_nio_binding(0, nio) + vm.port_remove_nio_binding(0) + assert vm._ethernet_adapter.ports[0] is None + + +def test_update_startup_script(vm): + content = "echo GNS3 VPCS\nip 192.168.1.2\n" + vm.startup_script = content + filepath = os.path.join(vm.working_dir, 'startup.vpc') + assert os.path.exists(filepath) + with open(filepath) as f: + assert f.read() == content + + +def test_update_startup_script_h(vm): + content = "setname %h\n" + vm.name = "pc1" + vm.startup_script = content + assert os.path.exists(vm.script_file) + with open(vm.script_file) as f: + assert f.read() == "setname pc1\n" + + +def test_get_startup_script(vm): + content = "echo GNS3 VPCS\nip 192.168.1.2" + vm.startup_script = content + assert vm.startup_script == os.linesep.join(["echo GNS3 VPCS", "ip 192.168.1.2"]) + + +def test_get_startup_script_using_default_script(vm): + content = "echo GNS3 VPCS\nip 192.168.1.2\n" + + # Reset script file location + vm._script_file = None + + filepath = os.path.join(vm.working_dir, 'startup.vpc') + with open(filepath, 'wb+') as f: + assert f.write(content.encode("utf-8")) + + assert vm.startup_script == content + assert vm.script_file == filepath + + +def test_change_name(vm, tmpdir): + path = os.path.join(vm.working_dir, 'startup.vpc') + vm.name = "world" + with open(path, 'w+') as f: + f.write("name world") + vm.name = "hello" + assert vm.name == "hello" + with open(path) as f: + assert f.read() == "name hello" + + +def test_close(vm, port_manager, loop): + with asyncio_patch("gns3server.modules.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True): + with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()): + vm.start() + loop.run_until_complete(asyncio.async(vm.close())) + assert vm.is_running() is False