Merge branch 'unstable'

Conflicts:
	gns3server/modules/virtualbox/__init__.py
	gns3server/version.py
	tests/modules/test_manager.py
pull/168/head
Jeremy 9 years ago
commit 929c337e8b

@ -69,12 +69,17 @@ To run tests use:
py.test -v
Run as daemon
***************
Run as daemon (Unix only)
**************************
You will found init sample script for various systems
inside the init directory.
Usefull options:
* --daemon: start process as a daemon
* --log logfile: store output in a logfile
* --pid pidfile: store the pid of the running process in a file and prevent double execution
upstart
~~~~~~~

@ -27,6 +27,7 @@ from gns3server.handlers.api.virtualbox_handler import VirtualBoxHandler
from gns3server.handlers.api.vpcs_handler import VPCSHandler
from gns3server.handlers.api.config_handler import ConfigHandler
from gns3server.handlers.api.server_handler import ServerHandler
from gns3server.handlers.api.file_handler import FileHandler
from gns3server.handlers.upload_handler import UploadHandler
from gns3server.handlers.index_handler import IndexHandler

@ -25,6 +25,7 @@ from ...schemas.dynamips_vm import VM_UPDATE_SCHEMA
from ...schemas.dynamips_vm import VM_CAPTURE_SCHEMA
from ...schemas.dynamips_vm import VM_OBJECT_SCHEMA
from ...schemas.dynamips_vm import VM_CONFIGS_SCHEMA
from ...schemas.dynamips_vm import VMS_LIST_SCHEMA
from ...modules.dynamips import Dynamips
from ...modules.project_manager import ProjectManager
@ -421,3 +422,17 @@ class DynamipsVMHandler:
idlepc = yield from dynamips_manager.auto_idlepc(vm)
response.set_status(200)
response.json({"idlepc": idlepc})
@Route.get(
r"/dynamips/vms",
status_codes={
200: "List of Dynamips VM retrieved",
},
description="Retrieve the list of Dynamips VMS",
output=VMS_LIST_SCHEMA)
def list_vms(request, response):
dynamips_manager = Dynamips.instance()
vms = yield from dynamips_manager.list_images()
response.set_status(200)
response.json(vms)

@ -0,0 +1,60 @@
#
# Copyright (C) 2015 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import asyncio
import aiohttp
from ...web.route import Route
from ...schemas.file import FILE_STREAM_SCHEMA
class FileHandler:
@classmethod
@Route.get(
r"/files/stream",
description="Stream a file from the server",
status_codes={
200: "File retrieved",
404: "File doesn't exist",
409: "Can't access to file"
},
input=FILE_STREAM_SCHEMA
)
def read(request, response):
response.enable_chunked_encoding()
try:
with open(request.json.get("location"), "rb") as f:
loop = asyncio.get_event_loop()
response.content_type = "application/octet-stream"
response.set_status(200)
# Very important: do not send a content lenght otherwise QT close the connection but curl can consume the Feed
response.content_length = None
response.start(request)
while True:
data = yield from loop.run_in_executor(None, f.read, 16)
if len(data) == 0:
yield from asyncio.sleep(0.1)
else:
response.write(data)
except FileNotFoundError:
raise aiohttp.web.HTTPNotFound()
except OSError as e:
raise aiohttp.web.HTTPConflict(text=str(e))

@ -25,6 +25,7 @@ from ...schemas.iou import IOU_UPDATE_SCHEMA
from ...schemas.iou import IOU_OBJECT_SCHEMA
from ...schemas.iou import IOU_CAPTURE_SCHEMA
from ...schemas.iou import IOU_INITIAL_CONFIG_SCHEMA
from ...schemas.iou import IOU_LIST_VMS_SCHEMA
from ...modules.iou import IOU
@ -61,6 +62,8 @@ class IOUHandler:
if name == "initial_config_content" and (vm.initial_config_content and len(vm.initial_config_content) > 0):
continue
setattr(vm, name, value)
if "initial_config_content" in request.json:
vm.initial_config = request.json.get("initial_config_content")
response.set_status(201)
response.json(vm)
@ -108,6 +111,8 @@ class IOUHandler:
for name, value in request.json.items():
if hasattr(vm, name) and getattr(vm, name) != value:
setattr(vm, name, value)
if "initial_config_content" in request.json:
vm.initial_config = request.json.get("initial_config_content")
response.json(vm)
@classmethod
@ -311,3 +316,18 @@ class IOUHandler:
project_id=request.match_info["project_id"])
response.set_status(200)
response.json({"content": vm.initial_config_content})
@Route.get(
r"/iou/vms",
status_codes={
200: "List of IOU VM retrieved",
},
description="Retrieve the list of IOU VMS",
output=IOU_LIST_VMS_SCHEMA)
def list_vms(request, response):
iou_manager = IOU.instance()
vms = yield from iou_manager.list_images()
response.set_status(200)
response.json(vms)

