1
0
mirror of https://github.com/GNS3/gns3-server synced 2024-12-01 04:38:12 +00:00

Notification feed for the hypervisor

This commit is contained in:
Julien Duponchelle 2016-03-17 15:15:30 +01:00
parent d81fc25b27
commit b55969d381
No known key found for this signature in database
GPG Key ID: F1E2485547D4595D
15 changed files with 425 additions and 144 deletions

View File

@ -202,14 +202,14 @@ upload and run code on your machine.
Notifications Notifications
============= =============
You can receive notification from the server if you listen the HTTP stream /notifications. You can receive notification from the server if you listen the HTTP stream /notifications or the websocket.
The available notification are: The available notification are:
* ping * ping
* vm.created * vm.created
* vm.started * vm.started
* vm.stopped * vm.stopped
* log.error * log.error
Previous versions Previous versions
================= =================

View File

@ -29,6 +29,7 @@ from .vmware_handler import VMwareHandler
from .config_handler import ConfigHandler from .config_handler import ConfigHandler
from .file_handler import FileHandler from .file_handler import FileHandler
from .version_handler import VersionHandler from .version_handler import VersionHandler
from .notification_handler import NotificationHandler
if sys.platform.startswith("linux") or hasattr(sys, "_called_from_test") or os.environ.get("PYTEST_BUILD_DOCUMENTATION") == "1": if sys.platform.startswith("linux") or hasattr(sys, "_called_from_test") or os.environ.get("PYTEST_BUILD_DOCUMENTATION") == "1":

View File

@ -0,0 +1,41 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import asyncio
from ....web.route import Route
from ....hypervisor.notification_manager import NotificationManager
from aiohttp.web import WebSocketResponse
class NotificationHandler:
@classmethod
@Route.get(
r"/notifications/ws",
description="Send notifications about what happend using websockets")
def notifications(request, response):
notifications = NotificationManager.instance()
ws = WebSocketResponse()
yield from ws.prepare(request)
with notifications.queue() as queue:
while True:
notif = yield from queue.get_json(5)
ws.send_str(notif)
return ws

View File

@ -0,0 +1,118 @@
#!/usr/bin/env python
#
# Copyright (C) 2016 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import asyncio
import psutil
import json
from contextlib import contextmanager
class NotificationQueue(asyncio.Queue):
"""
Queue returned by the notification manager.
"""
def __init__(self):
super().__init__()
self._first = True
@asyncio.coroutine
def get(self, timeout):
"""
When timeout is expire we send a ping notification with server informations
"""
# At first get we return a ping so the client receive immediately data
if self._first:
self._first = False
return ("ping", self._getPing(), {})
try:
(action, msg, kwargs) = yield from asyncio.wait_for(super().get(), timeout)
except asyncio.futures.TimeoutError:
return ("ping", self._getPing(), {})
return (action, msg, kwargs)
def _getPing(self):
"""
Return the content of the ping notification
"""
msg = {}
# Non blocking call in order to get cpu usage. First call will return 0
msg["cpu_usage_percent"] = psutil.cpu_percent(interval=None)
msg["memory_usage_percent"] = psutil.virtual_memory().percent
return msg
@asyncio.coroutine
def get_json(self, timeout):
"""
Get a message as a JSON
"""
(action, msg, kwargs) = yield from self.get(timeout)
if hasattr(msg, "__json__"):
msg = {"action": action, "event": msg.__json__()}
else:
msg = {"action": action, "event": msg}
msg.update(kwargs)
return json.dumps(msg, sort_keys=True)
class NotificationManager:
"""
Manage the notification queue where the controller
will connect to get notifications from hypervisors
"""
def __init__(self):
self._listeners = set()
@contextmanager
def queue(self):
"""
Get a queue of notifications
Use it with Python with
"""
queue = NotificationQueue()
self._listeners.add(queue)
yield queue
self._listeners.remove(queue)
def emit(self, action, event, **kwargs):
"""
Send an event to all the client listening for notifications
:param action: Action name
:param event: Event to send
:param kwargs: Add this meta to the notif (project_id for example)
"""
for listener in self._listeners:
listener.put_nowait((action, event, kwargs))
@staticmethod
def reset():
NotificationManager._instance = None
@staticmethod
def instance():
"""
Singleton to return only on instance of NotificationManager.
:returns: instance of NotificationManager
"""
if not hasattr(NotificationManager, '_instance') or NotificationManager._instance is None:
NotificationManager._instance = NotificationManager()
return NotificationManager._instance

