diff --git a/gns3server/controller/compute.py b/gns3server/controller/compute.py index d40f5640..7d42c0a7 100644 --- a/gns3server/controller/compute.py +++ b/gns3server/controller/compute.py @@ -410,7 +410,7 @@ class Compute: self._memory_usage_percent = event["memory_usage_percent"] self._controller.notification.emit("compute.updated", self.__json__()) else: - self._controller.notification.dispatch(action, event, compute_id=self.id) + yield from self._controller.notification.dispatch(action, event, compute_id=self.id) if self._ws: yield from self._ws.close() diff --git a/gns3server/controller/link.py b/gns3server/controller/link.py index 3a294c78..0df8fc87 100644 --- a/gns3server/controller/link.py +++ b/gns3server/controller/link.py @@ -91,6 +91,8 @@ class Link: if len(self._nodes) == 2: yield from self.create() + for n in self._nodes: + n["node"].add_link(self) self._created = True self._project.controller.notification.emit("link.created", self.__json__()) @@ -121,8 +123,8 @@ class Link: """ Delete the link """ - - raise NotImplementedError + for port in self._nodes: + port["node"].remove_link(self) @asyncio.coroutine def start_capture(self, data_link_type="DLT_EN10MB", capture_file_name=None): @@ -173,6 +175,13 @@ class Link: raise NotImplementedError + @asyncio.coroutine + def node_updated(self, node): + """ + Called when a node member of the link is updated + """ + raise NotImplementedError + def default_capture_file_name(self): """ :returns: File name for a capture on this link @@ -214,6 +223,9 @@ class Link: return False return self.id == other.id + def __hash__(self): + return hash(self._id) + def __json__(self, topology_dump=False): """ :param topology_dump: Filter to keep only properties require for saving on disk diff --git a/gns3server/controller/node.py b/gns3server/controller/node.py index 98cf5759..02017819 100644 --- a/gns3server/controller/node.py +++ b/gns3server/controller/node.py @@ -59,6 +59,7 @@ class Node: self._node_type = node_type self._label = None + self._links = set() self._name = None self.name = name self._console = None @@ -268,6 +269,22 @@ class Node: def first_port_name(self, val): self._first_port_name = val + def add_link(self, link): + """ + A link is connected to the node + """ + self._links.add(link) + + def remove_link(self, link): + """ + A link is connected to the node + """ + self._links.remove(link) + + @property + def link(self): + return self._links + @asyncio.coroutine def create(self): """ @@ -291,7 +308,7 @@ class Node: else: raise e else: - self.parse_node_response(response.json) + yield from self.parse_node_response(response.json) return True trial += 1 @@ -328,9 +345,10 @@ class Node: if update_compute: data = self._node_data(properties=compute_properties) response = yield from self.put(None, data=data) - self.parse_node_response(response.json) + yield from self.parse_node_response(response.json) self.project.dump() + @asyncio.coroutine def parse_node_response(self, response): """ Update the object with the remote node object @@ -353,6 +371,8 @@ class Node: else: self._properties[key] = value self._list_ports() + for link in self._links: + yield from link.node_updated(self) def _node_data(self, properties=None): """ diff --git a/gns3server/controller/notification.py b/gns3server/controller/notification.py index f89797eb..23bc1b18 100644 --- a/gns3server/controller/notification.py +++ b/gns3server/controller/notification.py @@ -17,6 +17,7 @@ import os import aiohttp +import asyncio from contextlib import contextmanager from ..notification_queue import NotificationQueue @@ -51,6 +52,7 @@ class Notification: """ return project.id in self._listeners and len(self._listeners[project.id]) > 0 + @asyncio.coroutine def dispatch(self, action, event, compute_id): """ Notification received from compute node. Send it directly @@ -65,7 +67,7 @@ class Notification: # Update controller node data and send the event node.updated project = self._controller.get_project(event["project_id"]) node = project.get_node(event["node_id"]) - node.parse_node_response(event) + yield from node.parse_node_response(event) self.emit("node.updated", node.__json__()) except aiohttp.web.HTTPNotFound: diff --git a/gns3server/controller/udp_link.py b/gns3server/controller/udp_link.py index a4e87780..fc321a15 100644 --- a/gns3server/controller/udp_link.py +++ b/gns3server/controller/udp_link.py @@ -115,6 +115,7 @@ class UDPLink(Link): # If the node is already delete (user selected multiple element and delete all in the same time) except aiohttp.web.HTTPNotFound: pass + yield from super().delete() @asyncio.coroutine def start_capture(self, data_link_type="DLT_EN10MB", capture_file_name=None): @@ -145,21 +146,31 @@ class UDPLink(Link): """ Run capture on the best candidate. - The ideal candidate is a node who support capture on controller server + The ideal candidate is a node who on controller server and always + running (capture will not be cut off) :returns: Node where the capture should run """ - # use the local node first to save bandwidth + ALWAYS_RUNNING_NODES_TYPE = ("cloud", "nat", "ethernet_switch", "ethernet_hub") + for node in self._nodes: - if node["node"].compute.id == "local" and node["node"].node_type not in [""]: # FIXME + if node["node"].compute.id == "local" and node["node"].node_type in ALWAYS_RUNNING_NODES_TYPE and node["node"].status == "started": return node for node in self._nodes: - if node["node"].node_type not in [""]: # FIXME + if node["node"].node_type in ALWAYS_RUNNING_NODES_TYPE and node["node"].status == "started": return node - raise aiohttp.web.HTTPConflict(text="Capture is not supported for this link") + for node in self._nodes: + if node["node"].compute.id == "local" and node["node"].status == "started": + return node + + for node in self._nodes: + if node["node"].node_type and node["node"].status == "started": + return node + + raise aiohttp.web.HTTPConflict(text="Can not capture because no running device on this link") @asyncio.coroutine def read_pcap_from_source(self): @@ -169,3 +180,11 @@ class UDPLink(Link): if self._capture_node: compute = self._capture_node["node"].compute return compute.stream_file(self._project, "tmp/captures/" + self._capture_file_name) + + @asyncio.coroutine + def node_updated(self, node): + """ + Called when a node member of the link is updated + """ + if self._capture_node and node == self._capture_node["node"] and node.status != "started": + yield from self.stop_capture() diff --git a/tests/controller/test_link.py b/tests/controller/test_link.py index 33cdca2b..e6056f77 100644 --- a/tests/controller/test_link.py +++ b/tests/controller/test_link.py @@ -97,6 +97,7 @@ def test_add_node(async_run, project, compute): assert link.create.called link._project.controller.notification.emit.assert_called_with("link.created", link.__json__()) + assert link in node2.link def test_add_node_cloud(async_run, project, compute): @@ -302,3 +303,23 @@ def test_stop_capture(link, async_run, tmpdir, project, controller): async_run(link.stop_capture()) assert link._capturing is False controller._notification.emit.assert_called_with("link.updated", link.__json__()) + + +def test_delete(async_run, project, compute): + node1 = Node(project, compute, "node1", node_type="qemu") + node1._ports = [EthernetPort("E0", 0, 0, 4)] + + link = Link(project) + link.create = AsyncioMagicMock() + link._project.controller.notification.emit = MagicMock() + project.dump = AsyncioMagicMock() + async_run(link.add_node(node1, 0, 4)) + + node2 = Node(project, compute, "node2", node_type="qemu") + node2._ports = [EthernetPort("E0", 0, 0, 4)] + async_run(link.add_node(node2, 0, 4)) + + assert link in node2.link + + async_run(link.delete()) + assert link not in node2.link diff --git a/tests/controller/test_node.py b/tests/controller/test_node.py index 0624cdf5..1536cf70 100644 --- a/tests/controller/test_node.py +++ b/tests/controller/test_node.py @@ -474,3 +474,14 @@ def test_get_port(node): assert port.adapter_number == 1 with pytest.raises(aiohttp.web.HTTPNotFound): port = node.get_port(42, 0) + + +def test_parse_node_response(node, async_run): + """ + When a node is updated we notify the links connected to it + """ + link = MagicMock() + link.node_updated = AsyncioMagicMock() + node.add_link(link) + async_run(node.parse_node_response({"status": "started"})) + assert link.node_updated.called diff --git a/tests/controller/test_notification.py b/tests/controller/test_notification.py index b4f14ec5..5a93df93 100644 --- a/tests/controller/test_notification.py +++ b/tests/controller/test_notification.py @@ -77,7 +77,7 @@ def test_dispatch(async_run, controller, project): with notif.queue(project) as queue: assert len(notif._listeners[project.id]) == 1 async_run(queue.get(0.1)) # ping - notif.dispatch("test", {}, compute_id=1) + async_run(notif.dispatch("test", {}, compute_id=1)) msg = async_run(queue.get(5)) assert msg == ('test', {}, {}) @@ -87,7 +87,7 @@ def test_dispatch_ping(async_run, controller, project): with notif.queue(project) as queue: assert len(notif._listeners[project.id]) == 1 async_run(queue.get(0.1)) # ping - notif.dispatch("ping", {}, compute_id=12) + async_run(notif.dispatch("ping", {}, compute_id=12)) msg = async_run(queue.get(5)) assert msg == ('ping', {'compute_id': 12}, {}) @@ -102,13 +102,13 @@ def test_dispatch_node_updated(async_run, controller, node, project): with notif.queue(project) as queue: assert len(notif._listeners[project.id]) == 1 async_run(queue.get(0.1)) # ping - notif.dispatch("node.updated", { + async_run(notif.dispatch("node.updated", { "node_id": node.id, "project_id": project.id, "name": "hello", "startup_config": "ip 192" }, - compute_id=1) + compute_id=1)) assert node.name == "hello" action, event, _ = async_run(queue.get(5)) assert action == "node.updated" diff --git a/tests/controller/test_udp_link.py b/tests/controller/test_udp_link.py index 3e0d060c..023630bb 100644 --- a/tests/controller/test_udp_link.py +++ b/tests/controller/test_udp_link.py @@ -187,6 +187,37 @@ def test_choose_capture_side(async_run, project): compute2 = MagicMock() compute2.id = "local" + # Capture should run on the local node + node_vpcs = Node(project, compute1, "node1", node_type="vpcs") + node_vpcs._status = "started" + node_vpcs._ports = [EthernetPort("E0", 0, 0, 4)] + node_iou = Node(project, compute2, "node2", node_type="iou") + node_iou._status = "started" + node_iou._ports = [EthernetPort("E0", 0, 3, 1)] + + link = UDPLink(project) + link.create = AsyncioMagicMock() + async_run(link.add_node(node_vpcs, 0, 4)) + async_run(link.add_node(node_iou, 3, 1)) + + assert link._choose_capture_side()["node"] == node_iou + + # Capture should choose always running node + node_iou = Node(project, compute1, "node5", node_type="iou") + node_iou._status = "started" + node_iou._ports = [EthernetPort("E0", 0, 0, 4)] + node_switch = Node(project, compute1, "node6", node_type="ethernet_switch") + node_switch._status = "started" + node_switch._ports = [EthernetPort("E0", 0, 3, 1)] + + link = UDPLink(project) + link.create = AsyncioMagicMock() + async_run(link.add_node(node_iou, 0, 4)) + async_run(link.add_node(node_switch, 3, 1)) + + assert link._choose_capture_side()["node"] == node_switch + + # Capture should raise error if node are not started node_vpcs = Node(project, compute1, "node1", node_type="vpcs") node_vpcs._ports = [EthernetPort("E0", 0, 0, 4)] node_iou = Node(project, compute2, "node2", node_type="iou") @@ -197,36 +228,18 @@ def test_choose_capture_side(async_run, project): async_run(link.add_node(node_vpcs, 0, 4)) async_run(link.add_node(node_iou, 3, 1)) - assert link._choose_capture_side()["node"] == node_iou - - node_vpcs = Node(project, compute1, "node3", node_type="vpcs") - node_vpcs._ports = [EthernetPort("E0", 0, 0, 4)] - node_vpcs2 = Node(project, compute1, "node4", node_type="vpcs") - node_vpcs2._ports = [EthernetPort("E0", 0, 3, 1)] - - link = UDPLink(project) - link.create = AsyncioMagicMock() - async_run(link.add_node(node_vpcs, 0, 4)) - async_run(link.add_node(node_vpcs2, 3, 1)) - - # Capture should run on the local node - node_iou = Node(project, compute1, "node5", node_type="iou") - node_iou._ports = [EthernetPort("E0", 0, 0, 4)] - node_iou2 = Node(project, compute2, "node6", node_type="iou") - node_iou2._ports = [EthernetPort("E0", 0, 3, 1)] - - link = UDPLink(project) - link.create = AsyncioMagicMock() - async_run(link.add_node(node_iou, 0, 4)) - async_run(link.add_node(node_iou2, 3, 1)) - - assert link._choose_capture_side()["node"] == node_iou2 + with pytest.raises(aiohttp.web.HTTPConflict): + link._choose_capture_side() + # If you start a node you can capture on it + node_vpcs._status = "started" + assert link._choose_capture_side()["node"] == node_vpcs def test_capture(async_run, project): compute1 = MagicMock() node_vpcs = Node(project, compute1, "V1", node_type="vpcs") + node_vpcs._status = "started" node_vpcs._ports = [EthernetPort("E0", 0, 0, 4)] node_iou = Node(project, compute1, "I1", node_type="iou") node_iou._ports = [EthernetPort("E0", 0, 3, 1)] @@ -254,6 +267,7 @@ def test_read_pcap_from_source(project, async_run): compute1 = MagicMock() node_vpcs = Node(project, compute1, "V1", node_type="vpcs") + node_vpcs._status = "started" node_vpcs._ports = [EthernetPort("E0", 0, 0, 4)] node_iou = Node(project, compute1, "I1", node_type="iou") node_iou._ports = [EthernetPort("E0", 0, 3, 1)] @@ -268,3 +282,23 @@ def test_read_pcap_from_source(project, async_run): async_run(link.read_pcap_from_source()) link._capture_node["node"].compute.stream_file.assert_called_with(project, "tmp/captures/" + link._capture_file_name) + + +def test_node_updated(project, async_run): + """ + If a node stop when capturing we stop the capture + """ + compute1 = MagicMock() + node_vpcs = Node(project, compute1, "V1", node_type="vpcs") + node_vpcs._status = "started" + + link = UDPLink(project) + link._capture_node = {"node": node_vpcs} + link.stop_capture = AsyncioMagicMock() + + async_run(link.node_updated(node_vpcs)) + assert not link.stop_capture.called + + node_vpcs._status = "stopped" + async_run(link.node_updated(node_vpcs)) + assert link.stop_capture.called