@ -22,6 +22,7 @@ from ...schemas.qemu import QEMU_CREATE_SCHEMA
from ...schemas.qemu import QEMU_UPDATE_SCHEMA
from ...schemas.qemu import QEMU_OBJECT_SCHEMA
from ...schemas.qemu import QEMU_BINARY_LIST_SCHEMA
from ...schemas.qemu import QEMU_LIST_IMAGES_SCHEMA
from ...modules.qemu import Qemu
@ -290,3 +291,17 @@ class QEMUHandler:
binaries = yield from Qemu.binary_list()
response.json(binaries)
@Route.get(
r"/qemu/vms",
status_codes={
200: "List of Qemu images retrieved",
},
description="Retrieve the list of Qemu images",
output=QEMU_LIST_IMAGES_SCHEMA)
def list_vms(request, response):
qemu_manager = Qemu.instance()
vms = yield from qemu_manager.list_images()
response.set_status(200)
response.json(vms)

@ -43,7 +43,7 @@ class VirtualBoxHandler:
def show(request, response):
vbox_manager = VirtualBox.instance()
vms = yield from vbox_manager.get_list()
vms = yield from vbox_manager.list_images()
response.json(vms)
@classmethod

@ -16,121 +16,40 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Entry point of the server. It's support daemonize the process
"""
import os
import datetime
import sys
import locale
import argparse
from gns3server.server import Server
from gns3server.web.logger import init_logger
from gns3server.version import __version__
from gns3server.config import Config
from gns3server.modules.project import Project
from gns3server.crash_report import CrashReport
import logging
log = logging.getLogger(__name__)
def locale_check():
def daemonize():
"""
Checks if this application runs with a correct locale (i.e. supports UTF-8 encoding) and attempt to fix
if this is not the case.
This is to prevent UnicodeEncodeError with unicode paths when using standard library I/O operation
methods (e.g. os.stat() or os.path.*) which rely on the system or user locale.
More information can be found there: http://seasonofcode.com/posts/unicode-i-o-and-locales-in-python.html
or there: http://robjwells.com/post/61198832297/get-your-us-ascii-out-of-my-face
Do the UNIX double-fork magic for properly detaching process
"""
# no need to check on Windows or when this application is frozen
if sys.platform.startswith("win") or hasattr(sys, "frozen"):
return
language = encoding = None
try:
language, encoding = locale.getlocale()
except ValueError as e:
log.error("Could not determine the current locale: {}".format(e))
if not language and not encoding:
try:
log.warn("Could not find a default locale, switching to C.UTF-8...")
locale.setlocale(locale.LC_ALL, ("C", "UTF-8"))
except locale.Error as e:
log.error("Could not switch to the C.UTF-8 locale: {}".format(e))
raise SystemExit
elif encoding != "UTF-8":
log.warn("Your locale {}.{} encoding is not UTF-8, switching to the UTF-8 version...".format(language, encoding))
try:
locale.setlocale(locale.LC_ALL, (language, "UTF-8"))
except locale.Error as e:
log.error("Could not set an UTF-8 encoding for the {} locale: {}".format(language, e))
raise SystemExit
else:
log.info("Current locale is {}.{}".format(language, encoding))
def parse_arguments(argv, config):
"""
Parse command line arguments and override local configuration
:params args: Array of command line arguments
:params config: ConfigParser with default variable from configuration
"""
defaults = {
"host": config.get("host", "0.0.0.0"),
"port": config.get("port", 8000),
"ssl": config.getboolean("ssl", False),
"certfile": config.get("certfile", ""),
"certkey": config.get("certkey", ""),
"record": config.get("record", ""),
"local": config.getboolean("local", False),
"allow": config.getboolean("allow_remote_console", False),
"quiet": config.getboolean("quiet", False),
"debug": config.getboolean("debug", False),
"live": config.getboolean("live", False),
"logfile": config.getboolean("logfile", ""),
}
parser = argparse.ArgumentParser(description="GNS3 server version {}".format(__version__))
parser.set_defaults(**defaults)
parser.add_argument("-v", "--version", help="show the version", action="version", version=__version__)
parser.add_argument("--host", help="run on the given host/IP address")
parser.add_argument("--port", help="run on the given port", type=int)
parser.add_argument("--ssl", action="store_true", help="run in SSL mode")
parser.add_argument("--certfile", help="SSL cert file")
parser.add_argument("--certkey", help="SSL key file")
parser.add_argument("--record", help="save curl requests into a file")
parser.add_argument("-L", "--local", action="store_true", help="local mode (allows some insecure operations)")
parser.add_argument("-A", "--allow", action="store_true", help="allow remote connections to local console ports")
parser.add_argument("-q", "--quiet", action="store_true", help="do not show logs on stdout")
parser.add_argument("-d", "--debug", action="store_true", help="show debug logs")
parser.add_argument("--live", action="store_true", help="enable code live reload")
parser.add_argument("--shell", action="store_true", help="start a shell inside the server (debugging purpose only you need to install ptpython before)")
parser.add_argument("--log", help="send output to logfile instead of console")
return parser.parse_args(argv)
def set_config(args):
config = Config.instance()
server_config = config.get_section_config("Server")
server_config["local"] = str(args.local)
server_config["allow_remote_console"] = str(args.allow)
server_config["host"] = args.host
server_config["port"] = str(args.port)
server_config["ssl"] = str(args.ssl)
server_config["certfile"] = args.certfile
server_config["certkey"] = args.certkey
server_config["record"] = args.record
server_config["debug"] = str(args.debug)
server_config["live"] = str(args.live)
server_config["shell"] = str(args.shell)
config.set_section_config("Server", server_config)
pid = os.fork()
if pid > 0:
# Exit first parent
sys.exit(0)
except OSError as e:
print("First fork failed: %d (%s)\n" % (e.errno, e.strerror), file=sys.stderr)
sys.exit(1)
# Decouple from parent environment
os.setsid()
os.umask(700)
# Do second fork
try:
pid = os.fork()
if pid > 0:
# Exit from second parent
sys.exit(0)
except OSError as e:
print("Second fork failed: %d (%s)\n" % (e.errno, e.strerror), file=sys.stderr)
sys.exit(1)
def main():
@ -138,53 +57,11 @@ def main():
Entry point for GNS3 server
"""
level = logging.INFO
args = parse_arguments(sys.argv[1:], Config.instance().get_section_config("Server"))
if args.debug:
level = logging.DEBUG
user_log = init_logger(level, logfile=args.log, quiet=args.quiet)
user_log.info("GNS3 server version {}".format(__version__))
current_year = datetime.date.today().year
user_log.info("Copyright (c) 2007-{} GNS3 Technologies Inc.".format(current_year))
for config_file in Config.instance().get_config_files():
user_log.info("Config file {} loaded".format(config_file))
set_config(args)
server_config = Config.instance().get_section_config("Server")
if server_config.getboolean("local"):
log.warning("Local mode is enabled. Beware, clients will have full control on your filesystem")
# we only support Python 3 version >= 3.3
if sys.version_info < (3, 3):
raise RuntimeError("Python 3.3 or higher is required")
user_log.info("Running with Python {major}.{minor}.{micro} and has PID {pid}".format(
major=sys.version_info[0], minor=sys.version_info[1],
micro=sys.version_info[2], pid=os.getpid()))
# check for the correct locale (UNIX/Linux only)
locale_check()
try:
os.getcwd()
except FileNotFoundError:
log.critical("The current working directory doesn't exist")
return
Project.clean_project_directory()
CrashReport.instance()
host = server_config["host"]
port = int(server_config["port"])
server = Server.instance(host, port)
try:
server.run()
except Exception as e:
log.critical("Critical error while running the server: {}".format(e), exc_info=1)
CrashReport.instance().capture_exception()
return
if not sys.platform.startswith("win"):
if "--daemon" in sys.argv:
daemonize()
from gns3server.run import run
run()
if __name__ == '__main__':
main()