View File

@ -23,6 +23,7 @@ import hashlib
from uuid import UUID, uuid4 from uuid import UUID, uuid4
from .port_manager import PortManager from .port_manager import PortManager
from .notification_manager import NotificationManager
from ..config import Config from ..config import Config
from ..utils.asyncio import wait_run_in_executor from ..utils.asyncio import wait_run_in_executor
@ -56,9 +57,6 @@ class Project:
self._used_tcp_ports = set() self._used_tcp_ports = set()
self._used_udp_ports = set() self._used_udp_ports = set()
# clients listening for notifications
self._listeners = set()
if path is None: if path is None:
location = self._config().get("project_directory", self._get_default_project_directory()) location = self._config().get("project_directory", self._get_default_project_directory())
path = os.path.join(location, self._id) path = os.path.join(location, self._id)
@ -422,28 +420,7 @@ class Project:
:param action: Action name :param action: Action name
:param event: Event to send :param event: Event to send
""" """
for listener in self._listeners: NotificationManager.instance().emit(action, event, project_id=self.id)
listener.put_nowait((action, event, ))
def get_listen_queue(self):
"""Get a queue where you receive all the events related to the
project."""
queue = asyncio.Queue()
self._listeners.add(queue)
return queue
def stop_listen_queue(self, queue):
"""Stop sending notification to this clients"""
self._listeners.remove(queue)
@property
def listeners(self):
"""
List of current clients listening for event in this projects
"""
return self._listeners
@asyncio.coroutine @asyncio.coroutine
def list_files(self): def list_files(self):

View File

@ -1,7 +1,22 @@
{% extends "layout.html" %} {% extends "layout.html" %}
{% block head %}
<script>
var socket = new WebSocket("ws://" + location.host + "/v2/hypervisor/notifications/ws");
socket.onopen = function (event) {
document.getElementById("notifications").innerText = "Connected";
};
socket.onmessage = function (event) {
document.getElementById("notifications").innerText = event.data + "\n" + document.getElementById("notifications").innerText;
};
</script>
{% endblock %}
{% block body %} {% block body %}
<h1> <h1>
Server status Hypervisor status
</h1> </h1>
The purpose of this page is to help for GNS3 debug. This can be dropped The purpose of this page is to help for GNS3 debug. This can be dropped
in futur GNS3 versions. in futur GNS3 versions.
@ -39,4 +54,9 @@ in futur GNS3 versions.
<li>{{port}}</li> <li>{{port}}</li>
{% endfor %} {% endfor %}
</ul> </ul>
<h2>Notifications</h2>
<div id="notifications">
</div>
{% endblock %} {% endblock %}

View File

@ -1,9 +1,7 @@
<html> <html>
<head> <head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
<script> {% block head %}{% endblock %}
{% block script %}{% endblock %}
</script>
<title>GNS3 Server</title> <title>GNS3 Server</title>
</head> </head>
<body> <body>

View File

