mirror of
https://github.com/GNS3/gns3-server
synced 2025-04-14 14:55:44 +00:00
Merge branch 'listen_notification' into unstable
This commit is contained in:
commit
e4ced882fd
@ -15,14 +15,23 @@
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
|
||||
from ...web.route import Route
|
||||
from ...schemas.project import PROJECT_OBJECT_SCHEMA, PROJECT_CREATE_SCHEMA, PROJECT_UPDATE_SCHEMA
|
||||
from ...modules.project_manager import ProjectManager
|
||||
from ...modules import MODULES
|
||||
|
||||
import logging
|
||||
log = logging.getLogger()
|
||||
|
||||
|
||||
class ProjectHandler:
|
||||
|
||||
# How many clients has subcribe to notifications
|
||||
_notifications_listening = 0
|
||||
|
||||
@classmethod
|
||||
@Route.post(
|
||||
r"/projects",
|
||||
@ -123,8 +132,9 @@ class ProjectHandler:
|
||||
|
||||
pm = ProjectManager.instance()
|
||||
project = pm.get_project(request.match_info["project_id"])
|
||||
yield from project.close()
|
||||
pm.remove_project(project.id)
|
||||
if ProjectHandler._notifications_listening == 0:
|
||||
yield from project.close()
|
||||
pm.remove_project(project.id)
|
||||
response.set_status(204)
|
||||
|
||||
@classmethod
|
||||
@ -145,3 +155,45 @@ class ProjectHandler:
|
||||
yield from project.delete()
|
||||
pm.remove_project(project.id)
|
||||
response.set_status(204)
|
||||
|
||||
@classmethod
|
||||
@Route.get(
|
||||
r"/projects/{project_id}/notifications",
|
||||
description="Receive notifications about the projects",
|
||||
parameters={
|
||||
"project_id": "The UUID of the project",
|
||||
},
|
||||
status_codes={
|
||||
200: "End of stream",
|
||||
404: "The project doesn't exist"
|
||||
})
|
||||
def notification(request, response):
|
||||
|
||||
pm = ProjectManager.instance()
|
||||
project = pm.get_project(request.match_info["project_id"])
|
||||
|
||||
response.content_type = "application/json"
|
||||
response.set_status(200)
|
||||
response.enable_chunked_encoding()
|
||||
# Very important: do not send a content lenght otherwise QT close the connection but curl can consume the Feed
|
||||
response.content_length = None
|
||||
|
||||
response.start(request)
|
||||
queue = project.get_listen_queue()
|
||||
ProjectHandler._notifications_listening += 1
|
||||
response.write("{\"action\": \"ping\"}\n".encode("utf-8"))
|
||||
while True:
|
||||
try:
|
||||
(action, msg) = yield from asyncio.wait_for(queue.get(), 5)
|
||||
if hasattr(msg, "__json__"):
|
||||
msg = json.dumps({"action": action, "event": msg.__json__()}, sort_keys=True)
|
||||
else:
|
||||
msg = json.dumps({"action": action, "event": msg}, sort_keys=True)
|
||||
log.debug("Send notification: %s", msg)
|
||||
response.write(("{}\n".format(msg)).encode("utf-8"))
|
||||
except asyncio.futures.CancelledError as e:
|
||||
break
|
||||
except asyncio.futures.TimeoutError as e:
|
||||
response.write("{\"action\": \"ping\"}\n".encode("utf-8"))
|
||||
project.stop_listen_queue(queue)
|
||||
ProjectHandler._notifications_listening -= 1
|
||||
|
@ -48,6 +48,7 @@ class BaseVM:
|
||||
self._manager = manager
|
||||
self._console = console
|
||||
self._temporary_directory = None
|
||||
self._vm_status = "stopped"
|
||||
|
||||
if self._console is not None:
|
||||
self._console = self._manager.port_manager.reserve_tcp_port(self._console, self._project)
|
||||
@ -66,6 +67,18 @@ class BaseVM:
|
||||
if os.path.exists(self._temporary_directory):
|
||||
shutil.rmtree(self._temporary_directory, ignore_errors=True)
|
||||
|
||||
@property
|
||||
def status(self):
|
||||
"""Return current VM status"""
|
||||
|
||||
return self._vm_status
|
||||
|
||||
@status.setter
|
||||
def status(self, status):
|
||||
|
||||
self._vm_status = status
|
||||
self._project.emit("vm.{}".format(status), self)
|
||||
|
||||
@property
|
||||
def project(self):
|
||||
"""
|
||||
|
@ -70,6 +70,16 @@ class Hypervisor(DynamipsHypervisor):
|
||||
|
||||
return self._id
|
||||
|
||||
@property
|
||||
def process(self):
|
||||
"""
|
||||
Returns the subprocess of the Hypervisor
|
||||
|
||||
:returns: subprocess
|
||||
"""
|
||||
|
||||
return self._process
|
||||
|
||||
@property
|
||||
def started(self):
|
||||
"""
|
||||
|
@ -35,7 +35,8 @@ from ...base_vm import BaseVM
|
||||
from ..dynamips_error import DynamipsError
|
||||
from ..nios.nio_udp import NIOUDP
|
||||
|
||||
from gns3server.utils.asyncio import wait_run_in_executor
|
||||
from gns3server.config import Config
|
||||
from gns3server.utils.asyncio import wait_run_in_executor, monitor_process
|
||||
|
||||
|
||||
class Router(BaseVM):
|
||||
@ -162,7 +163,7 @@ class Router(BaseVM):
|
||||
slot_number += 1
|
||||
|
||||
# add the wics
|
||||
if self._slots[0] and self._slots[0].wics:
|
||||
if len(self._slots) > 0 and self._slots[0] and self._slots[0].wics:
|
||||
for wic_slot_number in range(0, len(self._slots[0].wics)):
|
||||
if self._slots[0].wics[wic_slot_number]:
|
||||
router_info["wic" + str(wic_slot_number)] = str(self._slots[0].wics[wic_slot_number])
|
||||
@ -251,7 +252,20 @@ class Router(BaseVM):
|
||||
raise DynamipsError('"{}" is not a valid IOS image'.format(self._image))
|
||||
|
||||
yield from self._hypervisor.send('vm start "{name}"'.format(name=self._name))
|
||||
self.status = "started"
|
||||
log.info('router "{name}" [{id}] has been started'.format(name=self._name, id=self._id))
|
||||
monitor_process(self._hypervisor.process, self._termination_callback)
|
||||
|
||||
@asyncio.coroutine
|
||||
def _termination_callback(self, returncode):
|
||||
"""
|
||||
Called when the process has stopped.
|
||||
|
||||
:param returncode: Process returncode
|
||||
"""
|
||||
|
||||
log.info("Dynamips hypervisor process has stopped, return code: %d", returncode)
|
||||
self.status = "stopped"
|
||||
|
||||
@asyncio.coroutine
|
||||
def stop(self):
|
||||
@ -262,6 +276,7 @@ class Router(BaseVM):
|
||||
status = yield from self.get_status()
|
||||
if status != "inactive":
|
||||
yield from self._hypervisor.send('vm stop "{name}"'.format(name=self._name))
|
||||
self.status = "stopped"
|
||||
log.info('Router "{name}" [{id}] has been stopped'.format(name=self._name, id=self._id))
|
||||
|
||||
@asyncio.coroutine
|
||||
|
@ -17,7 +17,7 @@
|
||||
|
||||
"""
|
||||
IOU VM management (creates command line, processes, files etc.) in
|
||||
order to run an IOU VM.
|
||||
order to run an IOU instance.
|
||||
"""
|
||||
|
||||
import os
|
||||
@ -54,16 +54,32 @@ class IOUVM(BaseVM):
|
||||
module_name = 'iou'
|
||||
|
||||
"""
|
||||
IOU VM implementation.
|
||||
IOU vm implementation.
|
||||
|
||||
:param name: IOU VM name
|
||||
:param vm_id: IOU VM identifier
|
||||
:param name: name of this IOU vm
|
||||
:param vm_id: IOU instance identifier
|
||||
:param project: Project instance
|
||||
:param manager: Manager instance
|
||||
:param manager: parent VM Manager
|
||||
:param console: TCP console port
|
||||
:params ethernet_adapters: Number of ethernet adapters
|
||||
:params serial_adapters: Number of serial adapters
|
||||
:params ram: Ram MB
|
||||
:params nvram: Nvram KB
|
||||
:params l1_keepalives: Always up ethernet interface:
|
||||
:params initial_config: Content of the initial configuration file
|
||||
:params iourc_content: Content of the iourc file if no licence is installed on server
|
||||
"""
|
||||
|
||||
def __init__(self, name, vm_id, project, manager, console=None):
|
||||
def __init__(self, name, vm_id, project, manager,
|
||||
console=None,
|
||||
ram=None,
|
||||
nvram=None,
|
||||
use_default_iou_values=None,
|
||||
ethernet_adapters=None,
|
||||
serial_adapters=None,
|
||||
l1_keepalives=None,
|
||||
initial_config=None,
|
||||
iourc_content=None):
|
||||
|
||||
super().__init__(name, vm_id, project, manager, console=console)
|
||||
|
||||
@ -78,19 +94,20 @@ class IOUVM(BaseVM):
|
||||
# IOU settings
|
||||
self._ethernet_adapters = []
|
||||
self._serial_adapters = []
|
||||
self.ethernet_adapters = 2 # one adapter = 4 interfaces
|
||||
self.serial_adapters = 2 # one adapter = 4 interfaces
|
||||
self._use_default_iou_values = True # for RAM & NVRAM values
|
||||
self._nvram = 128 # Kilobytes
|
||||
self.ethernet_adapters = 2 if ethernet_adapters is None else ethernet_adapters # one adapter = 4 interfaces
|
||||
self.serial_adapters = 2 if serial_adapters is None else serial_adapters # one adapter = 4 interfaces
|
||||
self._use_default_iou_values = True if use_default_iou_values is None else use_default_iou_values # for RAM & NVRAM values
|
||||
self._nvram = 128 if nvram is None else nvram # Kilobytes
|
||||
self._initial_config = ""
|
||||
self._ram = 256 # Megabytes
|
||||
self._l1_keepalives = False # used to overcome the always-up Ethernet interfaces (not supported by all IOSes).
|
||||
self._ram = 256 if ram is None else ram # Megabytes
|
||||
self._l1_keepalives = False if l1_keepalives is None else l1_keepalives # used to overcome the always-up Ethernet interfaces (not supported by all IOSes).
|
||||
|
||||
self.iourc_content = iourc_content
|
||||
if initial_config is not None:
|
||||
self.initial_config = initial_config
|
||||
|
||||
@asyncio.coroutine
|
||||
def close(self):
|
||||
"""
|
||||
Closes this IOU VM.
|
||||
"""
|
||||
|
||||
log.debug('IOU "{name}" [{id}] is closing'.format(name=self._name, id=self._id))
|
||||
|
||||
@ -109,33 +126,26 @@ class IOUVM(BaseVM):
|
||||
|
||||
@property
|
||||
def path(self):
|
||||
"""
|
||||
Path of the IOU executable.
|
||||
|
||||
:returns: path to the IOU image executable
|
||||
"""
|
||||
"""Path of the iou binary"""
|
||||
|
||||
return self._path
|
||||
|
||||
@path.setter
|
||||
def path(self, path):
|
||||
"""
|
||||
Path of the IOU executable.
|
||||
Path of the iou binary
|
||||
|
||||
:param path: path to the IOU image executable
|
||||
:params path: Path to the binary
|
||||
"""
|
||||
|
||||
self._path = self.manager.get_abs_image_path(path)
|
||||
|
||||
# In 1.2 users uploaded images to the images roots
|
||||
# after the migration their images are inside images/IOU
|
||||
# but old topologies use old path
|
||||
if "IOU" not in self._path:
|
||||
location, filename = os.path.split(self._path)
|
||||
fix_path = os.path.join(location, "IOU", filename)
|
||||
if os.path.isfile(fix_path):
|
||||
self._path = fix_path
|
||||
if not os.path.isabs(path):
|
||||
server_config = self.manager.config.get_section_config("Server")
|
||||
relative_path = os.path.join(os.path.expanduser(server_config.get("images_path", "~/GNS3/images")), path)
|
||||
if not os.path.exists(relative_path):
|
||||
relative_path = os.path.join(os.path.expanduser(server_config.get("images_path", "~/GNS3/images")), "IOU", path)
|
||||
path = relative_path
|
||||
|
||||
self._path = path
|
||||
if not os.path.isfile(self._path) or not os.path.exists(self._path):
|
||||
if os.path.islink(self._path):
|
||||
raise IOUError("IOU image '{}' linked to '{}' is not accessible".format(self._path, os.path.realpath(self._path)))
|
||||
@ -161,7 +171,6 @@ class IOUVM(BaseVM):
|
||||
def use_default_iou_values(self):
|
||||
"""
|
||||
Returns if this device uses the default IOU image values.
|
||||
|
||||
:returns: boolean
|
||||
"""
|
||||
|
||||
@ -171,30 +180,28 @@ class IOUVM(BaseVM):
|
||||
def use_default_iou_values(self, state):
|
||||
"""
|
||||
Sets if this device uses the default IOU image values.
|
||||
|
||||
:param state: boolean
|
||||
"""
|
||||
|
||||
self._use_default_iou_values = state
|
||||
if state:
|
||||
log.info('IOU "{name}" [{id}]: uses the default IOU image values'.format(name=self._name, id=self._id))
|
||||
log.info("IOU {name} [id={id}]: uses the default IOU image values".format(name=self._name, id=self._id))
|
||||
else:
|
||||
log.info('IOU "{name}" [{id}]: does not use the default IOU image values'.format(name=self._name, id=self._id))
|
||||
log.info("IOU {name} [id={id}]: does not use the default IOU image values".format(name=self._name, id=self._id))
|
||||
|
||||
def _check_requirements(self):
|
||||
"""
|
||||
Checks if IOUYAP executable is available.
|
||||
Check if IOUYAP is available
|
||||
"""
|
||||
|
||||
path = self.iouyap_path
|
||||
if not path:
|
||||
raise IOUError("No path to iouyap program has been set")
|
||||
raise IOUError("No path to a IOU executable has been set")
|
||||
|
||||
if not os.path.isfile(path):
|
||||
raise IOUError("iouyap program '{}' is not accessible".format(path))
|
||||
raise IOUError("IOU program '{}' is not accessible".format(path))
|
||||
|
||||
if not os.access(path, os.X_OK):
|
||||
raise IOUError("iouyap program '{}' is not executable".format(path))
|
||||
raise IOUError("IOU program '{}' is not executable".format(path))
|
||||
|
||||
def __json__(self):
|
||||
|
||||
@ -209,11 +216,15 @@ class IOUVM(BaseVM):
|
||||
"nvram": self._nvram,
|
||||
"l1_keepalives": self._l1_keepalives,
|
||||
"initial_config": self.relative_initial_config_file,
|
||||
"iourc_path": self.iourc_path,
|
||||
"use_default_iou_values": self._use_default_iou_values}
|
||||
"use_default_iou_values": self._use_default_iou_values,
|
||||
"iourc_path": self.iourc_path}
|
||||
|
||||
# return the relative path if the IOU image is in the images_path directory
|
||||
iou_vm_info["path"] = self.manager.get_relative_image_path(self.path)
|
||||
server_config = self.manager.config.get_section_config("Server")
|
||||
relative_image = os.path.join(os.path.expanduser(server_config.get("images_path", "~/GNS3/images")), "IOU", self.path)
|
||||
if os.path.exists(relative_image):
|
||||
iou_vm_info["path"] = os.path.basename(self.path)
|
||||
|
||||
return iou_vm_info
|
||||
|
||||
@property
|
||||
@ -232,7 +243,7 @@ class IOUVM(BaseVM):
|
||||
@property
|
||||
def iourc_path(self):
|
||||
"""
|
||||
Returns the IOURC file path.
|
||||
Returns the IOURC path.
|
||||
|
||||
:returns: path to IOURC
|
||||
"""
|
||||
@ -256,9 +267,8 @@ class IOUVM(BaseVM):
|
||||
@property
|
||||
def ram(self):
|
||||
"""
|
||||
Returns the amount of RAM allocated to this IOU VM.
|
||||
|
||||
:returns: amount of RAM in MBytes (integer)
|
||||
Returns the amount of RAM allocated to this IOU instance.
|
||||
:returns: amount of RAM in Mbytes (integer)
|
||||
"""
|
||||
|
||||
return self._ram
|
||||
@ -267,17 +277,16 @@ class IOUVM(BaseVM):
|
||||
def ram(self, ram):
|
||||
"""
|
||||
Sets amount of RAM allocated to this IOU instance.
|
||||
|
||||
:param ram: amount of RAM in MBytes (integer)
|
||||
:param ram: amount of RAM in Mbytes (integer)
|
||||
"""
|
||||
|
||||
if self._ram == ram:
|
||||
return
|
||||
|
||||
log.info('IOU "{name}" [{id}]: RAM updated from {old_ram}MB to {new_ram}MB'.format(name=self._name,
|
||||
id=self._id,
|
||||
old_ram=self._ram,
|
||||
new_ram=ram))
|
||||
log.info("IOU {name} [id={id}]: RAM updated from {old_ram}MB to {new_ram}MB".format(name=self._name,
|
||||
id=self._id,
|
||||
old_ram=self._ram,
|
||||
new_ram=ram))
|
||||
|
||||
self._ram = ram
|
||||
|
||||
@ -285,8 +294,7 @@ class IOUVM(BaseVM):
|
||||
def nvram(self):
|
||||
"""
|
||||
Returns the mount of NVRAM allocated to this IOU instance.
|
||||
|
||||
:returns: amount of NVRAM in KBytes (integer)
|
||||
:returns: amount of NVRAM in Kbytes (integer)
|
||||
"""
|
||||
|
||||
return self._nvram
|
||||
@ -295,42 +303,39 @@ class IOUVM(BaseVM):
|
||||
def nvram(self, nvram):
|
||||
"""
|
||||
Sets amount of NVRAM allocated to this IOU instance.
|
||||
|
||||
:param nvram: amount of NVRAM in KBytes (integer)
|
||||
:param nvram: amount of NVRAM in Kbytes (integer)
|
||||
"""
|
||||
|
||||
if self._nvram == nvram:
|
||||
return
|
||||
|
||||
log.info('IOU "{name}" [{id}]: NVRAM updated from {old_nvram}KB to {new_nvram}KB'.format(name=self._name,
|
||||
id=self._id,
|
||||
old_nvram=self._nvram,
|
||||
new_nvram=nvram))
|
||||
log.info("IOU {name} [id={id}]: NVRAM updated from {old_nvram}KB to {new_nvram}KB".format(name=self._name,
|
||||
id=self._id,
|
||||
old_nvram=self._nvram,
|
||||
new_nvram=nvram))
|
||||
self._nvram = nvram
|
||||
|
||||
@BaseVM.name.setter
|
||||
def name(self, new_name):
|
||||
"""
|
||||
Sets the name of this IOU VM.
|
||||
Sets the name of this IOU vm.
|
||||
|
||||
:param new_name: name
|
||||
"""
|
||||
|
||||
if self.initial_config_file:
|
||||
content = self.initial_config_content
|
||||
content = self.initial_config
|
||||
content = content.replace(self._name, new_name)
|
||||
self.initial_config_content = content
|
||||
self.initial_config = content
|
||||
|
||||
super(IOUVM, IOUVM).name.__set__(self, new_name)
|
||||
|
||||
@property
|
||||
def application_id(self):
|
||||
|
||||
return self._manager.get_application_id(self.id)
|
||||
|
||||
@property
|
||||
def iourc_content(self):
|
||||
|
||||
try:
|
||||
with open(os.path.join(self.temporary_directory, "iourc")) as f:
|
||||
return f.read()
|
||||
@ -339,14 +344,13 @@ class IOUVM(BaseVM):
|
||||
|
||||
@iourc_content.setter
|
||||
def iourc_content(self, value):
|
||||
|
||||
if value is not None:
|
||||
path = os.path.join(self.temporary_directory, "iourc")
|
||||
try:
|
||||
with open(path, "w+") as f:
|
||||
f.write(value)
|
||||
except OSError as e:
|
||||
raise IOUError("Could not write the iourc file {}: {}".format(path, e))
|
||||
raise IOUError("Could not write iourc file {}: {}".format(path, e))
|
||||
|
||||
@asyncio.coroutine
|
||||
def _library_check(self):
|
||||
@ -372,8 +376,8 @@ class IOUVM(BaseVM):
|
||||
Checks for a valid IOU key in the iourc file (paranoid mode).
|
||||
"""
|
||||
|
||||
license_check = self._manager.config.get_section_config("IOU").getboolean("license_check", True)
|
||||
if license_check is False:
|
||||
license_check = self._manager.config.get_section_config("IOU").getboolean("license_check", False)
|
||||
if license_check:
|
||||
return
|
||||
|
||||
config = configparser.ConfigParser()
|
||||
@ -464,11 +468,11 @@ class IOUVM(BaseVM):
|
||||
log.info("IOU instance {} started PID={}".format(self._id, self._iou_process.pid))
|
||||
self._started = True
|
||||
except FileNotFoundError as e:
|
||||
raise IOUError("Could not start IOU: {}: 32-bit binary support is probably not installed".format(e))
|
||||
raise IOUError("could not start IOU: {}: 32-bit binary support is probably not installed".format(e))
|
||||
except (OSError, subprocess.SubprocessError) as e:
|
||||
iou_stdout = self.read_iou_stdout()
|
||||
log.error("Could not start IOU {}: {}\n{}".format(self._path, e, iou_stdout))
|
||||
raise IOUError("Could not start IOU {}: {}\n{}".format(self._path, e, iou_stdout))
|
||||
log.error("could not start IOU {}: {}\n{}".format(self._path, e, iou_stdout))
|
||||
raise IOUError("could not start IOU {}: {}\n{}".format(self._path, e, iou_stdout))
|
||||
|
||||
# start console support
|
||||
self._start_ioucon()
|
||||
@ -477,7 +481,7 @@ class IOUVM(BaseVM):
|
||||
|
||||
def _rename_nvram_file(self):
|
||||
"""
|
||||
Before starting the VM, rename the nvram and vlan.dat files with the correct IOU application identifier.
|
||||
Before start the VM rename the nvram file to the correct application id
|
||||
"""
|
||||
|
||||
destination = os.path.join(self.working_dir, "nvram_{:05d}".format(self.application_id))
|
||||
@ -490,7 +494,7 @@ class IOUVM(BaseVM):
|
||||
@asyncio.coroutine
|
||||
def _start_iouyap(self):
|
||||
"""
|
||||
Starts iouyap (handles connections to and from this IOU VM).
|
||||
Starts iouyap (handles connections to and from this IOU device).
|
||||
"""
|
||||
|
||||
try:
|
||||
@ -508,7 +512,7 @@ class IOUVM(BaseVM):
|
||||
log.info("iouyap started PID={}".format(self._iouyap_process.pid))
|
||||
except (OSError, subprocess.SubprocessError) as e:
|
||||
iouyap_stdout = self.read_iouyap_stdout()
|
||||
log.error("Could not start iouyap: {}\n{}".format(e, iouyap_stdout))
|
||||
log.error("could not start iouyap: {}\n{}".format(e, iouyap_stdout))
|
||||
raise IOUError("Could not start iouyap: {}\n{}".format(e, iouyap_stdout))
|
||||
|
||||
def _update_iouyap_config(self):
|
||||
@ -632,6 +636,8 @@ class IOUVM(BaseVM):
|
||||
# Sometime the process may already be dead when we garbage collect
|
||||
except ProcessLookupError:
|
||||
pass
|
||||
self._started = False
|
||||
self.status = "stopped"
|
||||
|
||||
@asyncio.coroutine
|
||||
def reload(self):
|
||||
|
@ -541,7 +541,6 @@ def send_recv_loop(epoll, console, router, esc_char, stop_event):
|
||||
else:
|
||||
router.write(buf)
|
||||
finally:
|
||||
log.debug("Finally")
|
||||
router.unregister(epoll)
|
||||
console.unregister(epoll)
|
||||
|
||||
|
@ -65,6 +65,9 @@ class Project:
|
||||
self._used_tcp_ports = set()
|
||||
self._used_udp_ports = set()
|
||||
|
||||
# List of clients listening for notifications
|
||||
self._listeners = set()
|
||||
|
||||
if path is None:
|
||||
path = os.path.join(self._location, self._id)
|
||||
try:
|
||||
@ -415,3 +418,27 @@ class Project:
|
||||
# We import it at the last time to avoid circular dependencies
|
||||
from ..modules import MODULES
|
||||
return MODULES
|
||||
|
||||
def emit(self, action, event):
|
||||
"""
|
||||
Send an event to all the client listening for notifications
|
||||
|
||||
:param action: Action name
|
||||
:param event: Event to send
|
||||
"""
|
||||
|
||||
for listener in self._listeners:
|
||||
listener.put_nowait((action, event, ))
|
||||
|
||||
def get_listen_queue(self):
|
||||
"""Get a queue where you receive all the events related to the
|
||||
project."""
|
||||
|
||||
queue = asyncio.Queue()
|
||||
self._listeners.add(queue)
|
||||
return queue
|
||||
|
||||
def stop_listen_queue(self, queue):
|
||||
"""Stop sending notification to this clients"""
|
||||
|
||||
self._listeners.remove(queue)
|
||||
|
@ -34,6 +34,8 @@ from ..adapters.ethernet_adapter import EthernetAdapter
|
||||
from ..nios.nio_udp import NIOUDP
|
||||
from ..base_vm import BaseVM
|
||||
from ...schemas.qemu import QEMU_OBJECT_SCHEMA
|
||||
from ...utils.asyncio import monitor_process
|
||||
from ...config import Config
|
||||
|
||||
import logging
|
||||
log = logging.getLogger(__name__)
|
||||
@ -61,7 +63,6 @@ class QemuVM(BaseVM):
|
||||
self._host = server_config.get("host", "127.0.0.1")
|
||||
self._monitor_host = server_config.get("monitor_host", "127.0.0.1")
|
||||
self._command = []
|
||||
self._started = False
|
||||
self._process = None
|
||||
self._cpulimit_process = None
|
||||
self._monitor = None
|
||||
@ -580,7 +581,8 @@ class QemuVM(BaseVM):
|
||||
stderr=subprocess.STDOUT,
|
||||
cwd=self.working_dir)
|
||||
log.info('QEMU VM "{}" started PID={}'.format(self._name, self._process.pid))
|
||||
self._started = True
|
||||
self.status = "started"
|
||||
monitor_process(self._process, self._termination_callback)
|
||||
except (OSError, subprocess.SubprocessError) as e:
|
||||
stdout = self.read_stdout()
|
||||
log.error("Could not start QEMU {}: {}\n{}".format(self.qemu_path, e, stdout))
|
||||
@ -590,6 +592,18 @@ class QemuVM(BaseVM):
|
||||
if self._cpu_throttling:
|
||||
self._set_cpu_throttling()
|
||||
|
||||
def _termination_callback(self, returncode):
|
||||
"""
|
||||
Called when the process has stopped.
|
||||
|
||||
:param returncode: Process returncode
|
||||
"""
|
||||
|
||||
if self.started:
|
||||
log.info("QEMU process has stopped, return code: %d", returncode)
|
||||
self.status = "stopped"
|
||||
self._process = None
|
||||
|
||||
@asyncio.coroutine
|
||||
def stop(self):
|
||||
"""
|
||||
@ -607,7 +621,7 @@ class QemuVM(BaseVM):
|
||||
if self._process.returncode is None:
|
||||
log.warn('QEMU VM "{}" PID={} is still running'.format(self._name, self._process.pid))
|
||||
self._process = None
|
||||
self._started = False
|
||||
self.status = "stopped"
|
||||
self._stop_cpulimit()
|
||||
|
||||
@asyncio.coroutine
|
||||
@ -806,7 +820,7 @@ class QemuVM(BaseVM):
|
||||
:returns: boolean
|
||||
"""
|
||||
|
||||
return self._started
|
||||
return self.status == "started"
|
||||
|
||||
def read_stdout(self):
|
||||
"""
|
||||
|
@ -34,7 +34,7 @@ from ..adapters.ethernet_adapter import EthernetAdapter
|
||||
from ..nios.nio_udp import NIOUDP
|
||||
from ..nios.nio_tap import NIOTAP
|
||||
from ..base_vm import BaseVM
|
||||
from ...utils.asyncio import subprocess_check_output
|
||||
from ...utils.asyncio import subprocess_check_output, monitor_process
|
||||
|
||||
|
||||
import logging
|
||||
@ -108,6 +108,7 @@ class VPCSVM(BaseVM):
|
||||
|
||||
return {"name": self.name,
|
||||
"vm_id": self.id,
|
||||
"status": self.status,
|
||||
"console": self._console,
|
||||
"project_id": self.project.id,
|
||||
"startup_script": self.startup_script,
|
||||
@ -232,13 +233,28 @@ class VPCSVM(BaseVM):
|
||||
stderr=subprocess.STDOUT,
|
||||
cwd=self.working_dir,
|
||||
creationflags=flags)
|
||||
monitor_process(self._process, self._termination_callback)
|
||||
log.info("VPCS instance {} started PID={}".format(self.name, self._process.pid))
|
||||
self._started = True
|
||||
self.status = "started"
|
||||
except (OSError, subprocess.SubprocessError) as e:
|
||||
vpcs_stdout = self.read_vpcs_stdout()
|
||||
log.error("Could not start VPCS {}: {}\n{}".format(self.vpcs_path, e, vpcs_stdout))
|
||||
raise VPCSError("Could not start VPCS {}: {}\n{}".format(self.vpcs_path, e, vpcs_stdout))
|
||||
|
||||
def _termination_callback(self, returncode):
|
||||
"""
|
||||
Called when the process has stopped.
|
||||
|
||||
:param returncode: Process returncode
|
||||
"""
|
||||
|
||||
if self._started:
|
||||
log.info("VPCS process has stopped, return code: %d", returncode)
|
||||
self._started = False
|
||||
self.status = "stopped"
|
||||
self._process = None
|
||||
|
||||
@asyncio.coroutine
|
||||
def stop(self):
|
||||
"""
|
||||
@ -256,6 +272,7 @@ class VPCSVM(BaseVM):
|
||||
|
||||
self._process = None
|
||||
self._started = False
|
||||
self.status = "stopped"
|
||||
|
||||
@asyncio.coroutine
|
||||
def reload(self):
|
||||
|
@ -148,6 +148,10 @@ VPCS_OBJECT_SCHEMA = {
|
||||
"maxLength": 36,
|
||||
"pattern": "^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}$"
|
||||
},
|
||||
"status": {
|
||||
"description": "VM status",
|
||||
"enum": ["started", "stopped"]
|
||||
},
|
||||
"console": {
|
||||
"description": "console TCP port",
|
||||
"minimum": 1,
|
||||
@ -171,5 +175,5 @@ VPCS_OBJECT_SCHEMA = {
|
||||
},
|
||||
},
|
||||
"additionalProperties": False,
|
||||
"required": ["name", "vm_id", "console", "project_id", "startup_script_path"]
|
||||
"required": ["name", "vm_id", "status", "console", "project_id", "startup_script_path"]
|
||||
}
|
||||
|
@ -17,6 +17,7 @@
|
||||
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
@ -76,3 +77,19 @@ def wait_for_process_termination(process, timeout=10):
|
||||
yield from asyncio.sleep(0.1)
|
||||
timeout -= 0.1
|
||||
raise asyncio.TimeoutError()
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def _check_process(process, termination_callback):
|
||||
if not hasattr(sys, "_called_from_test") or not sys._called_from_test:
|
||||
returncode = yield from process.wait()
|
||||
if asyncio.iscoroutinefunction(termination_callback):
|
||||
yield from termination_callback(returncode)
|
||||
else:
|
||||
termination_callback(returncode)
|
||||
|
||||
|
||||
def monitor_process(process, termination_callback):
|
||||
"""Call termination_callback when a process dies"""
|
||||
|
||||
asyncio.async(_check_process(process, termination_callback))
|
||||
|
@ -20,9 +20,13 @@ This test suite check /project endpoint
|
||||
"""
|
||||
|
||||
import uuid
|
||||
import asyncio
|
||||
import aiohttp
|
||||
from unittest.mock import patch
|
||||
from tests.utils import asyncio_patch
|
||||
|
||||
from gns3server.handlers.api.project_handler import ProjectHandler
|
||||
|
||||
|
||||
def test_create_project_with_path(server, tmpdir):
|
||||
with patch("gns3server.modules.project.Project.is_local", return_value=True):
|
||||
@ -139,6 +143,38 @@ def test_close_project(server, project):
|
||||
assert mock.called
|
||||
|
||||
|
||||
def test_close_project_two_client_connected(server, project):
|
||||
|
||||
ProjectHandler._notifications_listening = 2
|
||||
|
||||
with asyncio_patch("gns3server.modules.project.Project.close", return_value=True) as mock:
|
||||
response = server.post("/projects/{project_id}/close".format(project_id=project.id), example=True)
|
||||
assert response.status == 204
|
||||
assert not mock.called
|
||||
|
||||
|
||||
def test_close_project_invalid_uuid(server):
|
||||
response = server.post("/projects/{project_id}/close".format(project_id=uuid.uuid4()))
|
||||
assert response.status == 404
|
||||
|
||||
|
||||
def test_notification(server, project, loop):
|
||||
@asyncio.coroutine
|
||||
def go(future):
|
||||
response = yield from aiohttp.request("GET", server.get_url("/projects/{project_id}/notifications".format(project_id=project.id), 1))
|
||||
response.body = yield from response.content.read(19)
|
||||
project.emit("vm.created", {"a": "b"})
|
||||
response.body += yield from response.content.read(47)
|
||||
response.close()
|
||||
future.set_result(response)
|
||||
|
||||
future = asyncio.Future()
|
||||
asyncio.async(go(future))
|
||||
response = loop.run_until_complete(future)
|
||||
assert response.status == 200
|
||||
assert response.body == b'{"action": "ping"}\n{"action": "vm.created", "event": {"a": "b"}}\n'
|
||||
|
||||
|
||||
def test_notification_invalid_id(server, project):
|
||||
response = server.get("/projects/{project_id}/notifications".format(project_id=uuid.uuid4()))
|
||||
assert response.status == 404
|
||||
|
@ -43,6 +43,7 @@ def test_vpcs_get(server, project, vm):
|
||||
assert response.json["name"] == "PC TEST 1"
|
||||
assert response.json["project_id"] == project.id
|
||||
assert response.json["startup_script_path"] == None
|
||||
assert response.json["status"] == "stopped"
|
||||
|
||||
|
||||
def test_vpcs_create_startup_script(server, project):
|
||||
@ -95,6 +96,7 @@ def test_vpcs_delete_nio(server, vm):
|
||||
|
||||
|
||||
def test_vpcs_start(server, vm):
|
||||
|
||||
with asyncio_patch("gns3server.modules.vpcs.vpcs_vm.VPCSVM.start", return_value=True) as mock:
|
||||
response = server.post("/projects/{project_id}/vpcs/vms/{vm_id}/start".format(project_id=vm["project_id"], vm_id=vm["vm_id"]), example=True)
|
||||
assert mock.called
|
||||
|
@ -71,6 +71,7 @@ def test_vm_invalid_vpcs_path(project, manager, loop):
|
||||
def test_start(loop, vm):
|
||||
process = MagicMock()
|
||||
process.returncode = None
|
||||
queue = vm.project.get_listen_queue()
|
||||
|
||||
with asyncio_patch("gns3server.modules.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True):
|
||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=process):
|
||||
@ -78,6 +79,9 @@ def test_start(loop, vm):
|
||||
vm.port_add_nio_binding(0, nio)
|
||||
loop.run_until_complete(asyncio.async(vm.start()))
|
||||
assert vm.is_running()
|
||||
(action, event) = queue.get_nowait()
|
||||
assert action == "vm.started"
|
||||
assert event == vm
|
||||
|
||||
|
||||
def test_stop(loop, vm):
|
||||
@ -97,10 +101,16 @@ def test_stop(loop, vm):
|
||||
loop.run_until_complete(asyncio.async(vm.start()))
|
||||
assert vm.is_running()
|
||||
|
||||
queue = vm.project.get_listen_queue()
|
||||
|
||||
loop.run_until_complete(asyncio.async(vm.stop()))
|
||||
assert vm.is_running() is False
|
||||
process.terminate.assert_called_with()
|
||||
|
||||
(action, event) = queue.get_nowait()
|
||||
assert action == "vm.stopped"
|
||||
assert event == vm
|
||||
|
||||
|
||||
def test_reload(loop, vm):
|
||||
process = MagicMock()
|
||||
|
Loading…
Reference in New Issue
Block a user