@ -407,6 +407,25 @@ class BaseManager:
return os.path.basename(path)
return path
@asyncio.coroutine
def list_images(self):
"""
Return the list of available images for this VM type
:returns: Array of hash
"""
try:
files = os.listdir(self.get_images_directory())
except FileNotFoundError:
return []
files.sort()
images = []
for filename in files:
if filename[0] != ".":
images.append({"filename": filename})
return images
def get_images_directory(self):
"""
Get the image directory on disk

@ -161,6 +161,7 @@ class VirtualBox(BaseManager):
@asyncio.coroutine
def get_list(self):
def list_images(self):
"""
Gets VirtualBox VM list.
"""

@ -0,0 +1,238 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Start the program. Use main.py to load it.
"""
import os
import datetime
import sys
import locale
import argparse
import asyncio
from gns3server.server import Server
from gns3server.web.logger import init_logger
from gns3server.version import __version__
from gns3server.config import Config
from gns3server.modules.project import Project
from gns3server.crash_report import CrashReport
import logging
log = logging.getLogger(__name__)
def locale_check():
"""
Checks if this application runs with a correct locale (i.e. supports UTF-8 encoding) and attempt to fix
if this is not the case.
This is to prevent UnicodeEncodeError with unicode paths when using standard library I/O operation
methods (e.g. os.stat() or os.path.*) which rely on the system or user locale.
More information can be found there: http://seasonofcode.com/posts/unicode-i-o-and-locales-in-python.html
or there: http://robjwells.com/post/61198832297/get-your-us-ascii-out-of-my-face
"""
# no need to check on Windows or when this application is frozen
if sys.platform.startswith("win") or hasattr(sys, "frozen"):
return
language = encoding = None
try:
language, encoding = locale.getlocale()
except ValueError as e:
log.error("Could not determine the current locale: {}".format(e))
if not language and not encoding:
try:
log.warn("Could not find a default locale, switching to C.UTF-8...")
locale.setlocale(locale.LC_ALL, ("C", "UTF-8"))
except locale.Error as e:
log.error("Could not switch to the C.UTF-8 locale: {}".format(e))
raise SystemExit
elif encoding != "UTF-8":
log.warn("Your locale {}.{} encoding is not UTF-8, switching to the UTF-8 version...".format(language, encoding))
try:
locale.setlocale(locale.LC_ALL, (language, "UTF-8"))
except locale.Error as e:
log.error("Could not set an UTF-8 encoding for the {} locale: {}".format(language, e))
raise SystemExit
else:
log.info("Current locale is {}.{}".format(language, encoding))
def parse_arguments(argv, config):
"""
Parse command line arguments and override local configuration
:params args: Array of command line arguments
:params config: ConfigParser with default variable from configuration
"""
defaults = {
"host": config.get("host", "0.0.0.0"),
"port": config.get("port", 8000),
"ssl": config.getboolean("ssl", False),
"certfile": config.get("certfile", ""),
"certkey": config.get("certkey", ""),
"record": config.get("record", ""),
"local": config.getboolean("local", False),
"allow": config.getboolean("allow_remote_console", False),
"quiet": config.getboolean("quiet", False),
"debug": config.getboolean("debug", False),
"live": config.getboolean("live", False),
"logfile": config.getboolean("logfile", ""),
}
parser = argparse.ArgumentParser(description="GNS3 server version {}".format(__version__))
parser.set_defaults(**defaults)
parser.add_argument("-v", "--version", help="show the version", action="version", version=__version__)
parser.add_argument("--host", help="run on the given host/IP address")
parser.add_argument("--port", help="run on the given port", type=int)
parser.add_argument("--ssl", action="store_true", help="run in SSL mode")
parser.add_argument("--certfile", help="SSL cert file")
parser.add_argument("--certkey", help="SSL key file")
parser.add_argument("--record", help="save curl requests into a file")
parser.add_argument("-L", "--local", action="store_true", help="local mode (allows some insecure operations)")
parser.add_argument("-A", "--allow", action="store_true", help="allow remote connections to local console ports")
parser.add_argument("-q", "--quiet", action="store_true", help="do not show logs on stdout")
parser.add_argument("-d", "--debug", action="store_true", help="show debug logs")
parser.add_argument("--live", action="store_true", help="enable code live reload")
parser.add_argument("--shell", action="store_true", help="start a shell inside the server (debugging purpose only you need to install ptpython before)")
parser.add_argument("--log", help="send output to logfile instead of console")
parser.add_argument("--daemon", action="store_true", help="start as a daemon")
parser.add_argument("--pid", help="store process pid")
return parser.parse_args(argv)
def set_config(args):
config = Config.instance()
server_config = config.get_section_config("Server")
server_config["local"] = str(args.local)
server_config["allow_remote_console"] = str(args.allow)
server_config["host"] = args.host
server_config["port"] = str(args.port)
server_config["ssl"] = str(args.ssl)
server_config["certfile"] = args.certfile
server_config["certkey"] = args.certkey
server_config["record"] = args.record
server_config["debug"] = str(args.debug)
server_config["live"] = str(args.live)
server_config["shell"] = str(args.shell)
config.set_section_config("Server", server_config)
def pid_lock(path):
"""
Write the file in a file on the system.
Check if the process is not already running.
"""
if os.path.exists(path):
pid = None
try:
with open(path) as f:
pid = int(f.read())
try:
os.kill(pid, 0) # If the proces is not running kill return an error
except OSError:
pid = None
except OSError as e:
log.critical("Can't open pid file %s: %s", args.pid, str(e))
sys.exit(1)
if pid:
log.critical("GNS3 is already running pid: %d", pid)
sys.exit(1)
try:
with open(path, 'w+') as f:
f.write(str(os.getpid()))
except OSError as e:
log.critical("Can't write pid file %s: %s", args.pid, str(e))
sys.exit(1)
def run():
args = parse_arguments(sys.argv[1:], Config.instance().get_section_config("Server"))
if args.daemon and sys.platform.startswith("win"):
log.critical("Daemon is not supported on Windows")
sys.exit(1)
if args.pid:
pid_lock(args.pid)
level = logging.INFO
if args.debug:
level = logging.DEBUG
user_log = init_logger(level, logfile=args.log, quiet=args.quiet)
user_log.info("GNS3 server version {}".format(__version__))
current_year = datetime.date.today().year
user_log.info("Copyright (c) 2007-{} GNS3 Technologies Inc.".format(current_year))
for config_file in Config.instance().get_config_files():
user_log.info("Config file {} loaded".format(config_file))
set_config(args)
server_config = Config.instance().get_section_config("Server")
if server_config.getboolean("local"):
log.warning("Local mode is enabled. Beware, clients will have full control on your filesystem")
# we only support Python 3 version >= 3.3
if sys.version_info < (3, 3):
raise RuntimeError("Python 3.3 or higher is required")
user_log.info("Running with Python {major}.{minor}.{micro} and has PID {pid}".format(
major=sys.version_info[0], minor=sys.version_info[1],
micro=sys.version_info[2], pid=os.getpid()))
# check for the correct locale (UNIX/Linux only)
locale_check()
try:
os.getcwd()
except FileNotFoundError:
log.critical("The current working directory doesn't exist")
return
Project.clean_project_directory()
CrashReport.instance()
host = server_config["host"]
port = int(server_config["port"])
server = Server.instance(host, port)
try:
server.run()
except Exception as e:
log.critical("Critical error while running the server: {}".format(e), exc_info=1)
CrashReport.instance().capture_exception()
return
if args.pid:
log.info("Remove PID file %s", args.pid)
try:
os.remove(args.pid)
except OSError as e:
log.critical("Can't remove pid file %s: %s", args.pid, str(e))