@ -1,5 +1,7 @@
{% extends "layout.html" %} {% extends "layout.html" %}
{% block script %}
{% block head %}
<script>
function onSubmit() { function onSubmit() {
if (document.getElementById("uploadInput").files == undefined) { if (document.getElementById("uploadInput").files == undefined) {
//OLD browser //OLD browser
@ -15,7 +17,9 @@ function onSubmit() {
} }
return true; return true;
} }
</script>
{% endblock %} {% endblock %}
{% block body %} {% block body %}
<h1>Select & Upload an image for GNS3</h1> <h1>Select & Upload an image for GNS3</h1>
<form enctype="multipart/form-data" action="/upload" method="post" onSubmit="return onSubmit()"> <form enctype="multipart/form-data" action="/upload" method="post" onSubmit="return onSubmit()">

View File

@ -40,6 +40,11 @@ class Query:
self._host = host self._host = host
self._prefix = prefix self._prefix = prefix
self._api_version = api_version self._api_version = api_version
self._session = None
@asyncio.coroutine
def close(self):
yield from self._session.close()
def post(self, path, body={}, **kwargs): def post(self, path, body={}, **kwargs):
return self._fetch("POST", path, body, **kwargs) return self._fetch("POST", path, body, **kwargs)
@ -58,6 +63,20 @@ class Query:
return "http://{}:{}{}{}".format(self._host, self._port, self._prefix, path) return "http://{}:{}{}{}".format(self._host, self._port, self._prefix, path)
return "http://{}:{}/v{}{}{}".format(self._host, self._port, self._api_version, self._prefix, path) return "http://{}:{}/v{}{}{}".format(self._host, self._port, self._api_version, self._prefix, path)
def websocket(self, path):
"""
Return a websocket connected to the path
"""
self._session = aiohttp.ClientSession()
@asyncio.coroutine
def go_request(future):
response = yield from self._session.ws_connect(self.get_url(path))
future.set_result(response)
future = asyncio.Future()
asyncio.async(go_request(future))
self._loop.run_until_complete(future)
return future.result()
def _fetch(self, method, path, body=None, **kwargs): def _fetch(self, method, path, body=None, **kwargs):
"""Fetch an url, parse the JSON and return response """Fetch an url, parse the JSON and return response

View File

@ -0,0 +1,35 @@
#!/usr/bin/env python
#
# Copyright (C) 2016 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
from gns3server.hypervisor.notification_manager import NotificationManager
def test_notification_ws(http_hypervisor, async_run):
ws = http_hypervisor.websocket("/notifications/ws")
answer = async_run(ws.receive())
answer = json.loads(answer.data)
assert answer["action"] == "ping"
NotificationManager.instance().emit("test", {})
answer = async_run(ws.receive())
answer = json.loads(answer.data)
assert answer["action"] == "test"
async_run(http_hypervisor.close())

View File

@ -199,47 +199,6 @@ def test_close_project_invalid_uuid(http_hypervisor):
assert response.status == 404 assert response.status == 404
def test_notification(http_hypervisor, project, loop):
@asyncio.coroutine
def go(future):
response = yield from aiohttp.request("GET", http_hypervisor.get_url("/projects/{project_id}/notifications".format(project_id=project.id)))
response.body = yield from response.content.read(200)
project.emit("vm.created", {"a": "b"})
response.body += yield from response.content.read(50)
response.close()
future.set_result(response)
future = asyncio.Future()
asyncio.async(go(future))
response = loop.run_until_complete(future)
assert response.status == 200
assert b'"action": "ping"' in response.body
assert b'"cpu_usage_percent"' in response.body
assert b'{"action": "vm.created", "event": {"a": "b"}}\n' in response.body
def test_notification_invalid_id(http_hypervisor):
response = http_hypervisor.get("/projects/{project_id}/notifications".format(project_id=uuid.uuid4()))
assert response.status == 404
def test_list_files(http_hypervisor, project):
files = [
{
"path": "test.txt",
"md5sum": "ad0234829205b9033196ba818f7a872b"
},
{
"path": "vm-1/dynamips/test.bin",
"md5sum": "098f6bcd4621d373cade4e832627b4f6"
}
]
with asyncio_patch("gns3server.hypervisor.project.Project.list_files", return_value=files) as mock:
response = http_hypervisor.get("/projects/{project_id}/files".format(project_id=project.id), example=True)
assert response.status == 200
assert response.json == files
def test_get_file(http_hypervisor, tmpdir): def test_get_file(http_hypervisor, tmpdir):
with patch("gns3server.config.Config.get_section_config", return_value={"project_directory": str(tmpdir)}): with patch("gns3server.config.Config.get_section_config", return_value={"project_directory": str(tmpdir)}):

View File

@ -32,6 +32,7 @@ from gns3server.hypervisor.qemu.qemu_vm import QemuVM
from gns3server.hypervisor.qemu.qemu_error import QemuError from gns3server.hypervisor.qemu.qemu_error import QemuError
from gns3server.hypervisor.qemu import Qemu from gns3server.hypervisor.qemu import Qemu
from gns3server.utils import force_unix_path from gns3server.utils import force_unix_path
from gns3server.hypervisor.notification_manager import NotificationManager
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
@ -136,39 +137,41 @@ def test_stop(loop, vm, running_subprocess_mock):
process.terminate.assert_called_with() process.terminate.assert_called_with()
def test_termination_callback(vm): def test_termination_callback(vm, async_run):
vm.status = "started" vm.status = "started"
queue = vm.project.get_listen_queue()
with NotificationManager.instance().queue() as queue:
vm._termination_callback(0) vm._termination_callback(0)
assert vm.status == "stopped" assert vm.status == "stopped"
(action, event) = queue.get_nowait() async_run(queue.get(0)) # Ping
(action, event, kwargs) = async_run(queue.get(0))
assert action == "vm.stopped" assert action == "vm.stopped"
assert event == vm assert event == vm
with pytest.raises(asyncio.queues.QueueEmpty):
queue.get_nowait()
def test_termination_callback_error(vm, tmpdir, async_run):
def test_termination_callback_error(vm, tmpdir):
with open(str(tmpdir / "qemu.log"), "w+") as f: with open(str(tmpdir / "qemu.log"), "w+") as f:
f.write("BOOMM") f.write("BOOMM")
vm.status = "started" vm.status = "started"
vm._stdout_file = str(tmpdir / "qemu.log") vm._stdout_file = str(tmpdir / "qemu.log")
queue = vm.project.get_listen_queue()
with NotificationManager.instance().queue() as queue:
vm._termination_callback(1) vm._termination_callback(1)
assert vm.status == "stopped" assert vm.status == "stopped"
(action, event) = queue.get_nowait()
async_run(queue.get(0)) # Ping
(action, event, kwargs) = queue.get_nowait()
assert action == "vm.stopped" assert action == "vm.stopped"
assert event == vm assert event == vm
(action, event) = queue.get_nowait() (action, event, kwargs) = queue.get_nowait()
assert action == "log.error" assert action == "log.error"
assert event["message"] == "QEMU process has stopped, return code: 1\nBOOMM" assert event["message"] == "QEMU process has stopped, return code: 1\nBOOMM"

View File

@ -0,0 +1,89 @@
#!/usr/bin/env python
#
# Copyright (C) 2016 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import uuid
from gns3server.hypervisor.notification_manager import NotificationManager
def test_queue(async_run):
NotificationManager.reset()
notifications = NotificationManager.instance()
with notifications.queue() as queue:
assert len(notifications._listeners) == 1
res = async_run(queue.get(5))
assert res[0] == "ping"
notifications.emit("test", {"a": 1})
res = async_run(queue.get(5))
assert res == ('test', {"a": 1}, {})
assert len(notifications._listeners) == 0
def test_queue_json(async_run):
NotificationManager.reset()
notifications = NotificationManager.instance()
with notifications.queue() as queue:
assert len(notifications._listeners) == 1
res = async_run(queue.get(5))
assert "ping" in res
notifications.emit("test", {"a": 1})
res = async_run(queue.get_json(5))
assert res == '{"action": "test", "event": {"a": 1}}'
assert len(notifications._listeners) == 0
def test_queue_json_meta(async_run):
NotificationManager.reset()
project_id = str(uuid.uuid4())
notifications = NotificationManager.instance()
with notifications.queue() as queue:
assert len(notifications._listeners) == 1
res = async_run(queue.get(5))
assert "ping" in res
notifications.emit("test", {"a": 1}, project_id=project_id)
res = async_run(queue.get_json(5))
assert res == '{"action": "test", "event": {"a": 1}, "project_id": "' + project_id + '"}'
assert len(notifications._listeners) == 0
def test_queue_ping(async_run):
"""
If we don't send a message during a long time (0.5 seconds)
a ping is send
"""
NotificationManager.reset()
notifications = NotificationManager.instance()
with notifications.queue() as queue:
assert len(notifications._listeners) == 1
res = async_run(queue.get(5))
assert res[0] == "ping"
res = async_run(queue.get(0.5))
assert res[0] == "ping"
assert res[1]["cpu_usage_percent"] is not None
assert len(notifications._listeners) == 0

View File

@ -25,6 +25,7 @@ from unittest.mock import patch
from tests.utils import asyncio_patch from tests.utils import asyncio_patch
from gns3server.hypervisor.project import Project from gns3server.hypervisor.project import Project
from gns3server.hypervisor.notification_manager import NotificationManager
from gns3server.hypervisor.vpcs import VPCS, VPCSVM from gns3server.hypervisor.vpcs import VPCS, VPCSVM
from gns3server.config import Config from gns3server.config import Config
@ -256,3 +257,15 @@ def test_list_files(tmpdir, loop):
"md5sum": "098f6bcd4621d373cade4e832627b4f6" "md5sum": "098f6bcd4621d373cade4e832627b4f6"
} }
] ]
def test_emit(async_run):
with NotificationManager.instance().queue() as queue:
(action, event, context) = async_run(queue.get(0.5)) # Ping
project = Project(project_id=str(uuid4()))
project.emit("test", {})
(action, event, context) = async_run(queue.get(0.5))
assert action == "test"
assert context["project_id"] == project.id

