mirror of
https://github.com/GNS3/gns3-server
synced 2024-12-01 04:38:12 +00:00
Some fixes for tests. Ref #1784
This commit is contained in:
parent
51b76b1e30
commit
7fd8fde020
@ -43,7 +43,7 @@ async def manager(loop, port_manager):
|
|||||||
async def node(compute_project, manager):
|
async def node(compute_project, manager):
|
||||||
|
|
||||||
node = manager.create_node("test", compute_project.id, "00010203-0405-0607-0809-0a0b0c0d0e0f")
|
node = manager.create_node("test", compute_project.id, "00010203-0405-0607-0809-0a0b0c0d0e0f")
|
||||||
return await asyncio.ensure_future(node)
|
return await node
|
||||||
|
|
||||||
|
|
||||||
async def test_affect_uuid():
|
async def test_affect_uuid():
|
||||||
@ -156,7 +156,7 @@ async def test_project_delete_permission_issue():
|
|||||||
assert os.path.exists(directory)
|
assert os.path.exists(directory)
|
||||||
os.chmod(directory, 0)
|
os.chmod(directory, 0)
|
||||||
with pytest.raises(aiohttp.web.HTTPInternalServerError):
|
with pytest.raises(aiohttp.web.HTTPInternalServerError):
|
||||||
await asyncio.ensure_future(project.delete())
|
await project.delete()
|
||||||
os.chmod(directory, 700)
|
os.chmod(directory, 700)
|
||||||
|
|
||||||
|
|
||||||
|
@ -171,7 +171,7 @@ async def test_add_nio_binding_udp(vm):
|
|||||||
async def test_port_remove_nio_binding(vm):
|
async def test_port_remove_nio_binding(vm):
|
||||||
|
|
||||||
nio = TraceNG.instance().create_nio({"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"})
|
nio = TraceNG.instance().create_nio({"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"})
|
||||||
await asyncio.ensure_future(vm.port_add_nio_binding(0, nio))
|
await vm.port_add_nio_binding(0, nio)
|
||||||
await vm.port_remove_nio_binding(0)
|
await vm.port_remove_nio_binding(0)
|
||||||
assert vm._ethernet_adapter.ports[0] is None
|
assert vm._ethernet_adapter.ports[0] is None
|
||||||
|
|
||||||
|
@ -70,5 +70,5 @@ async def test_stop_capture(vm, tmpdir, manager, free_console_port):
|
|||||||
await vm.adapter_add_nio_binding(0, nio)
|
await vm.adapter_add_nio_binding(0, nio)
|
||||||
await vm.start_capture(0, output_file)
|
await vm.start_capture(0, output_file)
|
||||||
assert vm._ethernet_adapters[0].get_nio(0).capturing
|
assert vm._ethernet_adapters[0].get_nio(0).capturing
|
||||||
await asyncio.ensure_future(vm.stop_capture(0))
|
await vm.stop_capture(0)
|
||||||
assert vm._ethernet_adapters[0].get_nio(0).capturing is False
|
assert vm._ethernet_adapters[0].get_nio(0).capturing is False
|
||||||
|
@ -59,7 +59,7 @@ async def test_vm(compute_project, manager):
|
|||||||
async def test_vm_check_vpcs_version(vm):
|
async def test_vm_check_vpcs_version(vm):
|
||||||
|
|
||||||
with asyncio_patch("gns3server.compute.vpcs.vpcs_vm.subprocess_check_output", return_value="Welcome to Virtual PC Simulator, version 0.9"):
|
with asyncio_patch("gns3server.compute.vpcs.vpcs_vm.subprocess_check_output", return_value="Welcome to Virtual PC Simulator, version 0.9"):
|
||||||
await asyncio.ensure_future(vm._check_vpcs_version())
|
await vm._check_vpcs_version()
|
||||||
assert vm._vpcs_version == parse_version("0.9")
|
assert vm._vpcs_version == parse_version("0.9")
|
||||||
|
|
||||||
|
|
||||||
@ -75,8 +75,8 @@ async def test_vm_invalid_vpcs_version(vm, manager):
|
|||||||
with asyncio_patch("gns3server.compute.vpcs.vpcs_vm.subprocess_check_output", return_value="Welcome to Virtual PC Simulator, version 0.1"):
|
with asyncio_patch("gns3server.compute.vpcs.vpcs_vm.subprocess_check_output", return_value="Welcome to Virtual PC Simulator, version 0.1"):
|
||||||
with pytest.raises(VPCSError):
|
with pytest.raises(VPCSError):
|
||||||
nio = manager.create_nio({"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1", "filters": {}})
|
nio = manager.create_nio({"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1", "filters": {}})
|
||||||
await asyncio.ensure_future(vm.port_add_nio_binding(0, nio))
|
await vm.port_add_nio_binding(0, nio)
|
||||||
await asyncio.ensure_future(vm._check_vpcs_version())
|
await vm._check_vpcs_version()
|
||||||
assert vm.name == "test"
|
assert vm.name == "test"
|
||||||
assert vm.id == "00010203-0405-0607-0809-0a0b0c0d0e0f"
|
assert vm.id == "00010203-0405-0607-0809-0a0b0c0d0e0f"
|
||||||
|
|
||||||
@ -86,8 +86,8 @@ async def test_vm_invalid_vpcs_path(vm, manager):
|
|||||||
with patch("gns3server.compute.vpcs.vpcs_vm.VPCSVM._vpcs_path", return_value="/tmp/fake/path/vpcs"):
|
with patch("gns3server.compute.vpcs.vpcs_vm.VPCSVM._vpcs_path", return_value="/tmp/fake/path/vpcs"):
|
||||||
with pytest.raises(VPCSError):
|
with pytest.raises(VPCSError):
|
||||||
nio = manager.create_nio({"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"})
|
nio = manager.create_nio({"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"})
|
||||||
await asyncio.ensure_future(vm.port_add_nio_binding(0, nio))
|
await vm.port_add_nio_binding(0, nio)
|
||||||
await asyncio.ensure_future(vm.start())
|
await vm.start()
|
||||||
assert vm.name == "test"
|
assert vm.name == "test"
|
||||||
assert vm.id == "00010203-0405-0607-0809-0a0b0c0d0e0e"
|
assert vm.id == "00010203-0405-0607-0809-0a0b0c0d0e0e"
|
||||||
|
|
||||||
@ -177,7 +177,7 @@ async def test_stop(vm):
|
|||||||
assert vm.is_running()
|
assert vm.is_running()
|
||||||
|
|
||||||
with asyncio_patch("gns3server.utils.asyncio.wait_for_process_termination"):
|
with asyncio_patch("gns3server.utils.asyncio.wait_for_process_termination"):
|
||||||
await asyncio.ensure_future(vm.stop())
|
await vm.stop()
|
||||||
assert vm.is_running() is False
|
assert vm.is_running() is False
|
||||||
|
|
||||||
if sys.platform.startswith("win"):
|
if sys.platform.startswith("win"):
|
||||||
@ -233,14 +233,14 @@ async def test_add_nio_binding_tap(vm, ethernet_device):
|
|||||||
|
|
||||||
with patch("gns3server.compute.base_manager.BaseManager.has_privileged_access", return_value=True):
|
with patch("gns3server.compute.base_manager.BaseManager.has_privileged_access", return_value=True):
|
||||||
nio = VPCS.instance().create_nio({"type": "nio_tap", "tap_device": ethernet_device})
|
nio = VPCS.instance().create_nio({"type": "nio_tap", "tap_device": ethernet_device})
|
||||||
await asyncio.ensure_future(vm.port_add_nio_binding(0, nio))
|
await vm.port_add_nio_binding(0, nio)
|
||||||
assert nio.tap_device == ethernet_device
|
assert nio.tap_device == ethernet_device
|
||||||
|
|
||||||
|
|
||||||
async def test_port_remove_nio_binding(vm):
|
async def test_port_remove_nio_binding(vm):
|
||||||
|
|
||||||
nio = VPCS.instance().create_nio({"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"})
|
nio = VPCS.instance().create_nio({"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"})
|
||||||
await asyncio.ensure_future(vm.port_add_nio_binding(0, nio))
|
await vm.port_add_nio_binding(0, nio)
|
||||||
await vm.port_remove_nio_binding(0)
|
await vm.port_remove_nio_binding(0)
|
||||||
assert vm._ethernet_adapter.ports[0] is None
|
assert vm._ethernet_adapter.ports[0] is None
|
||||||
|
|
||||||
@ -317,6 +317,6 @@ async def test_close(vm):
|
|||||||
with asyncio_patch("gns3server.compute.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True):
|
with asyncio_patch("gns3server.compute.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True):
|
||||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()):
|
with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()):
|
||||||
with asyncio_patch("gns3server.compute.vpcs.vpcs_vm.VPCSVM.start_wrap_console"):
|
with asyncio_patch("gns3server.compute.vpcs.vpcs_vm.VPCSVM.start_wrap_console"):
|
||||||
await asyncio.ensure_future(vm.start())
|
await vm.start()
|
||||||
await asyncio.ensure_future(vm.close())
|
await vm.close()
|
||||||
assert vm.is_running() is False
|
assert vm.is_running() is False
|
||||||
|
@ -187,10 +187,10 @@ async def test_upload_image(compute_api, images_dir):
|
|||||||
async def test_upload_image_permission_denied(compute_api, tmpdir, images_dir):
|
async def test_upload_image_permission_denied(compute_api, tmpdir, images_dir):
|
||||||
|
|
||||||
os.makedirs(os.path.join(images_dir, "IOS"), exist_ok=True)
|
os.makedirs(os.path.join(images_dir, "IOS"), exist_ok=True)
|
||||||
with open(os.path.join(images_dir, "IOS", "test2.tmp"), "w+") as f:
|
with open(os.path.join(images_dir, "IOS", "test3.tmp"), "w+") as f:
|
||||||
f.write("")
|
f.write("")
|
||||||
os.chmod(os.path.join(images_dir, "IOS", "test2.tmp"), 0)
|
os.chmod(os.path.join(images_dir, "IOS", "test3.tmp"), 0)
|
||||||
|
|
||||||
with patch("gns3server.utils.images.default_images_directory", return_value=str(tmpdir)):
|
with patch("gns3server.utils.images.default_images_directory", return_value=str(tmpdir)):
|
||||||
response = await compute_api.post("/dynamips/images/test2", body="TEST", raw=True)
|
response = await compute_api.post("/dynamips/images/test3", "TEST", raw=True)
|
||||||
assert response.status == 409
|
assert response.status == 409
|
||||||
|
@ -317,12 +317,12 @@ async def test_upload_image_forbiden_location(compute_api, tmpdir):
|
|||||||
|
|
||||||
async def test_upload_image_permission_denied(compute_api, tmpdir):
|
async def test_upload_image_permission_denied(compute_api, tmpdir):
|
||||||
|
|
||||||
with open(str(tmpdir / "test2.tmp"), "w+") as f:
|
with open(str(tmpdir / "test3.tmp"), "w+") as f:
|
||||||
f.write("")
|
f.write("")
|
||||||
os.chmod(str(tmpdir / "test2.tmp"), 0)
|
os.chmod(str(tmpdir / "test3.tmp"), 0)
|
||||||
|
|
||||||
with patch("gns3server.compute.Qemu.get_images_directory", return_value=str(tmpdir)):
|
with patch("gns3server.compute.Qemu.get_images_directory", return_value=str(tmpdir)):
|
||||||
response = await compute_api.post("/qemu/images/test2", body="TEST", raw=True)
|
response = await compute_api.post("/qemu/images/test3", "TEST", raw=True)
|
||||||
assert response.status == 409
|
assert response.status == 409
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user