@ -747,3 +747,21 @@ VM_CONFIGS_SCHEMA = {
"additionalProperties": False,
"required": ["startup_config_content", "private_config_content"]
}
VMS_LIST_SCHEMA = {
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "List available Dynamips images",
"type": "array",
"items": [
{
"type": "object",
"properties": {
"filename": {
"description": "Image filename",
"type": ["string"]
},
},
}
],
"additionalProperties": False,
}

@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
FILE_STREAM_SCHEMA = {
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "Request validation retrieval of a file stream",
"type": "object",
"properties": {
"location": {
"description": "File path",
"type": ["string"],
"minLength": 1
}
},
"additionalProperties": False,
"required": ["location"]
}

@ -247,3 +247,21 @@ IOU_INITIAL_CONFIG_SCHEMA = {
"additionalProperties": False,
"required": ["content"]
}
IOU_LIST_VMS_SCHEMA = {
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "List available IOU images",
"type": "array",
"items": [
{
"type": "object",
"properties": {
"filename": {
"description": "Image filename",
"type": ["string"]
},
},
}
],
"additionalProperties": False,
}

@ -341,3 +341,21 @@ QEMU_BINARY_LIST_SCHEMA = {
},
"additionalProperties": False,
}
QEMU_LIST_IMAGES_SCHEMA = {
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "List available QEMU images",
"type": "array",
"items": [
{
"type": "object",
"properties": {
"filename": {
"description": "Image filename",
"type": ["string"]
},
},
}
],
"additionalProperties": False,
}

