mirror of
https://github.com/GNS3/gns3-server
synced 2025-01-22 05:51:17 +00:00
A notification stream with process monitoring
This commit is contained in:
parent
533ce78b90
commit
995881980b
@ -15,14 +15,23 @@
|
|||||||
# You should have received a copy of the GNU General Public License
|
# You should have received a copy of the GNU General Public License
|
||||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
|
||||||
from ...web.route import Route
|
from ...web.route import Route
|
||||||
from ...schemas.project import PROJECT_OBJECT_SCHEMA, PROJECT_CREATE_SCHEMA, PROJECT_UPDATE_SCHEMA
|
from ...schemas.project import PROJECT_OBJECT_SCHEMA, PROJECT_CREATE_SCHEMA, PROJECT_UPDATE_SCHEMA
|
||||||
from ...modules.project_manager import ProjectManager
|
from ...modules.project_manager import ProjectManager
|
||||||
from ...modules import MODULES
|
from ...modules import MODULES
|
||||||
|
|
||||||
|
import logging
|
||||||
|
log = logging.getLogger()
|
||||||
|
|
||||||
|
|
||||||
class ProjectHandler:
|
class ProjectHandler:
|
||||||
|
|
||||||
|
# How many clients has subcribe to notifications
|
||||||
|
_notifications_listening = 0
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
@Route.post(
|
@Route.post(
|
||||||
r"/projects",
|
r"/projects",
|
||||||
@ -123,8 +132,9 @@ class ProjectHandler:
|
|||||||
|
|
||||||
pm = ProjectManager.instance()
|
pm = ProjectManager.instance()
|
||||||
project = pm.get_project(request.match_info["project_id"])
|
project = pm.get_project(request.match_info["project_id"])
|
||||||
yield from project.close()
|
if ProjectHandler._notifications_listening == 0:
|
||||||
pm.remove_project(project.id)
|
yield from project.close()
|
||||||
|
pm.remove_project(project.id)
|
||||||
response.set_status(204)
|
response.set_status(204)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
@ -145,3 +155,45 @@ class ProjectHandler:
|
|||||||
yield from project.delete()
|
yield from project.delete()
|
||||||
pm.remove_project(project.id)
|
pm.remove_project(project.id)
|
||||||
response.set_status(204)
|
response.set_status(204)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
@Route.get(
|
||||||
|
r"/projects/{project_id}/notifications",
|
||||||
|
description="Receive notifications about the projects",
|
||||||
|
parameters={
|
||||||
|
"project_id": "The UUID of the project",
|
||||||
|
},
|
||||||
|
status_codes={
|
||||||
|
200: "End of stream",
|
||||||
|
404: "The project doesn't exist"
|
||||||
|
})
|
||||||
|
def notification(request, response):
|
||||||
|
|
||||||
|
pm = ProjectManager.instance()
|
||||||
|
project = pm.get_project(request.match_info["project_id"])
|
||||||
|
|
||||||
|
response.content_type = "application/json"
|
||||||
|
response.set_status(200)
|
||||||
|
response.enable_chunked_encoding()
|
||||||
|
# Very important: do not send a content lenght otherwise QT close the connection but curl can consume the Feed
|
||||||
|
response.content_length = None
|
||||||
|
|
||||||
|
response.start(request)
|
||||||
|
queue = project.get_listen_queue()
|
||||||
|
ProjectHandler._notifications_listening += 1
|
||||||
|
response.write("{\"action\": \"ping\"}\n".encode("utf-8"))
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
(action, msg) = yield from asyncio.wait_for(queue.get(), 5)
|
||||||
|
if hasattr(msg, "__json__"):
|
||||||
|
msg = json.dumps({"action": action, "event": msg.__json__()}, sort_keys=True)
|
||||||
|
else:
|
||||||
|
msg = json.dumps({"action": action, "event": msg}, sort_keys=True)
|
||||||
|
log.debug("Send notification: %s", msg)
|
||||||
|
response.write(("{}\n".format(msg)).encode("utf-8"))
|
||||||
|
except asyncio.futures.CancelledError as e:
|
||||||
|
break
|
||||||
|
except asyncio.futures.TimeoutError as e:
|
||||||
|
response.write("{\"action\": \"ping\"}\n".encode("utf-8"))
|
||||||
|
project.stop_listen_queue(queue)
|
||||||
|
ProjectHandler._notifications_listening -= 1
|
||||||
|
@ -48,6 +48,7 @@ class BaseVM:
|
|||||||
self._manager = manager
|
self._manager = manager
|
||||||
self._console = console
|
self._console = console
|
||||||
self._temporary_directory = None
|
self._temporary_directory = None
|
||||||
|
self._vm_status = "stopped"
|
||||||
|
|
||||||
if self._console is not None:
|
if self._console is not None:
|
||||||
self._console = self._manager.port_manager.reserve_tcp_port(self._console, self._project)
|
self._console = self._manager.port_manager.reserve_tcp_port(self._console, self._project)
|
||||||
@ -68,6 +69,18 @@ class BaseVM:
|
|||||||
if os.path.exists(self._temporary_directory):
|
if os.path.exists(self._temporary_directory):
|
||||||
shutil.rmtree(self._temporary_directory)
|
shutil.rmtree(self._temporary_directory)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def status(self):
|
||||||
|
"""Return current VM status"""
|
||||||
|
|
||||||
|
return self._vm_status
|
||||||
|
|
||||||
|
@status.setter
|
||||||
|
def status(self, status):
|
||||||
|
|
||||||
|
self._vm_status = status
|
||||||
|
self._project.emit("vm.{}".format(status), self)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def project(self):
|
def project(self):
|
||||||
"""
|
"""
|
||||||
|
@ -70,6 +70,16 @@ class Hypervisor(DynamipsHypervisor):
|
|||||||
|
|
||||||
return self._id
|
return self._id
|
||||||
|
|
||||||
|
@property
|
||||||
|
def process(self):
|
||||||
|
"""
|
||||||
|
Returns the subprocess of the Hypervisor
|
||||||
|
|
||||||
|
:returns: subprocess
|
||||||
|
"""
|
||||||
|
|
||||||
|
return self._process
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def started(self):
|
def started(self):
|
||||||
"""
|
"""
|
||||||
|
@ -34,7 +34,8 @@ from ...base_vm import BaseVM
|
|||||||
from ..dynamips_error import DynamipsError
|
from ..dynamips_error import DynamipsError
|
||||||
from ..nios.nio_udp import NIOUDP
|
from ..nios.nio_udp import NIOUDP
|
||||||
|
|
||||||
from gns3server.utils.asyncio import wait_run_in_executor
|
from gns3server.config import Config
|
||||||
|
from gns3server.utils.asyncio import wait_run_in_executor, monitor_process
|
||||||
|
|
||||||
|
|
||||||
class Router(BaseVM):
|
class Router(BaseVM):
|
||||||
@ -164,7 +165,7 @@ class Router(BaseVM):
|
|||||||
slot_number += 1
|
slot_number += 1
|
||||||
|
|
||||||
# add the wics
|
# add the wics
|
||||||
if self._slots[0] and self._slots[0].wics:
|
if len(self._slots) > 0 and self._slots[0] and self._slots[0].wics:
|
||||||
for wic_slot_number in range(0, len(self._slots[0].wics)):
|
for wic_slot_number in range(0, len(self._slots[0].wics)):
|
||||||
if self._slots[0].wics[wic_slot_number]:
|
if self._slots[0].wics[wic_slot_number]:
|
||||||
router_info["wic" + str(wic_slot_number)] = str(self._slots[0].wics[wic_slot_number])
|
router_info["wic" + str(wic_slot_number)] = str(self._slots[0].wics[wic_slot_number])
|
||||||
@ -259,7 +260,18 @@ class Router(BaseVM):
|
|||||||
raise DynamipsError('"{}" is not a valid IOS image'.format(self._image))
|
raise DynamipsError('"{}" is not a valid IOS image'.format(self._image))
|
||||||
|
|
||||||
yield from self._hypervisor.send('vm start "{name}"'.format(name=self._name))
|
yield from self._hypervisor.send('vm start "{name}"'.format(name=self._name))
|
||||||
|
self.status = "started"
|
||||||
log.info('router "{name}" [{id}] has been started'.format(name=self._name, id=self._id))
|
log.info('router "{name}" [{id}] has been started'.format(name=self._name, id=self._id))
|
||||||
|
monitor_process(self._hypervisor.process, self._termination_callback)
|
||||||
|
|
||||||
|
@asyncio.coroutine
|
||||||
|
def _termination_callback(self, returncode):
|
||||||
|
"""
|
||||||
|
Called when the process is killed
|
||||||
|
|
||||||
|
:param returncode: Process returncode
|
||||||
|
"""
|
||||||
|
self.status = "stopped"
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def stop(self):
|
def stop(self):
|
||||||
@ -270,6 +282,7 @@ class Router(BaseVM):
|
|||||||
status = yield from self.get_status()
|
status = yield from self.get_status()
|
||||||
if status != "inactive":
|
if status != "inactive":
|
||||||
yield from self._hypervisor.send('vm stop "{name}"'.format(name=self._name))
|
yield from self._hypervisor.send('vm stop "{name}"'.format(name=self._name))
|
||||||
|
self.status = "stopped"
|
||||||
log.info('Router "{name}" [{id}] has been stopped'.format(name=self._name, id=self._id))
|
log.info('Router "{name}" [{id}] has been stopped'.format(name=self._name, id=self._id))
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
|
@ -467,6 +467,8 @@ class IOUVM(BaseVM):
|
|||||||
env=env)
|
env=env)
|
||||||
log.info("IOU instance {} started PID={}".format(self._id, self._iou_process.pid))
|
log.info("IOU instance {} started PID={}".format(self._id, self._iou_process.pid))
|
||||||
self._started = True
|
self._started = True
|
||||||
|
self.status = "started"
|
||||||
|
gns3server.utils.asyncio.monitor_process(self._iou_process, self._termination_callback)
|
||||||
except FileNotFoundError as e:
|
except FileNotFoundError as e:
|
||||||
raise IOUError("could not start IOU: {}: 32-bit binary support is probably not installed".format(e))
|
raise IOUError("could not start IOU: {}: 32-bit binary support is probably not installed".format(e))
|
||||||
except (OSError, subprocess.SubprocessError) as e:
|
except (OSError, subprocess.SubprocessError) as e:
|
||||||
@ -479,6 +481,17 @@ class IOUVM(BaseVM):
|
|||||||
# connections support
|
# connections support
|
||||||
yield from self._start_iouyap()
|
yield from self._start_iouyap()
|
||||||
|
|
||||||
|
def _termination_callback(self, returncode):
|
||||||
|
"""
|
||||||
|
Called when the process is killed
|
||||||
|
|
||||||
|
:param returncode: Process returncode
|
||||||
|
"""
|
||||||
|
log.info("IOU process crash return code: %d", returncode)
|
||||||
|
self._terminate_process_iou()
|
||||||
|
self._terminate_process_iouyap()
|
||||||
|
self._ioucon_thread_stop_event.set()
|
||||||
|
|
||||||
def _rename_nvram_file(self):
|
def _rename_nvram_file(self):
|
||||||
"""
|
"""
|
||||||
Before start the VM rename the nvram file to the correct application id
|
Before start the VM rename the nvram file to the correct application id
|
||||||
@ -509,6 +522,7 @@ class IOUVM(BaseVM):
|
|||||||
stderr=subprocess.STDOUT,
|
stderr=subprocess.STDOUT,
|
||||||
cwd=self.working_dir)
|
cwd=self.working_dir)
|
||||||
|
|
||||||
|
gns3server.utils.asyncio.monitor_process(self._iouyap_process, self._termination_callback)
|
||||||
log.info("iouyap started PID={}".format(self._iouyap_process.pid))
|
log.info("iouyap started PID={}".format(self._iouyap_process.pid))
|
||||||
except (OSError, subprocess.SubprocessError) as e:
|
except (OSError, subprocess.SubprocessError) as e:
|
||||||
iouyap_stdout = self.read_iouyap_stdout()
|
iouyap_stdout = self.read_iouyap_stdout()
|
||||||
@ -616,22 +630,26 @@ class IOUVM(BaseVM):
|
|||||||
def _terminate_process_iouyap(self):
|
def _terminate_process_iouyap(self):
|
||||||
"""Terminate the process if running"""
|
"""Terminate the process if running"""
|
||||||
|
|
||||||
log.info("Stopping IOUYAP instance {} PID={}".format(self.name, self._iouyap_process.pid))
|
if self._iouyap_process:
|
||||||
try:
|
log.info("Stopping IOUYAP instance {} PID={}".format(self.name, self._iouyap_process.pid))
|
||||||
self._iouyap_process.terminate()
|
try:
|
||||||
# Sometime the process can already be dead when we garbage collect
|
self._iouyap_process.terminate()
|
||||||
except ProcessLookupError:
|
# Sometime the process can already be dead when we garbage collect
|
||||||
pass
|
except ProcessLookupError:
|
||||||
|
pass
|
||||||
|
|
||||||
def _terminate_process_iou(self):
|
def _terminate_process_iou(self):
|
||||||
"""Terminate the process if running"""
|
"""Terminate the process if running"""
|
||||||
|
|
||||||
log.info("Stopping IOU instance {} PID={}".format(self.name, self._iou_process.pid))
|
if self._iou_process:
|
||||||
try:
|
log.info("Stopping IOU instance {} PID={}".format(self.name, self._iou_process.pid))
|
||||||
self._iou_process.terminate()
|
try:
|
||||||
# Sometime the process can already be dead when we garbage collect
|
self._iou_process.terminate()
|
||||||
except ProcessLookupError:
|
# Sometime the process can already be dead when we garbage collect
|
||||||
pass
|
except ProcessLookupError:
|
||||||
|
pass
|
||||||
|
self._started = False
|
||||||
|
self.status = "stopped"
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def reload(self):
|
def reload(self):
|
||||||
|
@ -541,7 +541,6 @@ def send_recv_loop(epoll, console, router, esc_char, stop_event):
|
|||||||
else:
|
else:
|
||||||
router.write(buf)
|
router.write(buf)
|
||||||
finally:
|
finally:
|
||||||
log.debug("Finally")
|
|
||||||
router.unregister(epoll)
|
router.unregister(epoll)
|
||||||
console.unregister(epoll)
|
console.unregister(epoll)
|
||||||
|
|
||||||
|
@ -65,6 +65,9 @@ class Project:
|
|||||||
self._used_tcp_ports = set()
|
self._used_tcp_ports = set()
|
||||||
self._used_udp_ports = set()
|
self._used_udp_ports = set()
|
||||||
|
|
||||||
|
# List of clients listen for notifications
|
||||||
|
self._listeners = set()
|
||||||
|
|
||||||
if path is None:
|
if path is None:
|
||||||
path = os.path.join(self._location, self._id)
|
path = os.path.join(self._location, self._id)
|
||||||
try:
|
try:
|
||||||
@ -403,3 +406,26 @@ class Project:
|
|||||||
# We import it at the last time to avoid circular dependencies
|
# We import it at the last time to avoid circular dependencies
|
||||||
from ..modules import MODULES
|
from ..modules import MODULES
|
||||||
return MODULES
|
return MODULES
|
||||||
|
|
||||||
|
def emit(self, action, event):
|
||||||
|
"""
|
||||||
|
Send an event to all the client listens for notifications
|
||||||
|
|
||||||
|
:param action: Action happened
|
||||||
|
:param event: Event sended to the client
|
||||||
|
"""
|
||||||
|
for listener in self._listeners:
|
||||||
|
listener.put_nowait((action, event, ))
|
||||||
|
|
||||||
|
def get_listen_queue(self):
|
||||||
|
"""Get a queue where you receive all the events related to the
|
||||||
|
project."""
|
||||||
|
|
||||||
|
queue = asyncio.Queue()
|
||||||
|
self._listeners.add(queue)
|
||||||
|
return queue
|
||||||
|
|
||||||
|
def stop_listen_queue(self, queue):
|
||||||
|
"""Stop sending notification to this clients"""
|
||||||
|
|
||||||
|
self._listeners.remove(queue)
|
||||||
|
@ -33,6 +33,8 @@ from ..adapters.ethernet_adapter import EthernetAdapter
|
|||||||
from ..nios.nio_udp import NIOUDP
|
from ..nios.nio_udp import NIOUDP
|
||||||
from ..base_vm import BaseVM
|
from ..base_vm import BaseVM
|
||||||
from ...schemas.qemu import QEMU_OBJECT_SCHEMA
|
from ...schemas.qemu import QEMU_OBJECT_SCHEMA
|
||||||
|
from ...utils.asyncio import monitor_process
|
||||||
|
from ...config import Config
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
@ -68,7 +70,6 @@ class QemuVM(BaseVM):
|
|||||||
self._host = server_config.get("host", "127.0.0.1")
|
self._host = server_config.get("host", "127.0.0.1")
|
||||||
self._monitor_host = server_config.get("monitor_host", "127.0.0.1")
|
self._monitor_host = server_config.get("monitor_host", "127.0.0.1")
|
||||||
self._command = []
|
self._command = []
|
||||||
self._started = False
|
|
||||||
self._process = None
|
self._process = None
|
||||||
self._cpulimit_process = None
|
self._cpulimit_process = None
|
||||||
self._monitor = None
|
self._monitor = None
|
||||||
@ -606,7 +607,8 @@ class QemuVM(BaseVM):
|
|||||||
stderr=subprocess.STDOUT,
|
stderr=subprocess.STDOUT,
|
||||||
cwd=self.working_dir)
|
cwd=self.working_dir)
|
||||||
log.info("QEMU VM instance {} started PID={}".format(self._id, self._process.pid))
|
log.info("QEMU VM instance {} started PID={}".format(self._id, self._process.pid))
|
||||||
self._started = True
|
self.status = "started"
|
||||||
|
monitor_process(self._process, self._termination_callback)
|
||||||
except (OSError, subprocess.SubprocessError) as e:
|
except (OSError, subprocess.SubprocessError) as e:
|
||||||
stdout = self.read_stdout()
|
stdout = self.read_stdout()
|
||||||
log.error("could not start QEMU {}: {}\n{}".format(self.qemu_path, e, stdout))
|
log.error("could not start QEMU {}: {}\n{}".format(self.qemu_path, e, stdout))
|
||||||
@ -616,6 +618,17 @@ class QemuVM(BaseVM):
|
|||||||
if self._cpu_throttling:
|
if self._cpu_throttling:
|
||||||
self._set_cpu_throttling()
|
self._set_cpu_throttling()
|
||||||
|
|
||||||
|
def _termination_callback(self, returncode):
|
||||||
|
"""
|
||||||
|
Called when the process is killed
|
||||||
|
|
||||||
|
:param returncode: Process returncode
|
||||||
|
"""
|
||||||
|
if self.started:
|
||||||
|
log.info("Process Qemu is dead. Return code: %d", returncode)
|
||||||
|
self.status = "stopped"
|
||||||
|
self._process = None
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def stop(self):
|
def stop(self):
|
||||||
"""
|
"""
|
||||||
@ -634,7 +647,7 @@ class QemuVM(BaseVM):
|
|||||||
log.warn("QEMU VM instance {} PID={} is still running".format(self._id,
|
log.warn("QEMU VM instance {} PID={} is still running".format(self._id,
|
||||||
self._process.pid))
|
self._process.pid))
|
||||||
self._process = None
|
self._process = None
|
||||||
self._started = False
|
self.status = "stopped"
|
||||||
self._stop_cpulimit()
|
self._stop_cpulimit()
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
@ -830,7 +843,7 @@ class QemuVM(BaseVM):
|
|||||||
:returns: boolean
|
:returns: boolean
|
||||||
"""
|
"""
|
||||||
|
|
||||||
return self._started
|
return self.status == "started"
|
||||||
|
|
||||||
def read_stdout(self):
|
def read_stdout(self):
|
||||||
"""
|
"""
|
||||||
|
@ -34,7 +34,7 @@ from ..adapters.ethernet_adapter import EthernetAdapter
|
|||||||
from ..nios.nio_udp import NIOUDP
|
from ..nios.nio_udp import NIOUDP
|
||||||
from ..nios.nio_tap import NIOTAP
|
from ..nios.nio_tap import NIOTAP
|
||||||
from ..base_vm import BaseVM
|
from ..base_vm import BaseVM
|
||||||
from ...utils.asyncio import subprocess_check_output
|
from ...utils.asyncio import subprocess_check_output, monitor_process
|
||||||
|
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
@ -105,6 +105,7 @@ class VPCSVM(BaseVM):
|
|||||||
|
|
||||||
return {"name": self.name,
|
return {"name": self.name,
|
||||||
"vm_id": self.id,
|
"vm_id": self.id,
|
||||||
|
"status": self.status,
|
||||||
"console": self._console,
|
"console": self._console,
|
||||||
"project_id": self.project.id,
|
"project_id": self.project.id,
|
||||||
"startup_script": self.startup_script,
|
"startup_script": self.startup_script,
|
||||||
@ -228,13 +229,27 @@ class VPCSVM(BaseVM):
|
|||||||
stderr=subprocess.STDOUT,
|
stderr=subprocess.STDOUT,
|
||||||
cwd=self.working_dir,
|
cwd=self.working_dir,
|
||||||
creationflags=flags)
|
creationflags=flags)
|
||||||
|
monitor_process(self._process, self._termination_callback)
|
||||||
log.info("VPCS instance {} started PID={}".format(self.name, self._process.pid))
|
log.info("VPCS instance {} started PID={}".format(self.name, self._process.pid))
|
||||||
self._started = True
|
self._started = True
|
||||||
|
self.status = "started"
|
||||||
except (OSError, subprocess.SubprocessError) as e:
|
except (OSError, subprocess.SubprocessError) as e:
|
||||||
vpcs_stdout = self.read_vpcs_stdout()
|
vpcs_stdout = self.read_vpcs_stdout()
|
||||||
log.error("Could not start VPCS {}: {}\n{}".format(self.vpcs_path, e, vpcs_stdout))
|
log.error("Could not start VPCS {}: {}\n{}".format(self.vpcs_path, e, vpcs_stdout))
|
||||||
raise VPCSError("Could not start VPCS {}: {}\n{}".format(self.vpcs_path, e, vpcs_stdout))
|
raise VPCSError("Could not start VPCS {}: {}\n{}".format(self.vpcs_path, e, vpcs_stdout))
|
||||||
|
|
||||||
|
def _termination_callback(self, returncode):
|
||||||
|
"""
|
||||||
|
Called when the process is killed
|
||||||
|
|
||||||
|
:param returncode: Process returncode
|
||||||
|
"""
|
||||||
|
if self._started:
|
||||||
|
log.info("Process VPCS is dead. Return code: %d", returncode)
|
||||||
|
self._started = False
|
||||||
|
self.status = "stopped"
|
||||||
|
self._process = None
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def stop(self):
|
def stop(self):
|
||||||
"""
|
"""
|
||||||
@ -252,6 +267,7 @@ class VPCSVM(BaseVM):
|
|||||||
|
|
||||||
self._process = None
|
self._process = None
|
||||||
self._started = False
|
self._started = False
|
||||||
|
self.status = "stopped"
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def reload(self):
|
def reload(self):
|
||||||
|
@ -148,6 +148,10 @@ VPCS_OBJECT_SCHEMA = {
|
|||||||
"maxLength": 36,
|
"maxLength": 36,
|
||||||
"pattern": "^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}$"
|
"pattern": "^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}$"
|
||||||
},
|
},
|
||||||
|
"status": {
|
||||||
|
"description": "VM status",
|
||||||
|
"enum": ["started", "stopped"]
|
||||||
|
},
|
||||||
"console": {
|
"console": {
|
||||||
"description": "console TCP port",
|
"description": "console TCP port",
|
||||||
"minimum": 1,
|
"minimum": 1,
|
||||||
@ -171,5 +175,5 @@ VPCS_OBJECT_SCHEMA = {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
"additionalProperties": False,
|
"additionalProperties": False,
|
||||||
"required": ["name", "vm_id", "console", "project_id", "startup_script_path"]
|
"required": ["name", "vm_id", "status", "console", "project_id", "startup_script_path"]
|
||||||
}
|
}
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import shutil
|
import shutil
|
||||||
|
import sys
|
||||||
|
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
@ -77,3 +78,19 @@ def wait_for_process_termination(process, timeout=10):
|
|||||||
yield from asyncio.sleep(0.1)
|
yield from asyncio.sleep(0.1)
|
||||||
timeout -= 0.1
|
timeout -= 0.1
|
||||||
raise asyncio.TimeoutError()
|
raise asyncio.TimeoutError()
|
||||||
|
|
||||||
|
|
||||||
|
@asyncio.coroutine
|
||||||
|
def _check_process(process, termination_callback):
|
||||||
|
if not hasattr(sys, "_called_from_test") or not sys._called_from_test:
|
||||||
|
returncode = yield from process.wait()
|
||||||
|
if asyncio.iscoroutinefunction(termination_callback):
|
||||||
|
yield from termination_callback(returncode)
|
||||||
|
else:
|
||||||
|
termination_callback(returncode)
|
||||||
|
|
||||||
|
|
||||||
|
def monitor_process(process, termination_callback):
|
||||||
|
"""Call termination_callback when process die"""
|
||||||
|
|
||||||
|
asyncio.async(_check_process(process, termination_callback))
|
||||||
|
@ -44,7 +44,7 @@ class Query:
|
|||||||
def delete(self, path, **kwargs):
|
def delete(self, path, **kwargs):
|
||||||
return self._fetch("DELETE", path, **kwargs)
|
return self._fetch("DELETE", path, **kwargs)
|
||||||
|
|
||||||
def _get_url(self, path, version):
|
def get_url(self, path, version):
|
||||||
if version is None:
|
if version is None:
|
||||||
return "http://{}:{}{}".format(self._host, self._port, path)
|
return "http://{}:{}{}".format(self._host, self._port, path)
|
||||||
return "http://{}:{}/v{}{}".format(self._host, self._port, version, path)
|
return "http://{}:{}/v{}{}".format(self._host, self._port, version, path)
|
||||||
@ -62,7 +62,7 @@ class Query:
|
|||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def go(future):
|
def go(future):
|
||||||
response = yield from aiohttp.request(method, self._get_url(path, api_version), data=body)
|
response = yield from aiohttp.request(method, self.get_url(path, api_version), data=body)
|
||||||
future.set_result(response)
|
future.set_result(response)
|
||||||
future = asyncio.Future()
|
future = asyncio.Future()
|
||||||
asyncio.async(go(future))
|
asyncio.async(go(future))
|
||||||
|
@ -20,9 +20,13 @@ This test suite check /project endpoint
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
import uuid
|
import uuid
|
||||||
|
import asyncio
|
||||||
|
import aiohttp
|
||||||
from unittest.mock import patch
|
from unittest.mock import patch
|
||||||
from tests.utils import asyncio_patch
|
from tests.utils import asyncio_patch
|
||||||
|
|
||||||
|
from gns3server.handlers.api.project_handler import ProjectHandler
|
||||||
|
|
||||||
|
|
||||||
def test_create_project_with_path(server, tmpdir):
|
def test_create_project_with_path(server, tmpdir):
|
||||||
with patch("gns3server.modules.project.Project.is_local", return_value=True):
|
with patch("gns3server.modules.project.Project.is_local", return_value=True):
|
||||||
@ -139,6 +143,38 @@ def test_close_project(server, project):
|
|||||||
assert mock.called
|
assert mock.called
|
||||||
|
|
||||||
|
|
||||||
|
def test_close_project_two_client_connected(server, project):
|
||||||
|
|
||||||
|
ProjectHandler._notifications_listening = 2
|
||||||
|
|
||||||
|
with asyncio_patch("gns3server.modules.project.Project.close", return_value=True) as mock:
|
||||||
|
response = server.post("/projects/{project_id}/close".format(project_id=project.id), example=True)
|
||||||
|
assert response.status == 204
|
||||||
|
assert not mock.called
|
||||||
|
|
||||||
|
|
||||||
def test_close_project_invalid_uuid(server):
|
def test_close_project_invalid_uuid(server):
|
||||||
response = server.post("/projects/{project_id}/close".format(project_id=uuid.uuid4()))
|
response = server.post("/projects/{project_id}/close".format(project_id=uuid.uuid4()))
|
||||||
assert response.status == 404
|
assert response.status == 404
|
||||||
|
|
||||||
|
|
||||||
|
def test_notification(server, project, loop):
|
||||||
|
@asyncio.coroutine
|
||||||
|
def go(future):
|
||||||
|
response = yield from aiohttp.request("GET", server.get_url("/projects/{project_id}/notifications".format(project_id=project.id), 1))
|
||||||
|
response.body = yield from response.content.read(19)
|
||||||
|
project.emit("vm.created", {"a": "b"})
|
||||||
|
response.body += yield from response.content.read(47)
|
||||||
|
response.close()
|
||||||
|
future.set_result(response)
|
||||||
|
|
||||||
|
future = asyncio.Future()
|
||||||
|
asyncio.async(go(future))
|
||||||
|
response = loop.run_until_complete(future)
|
||||||
|
assert response.status == 200
|
||||||
|
assert response.body == b'{"action": "ping"}\n{"action": "vm.created", "event": {"a": "b"}}\n'
|
||||||
|
|
||||||
|
|
||||||
|
def test_notification_invalid_id(server, project):
|
||||||
|
response = server.get("/projects/{project_id}/notifications".format(project_id=uuid.uuid4()))
|
||||||
|
assert response.status == 404
|
||||||
|
@ -43,6 +43,7 @@ def test_vpcs_get(server, project, vm):
|
|||||||
assert response.json["name"] == "PC TEST 1"
|
assert response.json["name"] == "PC TEST 1"
|
||||||
assert response.json["project_id"] == project.id
|
assert response.json["project_id"] == project.id
|
||||||
assert response.json["startup_script_path"] == None
|
assert response.json["startup_script_path"] == None
|
||||||
|
assert response.json["status"] == "stopped"
|
||||||
|
|
||||||
|
|
||||||
def test_vpcs_create_startup_script(server, project):
|
def test_vpcs_create_startup_script(server, project):
|
||||||
@ -95,6 +96,7 @@ def test_vpcs_delete_nio(server, vm):
|
|||||||
|
|
||||||
|
|
||||||
def test_vpcs_start(server, vm):
|
def test_vpcs_start(server, vm):
|
||||||
|
|
||||||
with asyncio_patch("gns3server.modules.vpcs.vpcs_vm.VPCSVM.start", return_value=True) as mock:
|
with asyncio_patch("gns3server.modules.vpcs.vpcs_vm.VPCSVM.start", return_value=True) as mock:
|
||||||
response = server.post("/projects/{project_id}/vpcs/vms/{vm_id}/start".format(project_id=vm["project_id"], vm_id=vm["vm_id"]), example=True)
|
response = server.post("/projects/{project_id}/vpcs/vms/{vm_id}/start".format(project_id=vm["project_id"], vm_id=vm["vm_id"]), example=True)
|
||||||
assert mock.called
|
assert mock.called
|
||||||
|
@ -71,6 +71,7 @@ def test_vm_invalid_vpcs_path(project, manager, loop):
|
|||||||
def test_start(loop, vm):
|
def test_start(loop, vm):
|
||||||
process = MagicMock()
|
process = MagicMock()
|
||||||
process.returncode = None
|
process.returncode = None
|
||||||
|
queue = vm.project.get_listen_queue()
|
||||||
|
|
||||||
with asyncio_patch("gns3server.modules.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True):
|
with asyncio_patch("gns3server.modules.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True):
|
||||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=process):
|
with asyncio_patch("asyncio.create_subprocess_exec", return_value=process):
|
||||||
@ -78,6 +79,9 @@ def test_start(loop, vm):
|
|||||||
vm.port_add_nio_binding(0, nio)
|
vm.port_add_nio_binding(0, nio)
|
||||||
loop.run_until_complete(asyncio.async(vm.start()))
|
loop.run_until_complete(asyncio.async(vm.start()))
|
||||||
assert vm.is_running()
|
assert vm.is_running()
|
||||||
|
(action, event) = queue.get_nowait()
|
||||||
|
assert action == "vm.started"
|
||||||
|
assert event == vm
|
||||||
|
|
||||||
|
|
||||||
def test_stop(loop, vm):
|
def test_stop(loop, vm):
|
||||||
@ -97,10 +101,16 @@ def test_stop(loop, vm):
|
|||||||
loop.run_until_complete(asyncio.async(vm.start()))
|
loop.run_until_complete(asyncio.async(vm.start()))
|
||||||
assert vm.is_running()
|
assert vm.is_running()
|
||||||
|
|
||||||
|
queue = vm.project.get_listen_queue()
|
||||||
|
|
||||||
loop.run_until_complete(asyncio.async(vm.stop()))
|
loop.run_until_complete(asyncio.async(vm.stop()))
|
||||||
assert vm.is_running() is False
|
assert vm.is_running() is False
|
||||||
process.terminate.assert_called_with()
|
process.terminate.assert_called_with()
|
||||||
|
|
||||||
|
(action, event) = queue.get_nowait()
|
||||||
|
assert action == "vm.stopped"
|
||||||
|
assert event == vm
|
||||||
|
|
||||||
|
|
||||||
def test_reload(loop, vm):
|
def test_reload(loop, vm):
|
||||||
process = MagicMock()
|
process = MagicMock()
|
||||||
|
Loading…
Reference in New Issue
Block a user