View File

@ -28,6 +28,7 @@ from unittest.mock import patch, MagicMock
from gns3server.hypervisor.vpcs.vpcs_vm import VPCSVM from gns3server.hypervisor.vpcs.vpcs_vm import VPCSVM
from gns3server.hypervisor.vpcs.vpcs_error import VPCSError from gns3server.hypervisor.vpcs.vpcs_error import VPCSError
from gns3server.hypervisor.vpcs import VPCS from gns3server.hypervisor.vpcs import VPCS
from gns3server.hypervisor.notification_manager import NotificationManager
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
@ -82,10 +83,13 @@ def test_vm_invalid_vpcs_path(vm, manager, loop):
assert vm.id == "00010203-0405-0607-0809-0a0b0c0d0e0e" assert vm.id == "00010203-0405-0607-0809-0a0b0c0d0e0e"
def test_start(loop, vm): def test_start(loop, vm, async_run):
process = MagicMock() process = MagicMock()
process.returncode = None process.returncode = None
queue = vm.project.get_listen_queue()
with NotificationManager.instance().queue() as queue:
async_run(queue.get(0)) # Ping
with asyncio_patch("gns3server.hypervisor.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True): with asyncio_patch("gns3server.hypervisor.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True):
with asyncio_patch("asyncio.create_subprocess_exec", return_value=process) as mock_exec: with asyncio_patch("asyncio.create_subprocess_exec", return_value=process) as mock_exec:
@ -108,7 +112,7 @@ def test_start(loop, vm):
'127.0.0.1') '127.0.0.1')
assert vm.is_running() assert vm.is_running()
assert vm.command_line == ' '.join(mock_exec.call_args[0]) assert vm.command_line == ' '.join(mock_exec.call_args[0])
(action, event) = queue.get_nowait() (action, event, kwargs) = async_run(queue.get(0))
assert action == "vm.started" assert action == "vm.started"
assert event == vm assert event == vm
@ -120,7 +124,6 @@ def test_start_0_6_1(loop, vm):
""" """
process = MagicMock() process = MagicMock()
process.returncode = None process.returncode = None
queue = vm.project.get_listen_queue()
vm._vpcs_version = parse_version("0.6.1") vm._vpcs_version = parse_version("0.6.1")
with asyncio_patch("gns3server.hypervisor.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True): with asyncio_patch("gns3server.hypervisor.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True):
@ -142,12 +145,10 @@ def test_start_0_6_1(loop, vm):
'-t', '-t',
'127.0.0.1') '127.0.0.1')
assert vm.is_running() assert vm.is_running()
(action, event) = queue.get_nowait()
assert action == "vm.started"
assert event == vm
def test_stop(loop, vm):
def test_stop(loop, vm, async_run):
process = MagicMock() process = MagicMock()
# Wait process kill success # Wait process kill success
@ -156,6 +157,7 @@ def test_stop(loop, vm):
process.wait.return_value = future process.wait.return_value = future
process.returncode = None process.returncode = None
with NotificationManager.instance().queue() as queue:
with asyncio_patch("gns3server.hypervisor.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True): with asyncio_patch("gns3server.hypervisor.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True):
with asyncio_patch("asyncio.create_subprocess_exec", return_value=process): with asyncio_patch("asyncio.create_subprocess_exec", return_value=process):
nio = VPCS.instance().create_nio(vm.vpcs_path, {"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"}) nio = VPCS.instance().create_nio(vm.vpcs_path, {"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"})
@ -164,7 +166,6 @@ def test_stop(loop, vm):
loop.run_until_complete(asyncio.async(vm.start())) loop.run_until_complete(asyncio.async(vm.start()))
assert vm.is_running() assert vm.is_running()
queue = vm.project.get_listen_queue()
with asyncio_patch("gns3server.utils.asyncio.wait_for_process_termination"): with asyncio_patch("gns3server.utils.asyncio.wait_for_process_termination"):
loop.run_until_complete(asyncio.async(vm.stop())) loop.run_until_complete(asyncio.async(vm.stop()))
@ -175,7 +176,10 @@ def test_stop(loop, vm):
else: else:
process.terminate.assert_called_with() process.terminate.assert_called_with()
(action, event) = queue.get_nowait() async_run(queue.get(0)) # Ping
async_run(queue.get(0)) # Started
(action, event, kwargs) = async_run(queue.get(0))
assert action == "vm.stopped" assert action == "vm.stopped"
assert event == vm assert event == vm