@ -23,5 +23,5 @@
# or negative for a release candidate or beta (after the base version
# number has been incremented)
__version__ = "1.3.3dev1"
__version_info__ = (1, 3, 3, -99)
__version__ = "1.4.0.dev1"
__version_info__ = (1, 4, 0, -99)

@ -5,8 +5,7 @@ start on filesystem or runlevel [2345]
stop on shutdown
script
echo $$ > /var/run/gns3.pid
exec start-stop-daemon --start -c gns3 --exec /usr/local/bin/gns3server --log /var/log/gns3.log
exec start-stop-daemon --start -c gns3 --exec /usr/local/bin/gns3server --log /var/log/gns3.log --pid /var/run/gns3.pid --daemon
end script
pre-start script
@ -14,6 +13,5 @@ pre-start script
end script
pre-stop script
rm /var/run/gns3.pid
echo "[`date`] GNS3 Stopping" >> /var/log/gns3.log
end script

@ -44,7 +44,7 @@ class Query:
def delete(self, path, **kwargs):
return self._fetch("DELETE", path, **kwargs)
def _get_url(self, path, version):
def get_url(self, path, version):
if version is None:
return "http://{}:{}{}".format(self._host, self._port, path)
return "http://{}:{}/v{}{}".format(self._host, self._port, version, path)
@ -62,7 +62,7 @@ class Query:
@asyncio.coroutine
def go(future):
response = yield from aiohttp.request(method, self._get_url(path, api_version), data=body)
response = yield from aiohttp.request(method, self.get_url(path, api_version), data=body)
future.set_result(response)
future = asyncio.Future()
asyncio.async(go(future))

