Fix traceng tests.

pull/1325/head
grossmj 6 years ago
parent 47e5dfabd8
commit 297bbd91ec

@ -59,6 +59,7 @@ class TraceNGVM(BaseNode):
self._process = None
self._started = False
self._ip_address = None
self._destination = None
self._local_udp_tunnel = None
self._ethernet_adapter = EthernetAdapter() # one adapter with 1 Ethernet interface
@ -181,7 +182,9 @@ class TraceNGVM(BaseNode):
yield from self._stop_ubridge() # make use we start with a fresh uBridge instance
try:
log.info("Starting TraceNG: {}".format(command))
flags = subprocess.CREATE_NEW_CONSOLE
flags = 0
if hasattr(subprocess, "CREATE_NEW_CONSOLE"):
flags = subprocess.CREATE_NEW_CONSOLE
self.command_line = ' '.join(command)
self._process = yield from asyncio.create_subprocess_exec(*command,
cwd=self.working_dir,
@ -246,7 +249,7 @@ class TraceNGVM(BaseNode):
"""
yield from self.stop()
yield from self.start()
yield from self.start(self._destination)
def _terminate_process(self):
"""
@ -402,6 +405,7 @@ class TraceNGVM(BaseNode):
if not self._ip_address:
raise TraceNGError("Please configure an IP address for this TraceNG node")
self._destination = destination
command = [self._traceng_path()]
# use the local UDP tunnel to uBridge instead
if not self._local_udp_tunnel:

@ -904,9 +904,11 @@ class Project:
Start all nodes
"""
pool = Pool(concurrency=3)
emit_warning = True
for node in self.nodes.values():
if node.node_type == "traceng":
#self.controller.notification.emit("log.warning", "TraceNG nodes must be started one by one")
if node.node_type == "traceng" and emit_warning:
self.controller.notification.emit("log.warning", {"message": "TraceNG nodes must be started one by one"})
emit_warning = False
continue
pool.append(node.start)
yield from pool.join()

@ -17,12 +17,10 @@
import pytest
import asyncio
import os
import sys
from tests.utils import asyncio_patch, AsyncioMagicMock
from gns3server.utils import parse_version
from unittest.mock import patch, MagicMock, ANY
from unittest.mock import patch, MagicMock, ANY, PropertyMock
from gns3server.compute.traceng.traceng_vm import TraceNGVM
from gns3server.compute.traceng.traceng_error import TraceNGError
@ -30,6 +28,7 @@ from gns3server.compute.traceng import TraceNG
from gns3server.compute.notification_manager import NotificationManager
@pytest.fixture
def manager(port_manager):
m = TraceNG.instance()
@ -69,19 +68,26 @@ def test_start(loop, vm, async_run):
with NotificationManager.instance().queue() as queue:
async_run(queue.get(0)) # Ping
with asyncio_patch("gns3server.compute.traceng.traceng_vm.TraceNGVM._check_requirements", return_value=True):
with asyncio_patch("asyncio.create_subprocess_exec", return_value=process) as mock_exec:
loop.run_until_complete(asyncio.async(vm.start()))
assert mock_exec.call_args[0] == (vm._traceng_path(),
'-c',
ANY,
'-v',
ANY,
'-b',
'127.0.0.1',
'10.0.0.1')
assert vm.is_running()
assert vm.command_line == ' '.join(mock_exec.call_args[0])
vm.ip_address = "192.168.1.1"
with patch("sys.platform", return_value="win"):
with asyncio_patch("gns3server.compute.traceng.traceng_vm.TraceNGVM._check_requirements", return_value=True):
with asyncio_patch("asyncio.create_subprocess_exec", return_value=process) as mock_exec:
loop.run_until_complete(asyncio.async(vm.start("192.168.1.2")))
assert mock_exec.call_args[0] == (vm._traceng_path(),
'-u',
'-c',
ANY,
'-v',
ANY,
'-b',
'127.0.0.1',
'-s',
'ICMP',
'-f',
'192.168.1.1',
'192.168.1.2')
assert vm.is_running()
assert vm.command_line == ' '.join(mock_exec.call_args[0])
(action, event, kwargs) = async_run(queue.get(0))
assert action == "node.updated"
assert event == vm
@ -96,30 +102,30 @@ def test_stop(loop, vm, async_run):
process.wait.return_value = future
process.returncode = None
vm.ip_address = "192.168.1.1"
with NotificationManager.instance().queue() as queue:
with asyncio_patch("gns3server.compute.traceng.traceng_vm.TraceNGVM._check_requirements", return_value=True):
with asyncio_patch("asyncio.create_subprocess_exec", return_value=process):
nio = TraceNG.instance().create_nio({"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1", "filters": {}})
async_run(vm.port_add_nio_binding(0, nio))
with patch("sys.platform", return_value="win"):
with asyncio_patch("gns3server.compute.traceng.traceng_vm.TraceNGVM._check_requirements", return_value=True):
with asyncio_patch("asyncio.create_subprocess_exec", return_value=process):
nio = TraceNG.instance().create_nio({"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1", "filters": {}})
async_run(vm.port_add_nio_binding(0, nio))
async_run(vm.start())
assert vm.is_running()
vm._ubridge_send = AsyncioMagicMock()
async_run(vm.start("192.168.1.2"))
assert vm.is_running()
with asyncio_patch("gns3server.utils.asyncio.wait_for_process_termination"):
loop.run_until_complete(asyncio.async(vm.stop()))
assert vm.is_running() is False
with asyncio_patch("gns3server.utils.asyncio.wait_for_process_termination"):
loop.run_until_complete(asyncio.async(vm.stop()))
assert vm.is_running() is False
if sys.platform.startswith("win"):
process.send_signal.assert_called_with(1)
else:
process.terminate.assert_called_with()
async_run(queue.get(0)) #  Ping
async_run(queue.get(0)) #  Started
async_run(queue.get(0)) #  Ping
async_run(queue.get(0)) #  Started
(action, event, kwargs) = async_run(queue.get(0))
assert action == "node.updated"
assert event == vm
(action, event, kwargs) = async_run(queue.get(0))
assert action == "node.updated"
assert event == vm
def test_reload(loop, vm, async_run):
@ -131,22 +137,25 @@ def test_reload(loop, vm, async_run):
process.wait.return_value = future
process.returncode = None
with asyncio_patch("gns3server.compute.traceng.traceng_vm.TraceNGVM._check_requirements", return_value=True):
with asyncio_patch("asyncio.create_subprocess_exec", return_value=process):
nio = TraceNG.instance().create_nio({"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1", "filters": {}})
async_run(vm.port_add_nio_binding(0, nio))
async_run(vm.start())
assert vm.is_running()
vm._ubridge_send = AsyncioMagicMock()
with asyncio_patch("gns3server.utils.asyncio.wait_for_process_termination"):
async_run(vm.reload())
assert vm.is_running() is True
#if sys.platform.startswith("win"):
# process.send_signal.assert_called_with(1)
#else:
process.terminate.assert_called_with()
vm.ip_address = "192.168.1.1"
with patch("sys.platform", return_value="win"):
with asyncio_patch("gns3server.compute.traceng.traceng_vm.TraceNGVM._check_requirements", return_value=True):
with asyncio_patch("asyncio.create_subprocess_exec", return_value=process):
nio = TraceNG.instance().create_nio({"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1", "filters": {}})
async_run(vm.port_add_nio_binding(0, nio))
vm._ubridge_send = AsyncioMagicMock()
async_run(vm.start("192.168.1.2"))
assert vm.is_running()
with asyncio_patch("gns3server.utils.asyncio.wait_for_process_termination"):
async_run(vm.reload())
assert vm.is_running() is True
#if sys.platform.startswith("win"):
# process.send_signal.assert_called_with(1)
#else:
process.terminate.assert_called_with()
def test_add_nio_binding_udp(vm, async_run):

@ -31,11 +31,11 @@ from gns3server.version import __version__
def test_get(http_compute, windows_platform):
response = http_compute.get('/capabilities', example=True)
assert response.status == 200
assert response.json == {'node_types': ['cloud', 'ethernet_hub', 'ethernet_switch', 'nat', 'vpcs', 'virtualbox', 'dynamips', 'frame_relay_switch', 'atm_switch', 'qemu', 'vmware', 'docker', 'iou', 'traceng'], 'version': __version__, 'platform': sys.platform}
assert response.json == {'node_types': ['cloud', 'ethernet_hub', 'ethernet_switch', 'nat', 'vpcs', 'virtualbox', 'dynamips', 'frame_relay_switch', 'atm_switch', 'qemu', 'vmware', 'traceng', 'docker', 'iou'], 'version': __version__, 'platform': sys.platform}
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
def test_get_on_gns3vm(http_compute, on_gns3vm):
response = http_compute.get('/capabilities', example=True)
assert response.status == 200
assert response.json == {'node_types': ['cloud', 'ethernet_hub', 'ethernet_switch', 'nat', 'vpcs', 'virtualbox', 'dynamips', 'frame_relay_switch', 'atm_switch', 'qemu', 'vmware', 'docker', 'iou', 'traceng'], 'version': __version__, 'platform': sys.platform}
assert response.json == {'node_types': ['cloud', 'ethernet_hub', 'ethernet_switch', 'nat', 'vpcs', 'virtualbox', 'dynamips', 'frame_relay_switch', 'atm_switch', 'qemu', 'vmware', 'traceng', 'docker', 'iou'], 'version': __version__, 'platform': sys.platform}

@ -96,7 +96,7 @@ def test_traceng_delete_nio(http_compute, vm):
def test_traceng_start(http_compute, vm):
with asyncio_patch("gns3server.compute.traceng.traceng_vm.TraceNGVM.start", return_value=True) as mock:
response = http_compute.post("/projects/{project_id}/traceng/nodes/{node_id}/start".format(project_id=vm["project_id"], node_id=vm["node_id"]), example=True)
response = http_compute.post("/projects/{project_id}/traceng/nodes/{node_id}/start".format(project_id=vm["project_id"], node_id=vm["node_id"]), {"destination": "192.168.1.2"}, example=True)
assert mock.called
assert response.status == 200
assert response.json["name"] == "TraceNG TEST 1"
@ -139,9 +139,9 @@ def test_traceng_duplicate(http_compute, vm):
def test_traceng_update(http_compute, vm, tmpdir, free_console_port):
response = http_compute.put("/projects/{project_id}/traceng/nodes/{node_id}".format(project_id=vm["project_id"], node_id=vm["node_id"]), {"name": "test",
"console": free_console_port,
},
"ip_address": "192.168.1.1",
},
example=True)
assert response.status == 200
assert response.json["name"] == "test"
assert response.json["console"] == free_console_port
assert response.json["ip_address"] == "192.168.1.1"

Loading…
Cancel
Save