@ -16,6 +16,10 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import pytest
import os
import stat
from unittest.mock import patch
from tests.utils import asyncio_patch
@ -123,3 +127,22 @@ from tests.utils import asyncio_patch
# assert response.status == 200
# assert response.json["name"] == "test"
# assert response.json["console"] == free_console_port
@pytest.fixture
def fake_dynamips(tmpdir):
"""Create a fake IOU image on disk"""
path = str(tmpdir / "7200.bin")
with open(path, "w+") as f:
f.write('1')
os.chmod(path, stat.S_IREAD)
return path
def test_vms(server, tmpdir, fake_dynamips):
with patch("gns3server.modules.Dynamips.get_images_directory", return_value=str(tmpdir), example=True):
response = server.get("/dynamips/vms")
assert response.status == 200
assert response.json == [{"filename": "7200.bin"}]

@ -0,0 +1,62 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
This test suite check /files endpoint
"""
import json
import asyncio
import aiohttp
from gns3server.version import __version__
def test_stream(server, tmpdir, loop):
with open(str(tmpdir / "test"), 'w+') as f:
f.write("hello")
def go(future):
query = json.dumps({"location": str(tmpdir / "test")})
headers = {'content-type': 'application/json'}
response = yield from aiohttp.request("GET", server.get_url("/files/stream", 1), data=query, headers=headers)
response.body = yield from response.content.read(5)
with open(str(tmpdir / "test"), 'a') as f:
f.write("world")
response.body += yield from response.content.read(5)
response.close()
future.set_result(response)
future = asyncio.Future()
asyncio.async(go(future))
response = loop.run_until_complete(future)
assert response.status == 200
assert response.body == b'helloworld'
def test_stream_file_not_found(server, tmpdir, loop):
def go(future):
query = json.dumps({"location": str(tmpdir / "test")})
headers = {'content-type': 'application/json'}
response = yield from aiohttp.request("GET", server.get_url("/files/stream", 1), data=query, headers=headers)
response.close()
future.set_result(response)
future = asyncio.Future()
asyncio.async(go(future))
response = loop.run_until_complete(future)
assert response.status == 404

@ -309,3 +309,11 @@ def test_get_initial_config_with_config_file(server, project, vm):
response = server.get("/projects/{project_id}/iou/vms/{vm_id}/initial_config".format(project_id=vm["project_id"], vm_id=vm["vm_id"]), example=True)
assert response.status == 200
assert response.json["content"] == "TEST"
def test_vms(server, vm, tmpdir, fake_iou_bin):
with patch("gns3server.modules.IOU.get_images_directory", return_value=str(tmpdir), example=True):
response = server.get("/iou/vms")
assert response.status == 200
assert response.json == [{"filename": "iou.bin"}]

@ -32,12 +32,32 @@ def fake_qemu_bin():
return bin_path
@pytest.fixture
def fake_qemu_vm(tmpdir):
bin_path = os.path.join(str(tmpdir / "linux.img"))
with open(bin_path, "w+") as f:
f.write("1")
os.chmod(bin_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
return bin_path
@pytest.fixture
def base_params(tmpdir, fake_qemu_bin):
"""Return standard parameters"""
return {"name": "PC TEST 1", "qemu_path": fake_qemu_bin}
@pytest.fixture
def fake_qemu_bin():
bin_path = os.path.join(os.environ["PATH"], "qemu_x42")
with open(bin_path, "w+") as f:
f.write("1")
os.chmod(bin_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
return bin_path
@pytest.fixture
def vm(server, project, base_params):
response = server.post("/projects/{project_id}/qemu/vms".format(project_id=project.id), base_params)
@ -172,3 +192,11 @@ def test_qemu_list_binaries(server, vm):
assert mock.called
assert response.status == 200
assert response.json == ret
def test_vms(server, tmpdir, fake_qemu_vm):
with patch("gns3server.modules.Qemu.get_images_directory", return_value=str(tmpdir), example=True):
response = server.get("/qemu/vms")
assert response.status == 200
assert response.json == [{"filename": "linux.img"}]

@ -40,11 +40,7 @@ def iou(port_manager):
return iou
def test_get_application_id(loop, project, port_manager):
# Cleanup the IOU object
IOU._instance = None
iou = IOU.instance()
iou.port_manager = port_manager
def test_get_application_id(loop, project, iou):
vm1_id = str(uuid.uuid4())
vm2_id = str(uuid.uuid4())
vm3_id = str(uuid.uuid4())
@ -58,11 +54,7 @@ def test_get_application_id(loop, project, port_manager):
assert iou.get_application_id(vm3_id) == 1
def test_get_application_id_multiple_project(loop, port_manager):
# Cleanup the IOU object
IOU._instance = None
iou = IOU.instance()
iou.port_manager = port_manager
def test_get_application_id_multiple_project(loop, iou):
vm1_id = str(uuid.uuid4())
vm2_id = str(uuid.uuid4())
vm3_id = str(uuid.uuid4())
@ -76,11 +68,7 @@ def test_get_application_id_multiple_project(loop, port_manager):
assert iou.get_application_id(vm3_id) == 3
def test_get_application_id_no_id_available(loop, project, port_manager):
# Cleanup the IOU object
IOU._instance = None
iou = IOU.instance()
iou.port_manager = port_manager
def test_get_application_id_no_id_available(loop, project, iou):
with pytest.raises(IOUError):
for i in range(1, 513):
vm_id = str(uuid.uuid4())

@ -22,33 +22,33 @@ from unittest.mock import patch
from gns3server.modules.vpcs import VPCS
from gns3server.modules.qemu import Qemu
from gns3server.modules.iou import IOU
@pytest.fixture(scope="function")
def qemu(port_manager):
Qemu._instance = None
qemu = Qemu.instance()
qemu.port_manager = port_manager
return qemu
def test_create_vm_new_topology(loop, project, port_manager):
def vpcs(port_manager):
VPCS._instance = None
vpcs = VPCS.instance()
vpcs.port_manager = port_manager
return vpcs
@pytest.fixture(scope="function")
def iou(port_manager):
IOU._instance = None
iou = IOU.instance()
iou.port_manager = port_manager
return iou
def test_create_vm_new_topology(loop, project, vpcs):
vm_id = str(uuid.uuid4())
vm = loop.run_until_complete(vpcs.create_vm("PC 1", project.id, vm_id))
assert vm in project.vms
def test_create_twice_same_vm_new_topology(loop, project, port_manager):
def test_create_twice_same_vm_new_topology(loop, project, vpcs):
project._vms = set()
VPCS._instance = None
vpcs = VPCS.instance()
vpcs.port_manager = port_manager
vm_id = str(uuid.uuid4())
vm = loop.run_until_complete(vpcs.create_vm("PC 1", project.id, vm_id, console=2222))
assert vm in project.vms
@ -57,17 +57,13 @@ def test_create_twice_same_vm_new_topology(loop, project, port_manager):
assert len(project.vms) == 1
def test_create_vm_new_topology_without_uuid(loop, project, port_manager):
VPCS._instance = None
vpcs = VPCS.instance()
vpcs.port_manager = port_manager
def test_create_vm_new_topology_without_uuid(loop, project, vpcs):
vm = loop.run_until_complete(vpcs.create_vm("PC 1", project.id, None))
assert vm in project.vms
assert len(vm.id) == 36
def test_create_vm_old_topology(loop, project, tmpdir, port_manager):
def test_create_vm_old_topology(loop, project, tmpdir, vpcs):
with patch("gns3server.modules.project.Project.is_local", return_value=True):
# Create an old topology directory
@ -79,9 +75,6 @@ def test_create_vm_old_topology(loop, project, tmpdir, port_manager):
with open(os.path.join(vm_dir, "startup.vpc"), "w+") as f:
f.write("1")
VPCS._instance = None
vpcs = VPCS.instance()
vpcs.port_manager = port_manager
vm_id = 1
vm = loop.run_until_complete(vpcs.create_vm("PC 1", project.id, vm_id))
assert len(vm.id) == 36
@ -93,31 +86,55 @@ def test_create_vm_old_topology(loop, project, tmpdir, port_manager):
assert f.read() == "1"
def test_get_abs_image_path(qemu, tmpdir):
os.makedirs(str(tmpdir / "QEMU"))
def test_get_abs_image_path(iou, tmpdir):
os.makedirs(str(tmpdir / "IOU"))
path1 = str(tmpdir / "test1.bin")
open(path1, 'w+').close()
path2 = str(tmpdir / "QEMU" / "test2.bin")
path2 = str(tmpdir / "IOU" / "test2.bin")
open(path2, 'w+').close()
with patch("gns3server.config.Config.get_section_config", return_value={"images_path": str(tmpdir)}):
assert qemu.get_abs_image_path(path1) == path1
assert qemu.get_abs_image_path(path2) == path2
assert qemu.get_abs_image_path("test2.bin") == path2
assert qemu.get_abs_image_path("../test1.bin") == path1
assert iou.get_abs_image_path(path1) == path1
assert iou.get_abs_image_path(path2) == path2
assert iou.get_abs_image_path("test2.bin") == path2
assert iou.get_abs_image_path("../test1.bin") == path1
def test_get_relative_image_path(qemu, tmpdir):
os.makedirs(str(tmpdir / "QEMU"))
def test_get_relative_image_path(iou, tmpdir):
os.makedirs(str(tmpdir / "IOU"))
path1 = str(tmpdir / "test1.bin")
open(path1, 'w+').close()
path2 = str(tmpdir / "QEMU" / "test2.bin")
path2 = str(tmpdir / "IOU" / "test2.bin")
open(path2, 'w+').close()
with patch("gns3server.config.Config.get_section_config", return_value={"images_path": str(tmpdir)}):
assert qemu.get_relative_image_path(path1) == path1
assert qemu.get_relative_image_path(path2) == "test2.bin"
assert qemu.get_relative_image_path("test2.bin") == "test2.bin"
assert qemu.get_relative_image_path("../test1.bin") == path1
assert iou.get_relative_image_path(path1) == path1
assert iou.get_relative_image_path(path2) == "test2.bin"
assert iou.get_relative_image_path("test2.bin") == "test2.bin"
assert iou.get_relative_image_path("../test1.bin") == path1
def test_list_images(loop, iou, tmpdir):
fake_images = ["a.bin", "b.bin", ".blu.bin"]
for image in fake_images:
with open(str(tmpdir / image), "w+") as f:
f.write("1")
with patch("gns3server.modules.IOU.get_images_directory", return_value=str(tmpdir)):
assert loop.run_until_complete(iou.list_images()) == [
{"filename": "a.bin"},
{"filename": "b.bin"}
]
def test_list_images_empty(loop, iou, tmpdir):
with patch("gns3server.modules.IOU.get_images_directory", return_value=str(tmpdir)):
assert loop.run_until_complete(iou.list_images()) == []
def test_list_images_directory_not_exist(loop, iou):
with patch("gns3server.modules.IOU.get_images_directory", return_value="/bla"):
assert loop.run_until_complete(iou.list_images()) == []

Loading…
Cancel
Save