diff --git a/.travis.yml b/.travis.yml index 07eee96b..972b3545 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,9 +1,7 @@ language: python python: - - "2.6" - "2.7" - - "pypy" - "3.3" install: diff --git a/dev-requirements.txt b/dev-requirements.txt new file mode 100644 index 00000000..efb4e7a0 --- /dev/null +++ b/dev-requirements.txt @@ -0,0 +1,4 @@ +-rrequirements.txt + +pytest +ws4py diff --git a/gns3server/__init__.py b/gns3server/__init__.py index d57df556..ae8e5ef2 100644 --- a/gns3server/__init__.py +++ b/gns3server/__init__.py @@ -23,8 +23,6 @@ # or negative for a release candidate or beta (after the base version # number has been incremented) -from gns3server.plugin_manager import PluginManager -from gns3server.server import Server - -__version__ = "0.1.dev" -__version_info__ = (0, 1, 0, -99) +from .module_manager import ModuleManager +from .server import Server +from .version import __version__ diff --git a/gns3server/main.py b/gns3server/main.py index bbd57346..5f176883 100644 --- a/gns3server/main.py +++ b/gns3server/main.py @@ -24,10 +24,15 @@ import tornado.options # command line options from tornado.options import define +define("host", default="127.0.0.1", help="run on the given host/IP address", type=str) define("port", default=8000, help="run on the given port", type=int) +define("ipc", default=False, help="use IPC for module communication", type=bool) def main(): + """ + Entry point for GNS3 server + """ current_year = datetime.date.today().year print("GNS3 server version {}".format(gns3server.__version__)) @@ -45,13 +50,15 @@ def main(): tornado.options.print_help() raise SystemExit - #FIXME: log everything for now (excepting DEBUG) + # FIXME: log everything for now (excepting DEBUG) logging.basicConfig(level=logging.INFO) - server = gns3server.Server() - server.load_plugins() + from tornado.options import options + server = gns3server.Server(options.host, + options.port, + ipc=options.ipc) + server.load_modules() server.run() - if __name__ == '__main__': main() diff --git a/gns3server/module_manager.py b/gns3server/module_manager.py new file mode 100644 index 00000000..d3ce8a21 --- /dev/null +++ b/gns3server/module_manager.py @@ -0,0 +1,114 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2013 GNS3 Technologies Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import imp +import inspect +import pkgutil +from .modules import IModule + +import logging +log = logging.getLogger(__name__) + + +class Module(object): + """ + Module representation for the module manager + + :param name: module name + :param cls: module class to be instantiated when + the module is activated + """ + + def __init__(self, name, cls): + + self._name = name + self._cls = cls + + @property + def name(self): + + return self._name + + @name.setter + def name(self, new_name): + self._name = new_name + + #@property + def cls(self): + return self._cls + + +class ModuleManager(object): + """ + Manages modules + + :param module_paths: path from where module are loaded + """ + + def __init__(self, module_paths=['modules']): + + self._modules = [] + self._module_paths = module_paths + + def load_modules(self): + """ + Finds all the possible modules (classes with IModule as a parent) + """ + + for _, name, ispkg in pkgutil.iter_modules(self._module_paths): + if (ispkg): + log.debug("analyzing {} package directory".format(name)) + try: + file, pathname, description = imp.find_module(name, self._module_paths) + module = imp.load_module(name, file, pathname, description) + classes = inspect.getmembers(module, inspect.isclass) + for module_class in classes: + if issubclass(module_class[1], IModule): + # make sure the module class has IModule as a parent + if module_class[1].__module__ == name: + log.info("found and loading {} module".format(module_class[0].lower())) + info = Module(name=module_class[0].lower(), cls=module_class[1]) + self._modules.append(info) + except: + log.warning("error while analyzing {} package directory".format(name)) + finally: + if file: + file.close() + + def get_all_modules(self): + """ + Returns all modules. + + :return: list of Module objects + """ + + return self._modules + + def activate_module(self, module, args=(), kwargs={}): + """ + Activates a given module. + + :param module: module to activate (Module object) + :param args: args passed to the module + :param kwargs: kwargs passed to the module + :return: instantiated module class + """ + + module_class = module.cls() + module_instance = module_class(name=module.name, args=args, kwargs={}) + log.info("activating {} module".format(module.name)) + return module_instance diff --git a/gns3server/plugins/__init__.py b/gns3server/modules/__init__.py similarity index 94% rename from gns3server/plugins/__init__.py rename to gns3server/modules/__init__.py index c76c9983..6447293d 100644 --- a/gns3server/plugins/__init__.py +++ b/gns3server/modules/__init__.py @@ -15,4 +15,4 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -from gns3server.plugins.base import IPlugin +from .base import IModule diff --git a/gns3server/modules/base.py b/gns3server/modules/base.py new file mode 100644 index 00000000..25233ab5 --- /dev/null +++ b/gns3server/modules/base.py @@ -0,0 +1,149 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2013 GNS3 Technologies Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import multiprocessing +import zmq + +import logging +log = logging.getLogger(__name__) + + +class IModule(multiprocessing.Process): + """ + Module interface + """ + + destination = {} + + def __init__(self, name=None, args=(), kwargs={}): + + multiprocessing.Process.__init__(self, + name=name, + args=args, + kwargs=kwargs) + + self._context = None + self._ioloop = None + self._stream = None + self._host = args[0] + self._port = args[1] + self._current_session = None + self._current_destination = None + + def setup(self): + """ + Sets up PyZMQ and creates the stream to handle requests + """ + + self._context = zmq.Context() + self._ioloop = zmq.eventloop.ioloop.IOLoop.instance() + self._stream = self.create_stream(self._host, self._port, self.decode_request) + + def create_stream(self, host=None, port=0, callback=None): + """ + Creates a new ZMQ stream + """ + + socket = self._context.socket(zmq.DEALER) + socket.setsockopt(zmq.IDENTITY, self.name.encode("utf-8")) + if host and port: + log.info("ZeroMQ client ({}) connecting to {}:{}".format(self.name, host, port)) + try: + socket.connect("tcp://{}:{}".format(host, port)) + except zmq.error.ZMQError as e: + log.critical("Could not connect to ZeroMQ server on {}:{}, reason: {}".format(host, port, e)) + raise SystemExit + else: + log.info("ZeroMQ client ({}) connecting to ipc:///tmp/gns3.ipc".format(self.name)) + try: + socket.connect("ipc:///tmp/gns3.ipc") + except zmq.error.ZMQError as e: + log.critical("Could not connect to ZeroMQ server on ipc:///tmp/gns3.ipc, reason: {}".format(e)) + raise SystemExit + + stream = zmq.eventloop.zmqstream.ZMQStream(socket, self._ioloop) + if callback: + stream.on_recv(callback) + return stream + + def run(self): + """ + Sets up everything and starts the event loop + """ + + self.setup() + try: + self._ioloop.start() + except KeyboardInterrupt: + return + + def stop(self): + """ + Stops the event loop + """ + + #zmq.eventloop.ioloop.IOLoop.instance().stop() + self._ioloop.stop() + + def send_response(self, response): + """ + Sends a response back to the requester + """ + + # add session and destination to the response + response = [self._current_session, self._current_destination, response] + log.debug("ZeroMQ client ({}) sending: {}".format(self.name, response)) + self._stream.send_json(response) + + def decode_request(self, request): + """ + Decodes the request to JSON + """ + + try: + request = zmq.utils.jsonapi.loads(request[0]) + except ValueError: + self.send_response("ValueError") # FIXME: explicit json error + return + + log.debug("ZeroMQ client ({}) received: {}".format(self.name, request)) + self._current_session = request[0] + self._current_destination = request[1] + + if self._current_destination not in self.destination: + # FIXME: return error if destination not found! + return + log.debug("Routing request to {}: {}".format(self._current_destination, request[2])) + self.destination[self._current_destination](self, request[2]) + + def destinations(self): + """ + Channels handled by this modules. + """ + + return self.destination.keys() + + @classmethod + def route(cls, destination): + """ + Decorator to register a destination routed to a method + """ + + def wrapper(method): + cls.destination[destination] = method + return method + return wrapper diff --git a/gns3server/plugin_manager.py b/gns3server/plugin_manager.py deleted file mode 100644 index f0f582f4..00000000 --- a/gns3server/plugin_manager.py +++ /dev/null @@ -1,87 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Copyright (C) 2013 GNS3 Technologies Inc. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -import imp -import inspect -import pkgutil -import logging -from gns3server.plugins import IPlugin - -logger = logging.getLogger(__name__) - - -class Plugin(object): - """Plugin representation for the PluginManager - """ - - def __init__(self, name, cls): - - self._name = name - self._cls = cls - - @property - def name(self): - - return self._name - - @name.setter - def name(self, new_name): - self._name = new_name - - #@property - def cls(self): - return self._cls - - -class PluginManager(object): - """Manages plugins - """ - - def __init__(self, plugin_paths=['plugins']): - - self._plugins = [] - self._plugin_paths = plugin_paths - - def load_plugins(self): - - for _, name, ispkg in pkgutil.iter_modules(self._plugin_paths): - if (ispkg): - logger.info("analyzing '{}' package".format(name)) - try: - file, pathname, description = imp.find_module(name, self._plugin_paths) - plugin_module = imp.load_module(name, file, pathname, description) - plugin_classes = inspect.getmembers(plugin_module, inspect.isclass) - for plugin_class in plugin_classes: - if issubclass(plugin_class[1], IPlugin): - # don't instantiate any parent plugins - if plugin_class[1].__module__ == name: - logger.info("loading '{}' plugin".format(plugin_class[0])) - info = Plugin(name=plugin_class[0], cls=plugin_class[1]) - self._plugins.append(info) - finally: - if file: - file.close() - - def get_all_plugins(self): - return self._plugins - - def activate_plugin(self, plugin): - - plugin_class = plugin.cls() - plugin_instance = plugin_class() - logger.info("'{}' plugin activated".format(plugin.name)) - return plugin_instance diff --git a/gns3server/server.py b/gns3server/server.py index c46417ee..594e6d87 100644 --- a/gns3server/server.py +++ b/gns3server/server.py @@ -15,66 +15,142 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import logging +import zmq +from zmq.eventloop import ioloop, zmqstream +ioloop.install() + +import os +import functools import socket import tornado.ioloop import tornado.web -import gns3server - -logger = logging.getLogger(__name__) +import tornado.autoreload +from .version import __version__ +from .stomp_websocket import StompWebSocket +from .module_manager import ModuleManager - -class MainHandler(tornado.web.RequestHandler): - - def get(self): - self.write("Welcome to the GNS3 server!") +import logging +log = logging.getLogger(__name__) class VersionHandler(tornado.web.RequestHandler): def get(self): - response = {'version': gns3server.__version__} + response = {'version': __version__} self.write(response) class Server(object): # built-in handlers - handlers = [(r"/", MainHandler), - (r"/version", VersionHandler)] + handlers = [(r"/version", VersionHandler)] - def __init__(self): + def __init__(self, host, port, ipc=False): - self._plugins = [] + self._host = host + self._port = port + if ipc: + self._zmq_port = 0 # this forces module to use IPC for communications + else: + self._zmq_port = port + 1 # this server port + 1 + self._ipc = ipc + self._modules = [] - def load_plugins(self): + def load_modules(self): """Loads the plugins """ - plugin_manager = gns3server.PluginManager() - plugin_manager.load_plugins() - for plugin in plugin_manager.get_all_plugins(): - instance = plugin_manager.activate_plugin(plugin) - self._plugins.append(instance) - plugin_handlers = instance.handlers() - self.handlers.extend(plugin_handlers) + cwd = os.path.dirname(os.path.abspath(__file__)) + module_path = os.path.join(cwd, 'modules') + log.info("loading modules from {}".format(module_path)) + module_manager = ModuleManager([module_path]) + module_manager.load_modules() + for module in module_manager.get_all_modules(): + instance = module_manager.activate_module(module, ("127.0.0.1", self._zmq_port)) + self._modules.append(instance) + destinations = instance.destinations() + for destination in destinations: + StompWebSocket.register_destination(destination, module.name) + instance.start() # starts the new process def run(self): - """Starts the tornado web server + """ + Starts the Tornado web server and ZeroMQ server """ - from tornado.options import options - tornado_app = tornado.web.Application(self.handlers) + router = self._create_zmq_router() + # Add our Stomp Websocket handler to Tornado + self.handlers.extend([(r"/", StompWebSocket, dict(zmq_router=router))]) + tornado_app = tornado.web.Application(self.handlers, debug=True) # FIXME: debug mode! try: - port = options.port - print("Starting server on port {}".format(port)) - tornado_app.listen(port) + print("Starting server on port {}".format(self._port)) + tornado_app.listen(self._port) except socket.error as e: if e.errno is 48: # socket already in use - logging.critical("socket in use for port {}".format(port)) + logging.critical("socket in use for port {}".format(self._port)) raise SystemExit + + ioloop = tornado.ioloop.IOLoop.instance() + stream = zmqstream.ZMQStream(router, ioloop) + stream.on_recv(StompWebSocket.dispatch_message) + tornado.autoreload.add_reload_hook(functools.partial(self._cleanup, stop=False)) + try: - tornado.ioloop.IOLoop.instance().start() + ioloop.start() except (KeyboardInterrupt, SystemExit): print("\nExiting...") - tornado.ioloop.IOLoop.instance().stop() + self._cleanup() + + def _create_zmq_router(self): + """ + Creates the ZeroMQ router socket to send + requests to modules. + + :returns: ZeroMQ socket + """ + + context = zmq.Context() + context.linger = 0 + router = context.socket(zmq.ROUTER) + if self._ipc: + try: + router.bind("ipc:///tmp/gns3.ipc") + except zmq.error.ZMQError as e: + log.critical("Could not start ZeroMQ server on ipc:///tmp/gns3.ipc, reason: {}".format(e)) + self._cleanup() + raise SystemExit + log.info("ZeroMQ server listening to ipc:///tmp/gns3.ipc") + else: + try: + router.bind("tcp://127.0.0.1:{}".format(self._zmq_port)) + except zmq.error.ZMQError as e: + log.critical("Could not start ZeroMQ server on 127.0.0.1:{}, reason: {}".format(self._zmq_port, e)) + self._cleanup() + raise SystemExit + log.info("ZeroMQ server listening to 127.0.0.1:{}".format(self._zmq_port)) + return router + + def _cleanup(self, stop=True): + """ + Shutdowns running module processes + and close remaining Tornado ioloop file descriptors + + :param stop: Stop the ioloop if True (default) + """ + + # terminate all modules + for module in self._modules: + log.info("terminating {}".format(module.name)) + module.terminate() + module.join(timeout=1) + + ioloop = tornado.ioloop.IOLoop.instance() + # close any fd that would have remained open... + for fd in ioloop._handlers.keys(): + try: + os.close(fd) + except Exception: + pass + + if stop: + ioloop.stop() diff --git a/tests/__init__.py b/gns3server/stomp/__init__.py similarity index 100% rename from tests/__init__.py rename to gns3server/stomp/__init__.py diff --git a/gns3server/stomp/frame.py b/gns3server/stomp/frame.py new file mode 100644 index 00000000..61b86124 --- /dev/null +++ b/gns3server/stomp/frame.py @@ -0,0 +1,159 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2013 GNS3 Technologies Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +STOMP frame representation, decoding and encoding +http://stomp.github.io/stomp-specification-1.2.html +Adapted from Jason R. Briggs's code +https://github.com/jasonrbriggs/stomp.py +""" + +import re +from .utils import encode + + +class Frame(object): + """ + A STOMP frame. Comprises a command, the headers and the body content. + """ + + # Used to parse STOMP header lines in the format "key:value", + HEADER_LINE_RE = re.compile('(?P[^:]+)[:](?P.*)') + # As of STOMP 1.2, lines can end with either line feed, or carriage return plus line feed. + PREAMBLE_END_RE = re.compile('\n\n|\r\n\r\n') + # As of STOMP 1.2, lines can end with either line feed, or carriage return plus line feed. + LINE_END_RE = re.compile('\n|\r\n') + # NULL value + NULL = b'\x00' + + def __init__(self, cmd=None, headers={}, body=None): + self._cmd = cmd + self._headers = headers + self._body = body + + @property + def cmd(self): + + return(self._cmd) + + @cmd.setter + def cmd(self, cmd): + + self._cmd = cmd + + @property + def headers(self): + + return(self._headers) + + @headers.setter + def headers(self, headers): + + self._headers = headers + + @property + def body(self): + + return(self._body) + + @body.setter + def body(self, body): + + self._body = body + + def encode(self): + """ + Encodes this frame to be send on the wire + """ + + lines = [] + if self._cmd: + lines.append(self._cmd) + lines.append("\n") + for key, vals in sorted(self._headers.items()): + if type(vals) != tuple: + vals = (vals,) + for val in vals: + lines.append("%s:%s\n" % (key, val)) + lines.append("\n") + if self._body: + lines.append(self._body) + + if self._cmd: + lines.append(self.NULL) + + encoded_lines = (encode(line) for line in lines) + return b''.join(encoded_lines) + + @classmethod + def parse_headers(cls, lines, offset=0): + """ + Parses frame headers + + :param lines: Frame preamble lines + :param offset: To start parsing at the given offset + :returns: Headers in dict header:value + """ + + headers = {} + for header_line in lines[offset:]: + header_match = cls.HEADER_LINE_RE.match(header_line) + if header_match: + key = header_match.group('key') + if key not in headers: + headers[key] = header_match.group('value') + return headers + + @classmethod + def parse_frame(cls, frame): + """ + Parses a frame + + :params frame: The frame data to be parsed + :returns: STOMP Frame object + """ + + f = Frame() + # End-of-line (EOL) indicates an heart beat frame + if frame == '\x0a': + f.cmd = 'heartbeat' # This will have the frame ignored + return f + + mat = cls.PREAMBLE_END_RE.search(frame) + preamble_end = -1 + if mat: + preamble_end = mat.start() + if preamble_end == -1: + preamble_end = len(frame) + preamble = frame[0:preamble_end] + preamble_lines = cls.LINE_END_RE.split(preamble) + f.body = frame[preamble_end + 2:] + if f.body[-1] == '\x00': + f.body = f.body[:-1] + + # Skip any leading newlines + first_line = 0 + while first_line < len(preamble_lines) and len(preamble_lines[first_line]) == 0: + first_line += 1 + + # Extract frame type/command + f.cmd = preamble_lines[first_line] + + # Put headers into a key/value map + f.headers = cls.parse_headers(preamble_lines, first_line + 1) + + return f diff --git a/gns3server/stomp/protocol.py b/gns3server/stomp/protocol.py new file mode 100644 index 00000000..ad2db09d --- /dev/null +++ b/gns3server/stomp/protocol.py @@ -0,0 +1,227 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2013 GNS3 Technologies Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +Basic STOMP 1.2 protocol implementation +http://stomp.github.io/stomp-specification-1.2.html +""" + +import uuid +from .frame import Frame +from .utils import encode, hasbyte + +# Commands server-side +CMD_CONNECTED = 'CONNECTED' +CMD_ERROR = 'ERROR' +CMD_MESSAGE = 'MESSAGE' +CMD_RECEIPT = 'RECEIPT' + +# Commands client-side +CMD_STOMP = 'STOMP' +CMD_CONNECT = 'CONNECT' +CMD_DISCONNECT = 'DISCONNECT' +CMD_SEND = 'SEND' + +# Commands not supported +CMD_SUBSCRIBE = 'SUBSCRIBE' +CMD_UNSUBSCRIBE = 'UNSUBSCRIBE' +CMD_ACK = 'ACK' +CMD_NACK = 'NACK' +CMD_BEGIN = 'BEGIN' +CMD_ABORT = 'ABORT' + +# Headers +HDR_VERSION = 'version' +HDR_SESSION = 'session' +HDR_SERVER = 'server' +HDR_CONTENT_TYPE = 'content-type' +HDR_CONTENT_LENGTH = 'content-length' +HDR_RECEIPT_ID = 'receipt-id' +HDR_MESSAGE = 'message' +HDR_MESSAGE_ID = 'message-id' +HDR_ACCEPT_VERSION = 'accept-version' +HDR_HOST = 'host' +HDR_DESTINATION = 'destination' +HDR_RECEIPT = 'receipt' + +# Headers not supported +HDR_HEARTBEAT = 'heart-beat' +HDR_LOGIN = 'login' +HDR_PASSCODE = 'passcode' +HDR_ID = 'id' +HDR_ACK = 'ack' +HDR_SUBSCRIPTION = 'subscription' +HDR_TRANSACTION = 'transaction' + + +class serverProtocol(object): + """ + STOMP 1.2 protocol support for servers. + """ + + def __init__(self): + + # STOMP protocol version + self.version = 1.2 + + def connected(self, session=None, server=None): + """ + Replies to the CONNECT or STOMP command. + Heart-beat header is not supported. + + :param session: A session identifier that uniquely identifies the session. + :param server: A field that contains information about the STOMP server. + :returns: STOMP Frame object + """ + + # Version header is required + headers = {HDR_VERSION: self.version} + + if session: + headers[HDR_SESSION] = session + + # The server-name field consists of a name token followed by an + # optional version number token. Example: Apache/1.3.9 + if server: + headers[HDR_SERVER] = server + + return Frame(CMD_CONNECTED, headers).encode() + + def message(self, destination, body, content_type=None, message_id=str(uuid.uuid4())): + """ + Sends a message to a STOMP client. + + :param destination: Destination string + :param body: Data to be added in the frame body + :param content_type: MIME type which describes the format of the body + :param message_id: Unique identifier for that message + :returns: STOMP Frame object + """ + + # Destination and message id headers are required + headers = {HDR_DESTINATION: destination, + HDR_MESSAGE_ID: message_id} + + # Subscription is required but not implemented on this server + headers[HDR_SUBSCRIPTION] = 0 + + if content_type: + headers[HDR_CONTENT_TYPE] = content_type + + body = encode(body) + if HDR_CONTENT_LENGTH not in headers and hasbyte(0, body): + headers[HDR_CONTENT_LENGTH] = len(body) + + return Frame(CMD_MESSAGE, headers, body).encode() + + def receipt(self, receipt_id): + """ + Sends an acknowledgment for client frame that requests a receipt. + + :param receipt_id: Receipt ID to send back to the client + :returns: STOMP Frame object + """ + + # Receipt ID header is required (the same sent in the client frame) + headers = {HDR_RECEIPT_ID: receipt_id} + return Frame(CMD_RECEIPT, headers).encode() + + def error(self, message='', body='', content_type=None): + """ + Sends an error to the client if something goes wrong. + + :param message: Short description of the error + :param body: Detailed information + :param content_type: MIME type which describes the format of the body + :returns: STOMP Frame object + """ + + headers = {} + if message: + headers[HDR_MESSAGE] = message + + if body: + body = encode(body) + if HDR_CONTENT_LENGTH not in headers and hasbyte(0, body): + headers[HDR_CONTENT_LENGTH] = len(body) + if content_type: + headers[HDR_CONTENT_TYPE] = content_type + + return Frame(CMD_ERROR, headers, body).encode() + + +class clientProtocol(object): + """ + STOMP 1.2 protocol support for clients. + """ + + def connect(self, host, accept_version='1.2'): + """ + Connects to a STOMP server. + Heart-beat, login and passcode headers are not supported. + + :param host: Host name that the socket was established against. + :param accept_version: The versions of the STOMP protocol the client supports. + :returns: STOMP Frame object + """ + + # Currently only STOMP 1.2 is supported (required header) + headers = {HDR_ACCEPT_VERSION: accept_version} + + if host: + headers[HDR_HOST] = host + + # The STOMP command is not backward compatible with STOMP 1.0 servers. + # Clients that use the STOMP frame instead of the CONNECT frame will + # only be able to connect to STOMP 1.2 servers (as well as some STOMP 1.1 servers. + return Frame(CMD_STOMP, headers).encode() + + def disconnect(self, receipt=str(uuid.uuid4())): + """ + Disconnects to a STOMP server. + + :param receipt: unique identifier + :returns: STOMP Frame object + """ + + # Receipt header is required + headers = {HDR_RECEIPT: receipt} + return Frame(CMD_DISCONNECT, headers).encode() + + def send(self, destination, body, content_type=None): + """ + Sends a message to a destination in the messaging system. + Transaction header is not supported. + User defined headers are not supported too (against the protocol specification) + + :param destination: Destination string + :param body: Data to be added in the frame body + :param content_type: MIME type which describes the format of the body + :returns: STOMP Frame object + """ + + # Destination header is required + headers = {HDR_DESTINATION: destination} + + if content_type: + headers[HDR_CONTENT_TYPE] = content_type + + body = encode(body) + if HDR_CONTENT_LENGTH not in headers and hasbyte(0, body): + headers[HDR_CONTENT_LENGTH] = len(body) + + return Frame(CMD_SEND, headers, body).encode() diff --git a/gns3server/plugins/dynamips/__init__.py b/gns3server/stomp/utils.py similarity index 54% rename from gns3server/plugins/dynamips/__init__.py rename to gns3server/stomp/utils.py index 3d667e39..8dd1b7e4 100644 --- a/gns3server/plugins/dynamips/__init__.py +++ b/gns3server/stomp/utils.py @@ -15,28 +15,29 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import logging -import tornado.web -from gns3server.plugins import IPlugin - -logger = logging.getLogger(__name__) - - -class TestHandler(tornado.web.RequestHandler): - def get(self): - self.write("This is my test handler") - - -class Dynamips(IPlugin): - - def __init__(self): - IPlugin.__init__(self) - logger.info("Dynamips plugin is initializing") - - def handlers(self): - """Returns tornado web request handlers that the plugin manages - - :returns: List of tornado.web.RequestHandler - """ - - return [(r"/test", TestHandler)] +""" +Utilitary functions for STOMP implementation +""" + +import sys + +PY2 = sys.version_info[0] == 2 + +if not PY2: + def encode(char_data): + if type(char_data) is str: + return char_data.encode() + elif type(char_data) is bytes: + return char_data + else: + raise TypeError('message should be a string or bytes') +else: + def encode(char_data): + if type(char_data) is unicode: + return char_data.encode('utf-8') + else: + return char_data + + +def hasbyte(byte, byte_data): + return bytes([byte]) in byte_data diff --git a/gns3server/stomp_websocket.py b/gns3server/stomp_websocket.py new file mode 100644 index 00000000..a64ade86 --- /dev/null +++ b/gns3server/stomp_websocket.py @@ -0,0 +1,230 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2013 GNS3 Technologies Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +STOMP protocol over Websockets +""" + +import zmq +import uuid +import tornado.websocket +from .version import __version__ +from tornado.escape import json_decode +from .stomp import frame as stomp_frame +from .stomp import protocol as stomp_protocol + +import logging +log = logging.getLogger(__name__) + + +class StompWebSocket(tornado.websocket.WebSocketHandler): + """ + STOMP protocol over Tornado Websockets with message + routing to ZeroMQ dealer clients. + + :param application: Tornado Application object + :param request: Tornado Request object + :param zmq_router: ZeroMQ router socket + """ + + clients = set() + destinations = {} + stomp = stomp_protocol.serverProtocol() + + def __init__(self, application, request, zmq_router): + tornado.websocket.WebSocketHandler.__init__(self, application, request) + self._session_id = str(uuid.uuid4()) + self.zmq_router = zmq_router + + @property + def session_id(self): + """ + Session ID uniquely representing a Websocket client + """ + + return self._session_id + + @classmethod + def dispatch_message(cls, message): + """ + Sends a message to Websocket client + + :param message: message from a module + """ + + # Module name that is replying + module = message[0].decode("utf-8") + + # ZMQ requests are encoded in JSON + # format is a JSON array: [session ID, destination, JSON dict] + json_message = json_decode(message[1]) + session_id = json_message[0] + destination = json_message[1] + content = json_message[2] + + log.debug("Received message from module {}: {}".format(module, + json_message)) + + stomp_msg = cls.stomp.message(destination, + content, + "application/json") + for client in cls.clients: + if client.session_id == session_id: + client.write_message(stomp_msg) + + @classmethod + def register_destination(cls, destination, module): + """ + Registers a destination handled by a module. + Used to route requests to the right module. + + :param destination: destination string + :param module: module string + """ + + # Make sure the destination is not already registered + # by another module for instance + assert destination not in cls.destinations + log.info("registering {} as a destination for {}".format(destination, + module)) + cls.destinations[destination] = module + + def stomp_handle_connect(self, frame): + """ + Handles a STOMP CONNECT frame and returns a STOMP CONNECTED frame. + + :param frame: received STOMP CONNECT frame (object) + """ + + if not stomp_protocol.HDR_ACCEPT_VERSION in frame.headers or \ + not str(self.stomp.version) in frame.headers[stomp_protocol.HDR_ACCEPT_VERSION]: + self.stomp_error("STOMP version error", + "Supported protocol version is {}".format(self.stomp.version),) + else: + self.write_message(self.stomp.connected(self.session_id, + 'gns3server/' + __version__)) + + def stomp_handle_send(self, frame): + """ + Handles a STOMP SEND frame and dispatches it to the right module + based on the destination. + + :param frame: received STOMP SEND frame (object) + """ + + if stomp_protocol.HDR_DESTINATION not in frame.headers: + self.stomp_error("No destination header in SEND frame") + return + + destination = frame.headers[stomp_protocol.HDR_DESTINATION] + if not destination: + self.stomp_error("Destination header is empty in SEND frame") + return + + if destination not in self.destinations: + self.stomp_error("Destination {} doesn't exist".format(destination)) + return + + if not frame.body: + self.stomp_error("SEND frame has no body") + return + + module = self.destinations[destination] + # ZMQ requests are encoded in JSON + # format is a JSON array: [session ID, destination, JSON dict] + zmq_request = [self.session_id, destination, frame.body] + # Route to the correct module + self.zmq_router.send_string(module, zmq.SNDMORE) + # Send the encoded JSON request + self.zmq_router.send_json(zmq_request) + + def stomp_handle_disconnect(self, frame): + """ + Sends an STOMP RECEIPT frame back to the client when receiving a disconnection + request and close the connection. + + :param frame: received STOMP DISCONNECT frame (object) + """ + + if stomp_protocol.HDR_RECEIPT not in frame.headers: + self.stomp_error("No receipt header in DISCONNECT frame") + return + + receipt = self.stomp.receipt(frame.headers[stomp_protocol.HDR_RECEIPT]) + self.write_message(receipt) + self.close() + log.info("Websocket client {} gracefully disconnected".format(self.session_id)) + self.clients.remove(self) + + def stomp_error(self, short_description='', detailed_info='', content_type="text/plain"): + """ + Sends an STOMP error message back to the client and close the connection. + + :param short_description: short description of the error + :param detailed_info: detailed description of the error + :param content_type: MIME type which describes the format of the detailed info + """ + + error = self.stomp.error(short_description, detailed_info, content_type) + self.write_message(error) + self.close() + log.warning("Websocket client {} disconnected on an error: {}".format(self.session_id, + short_description)) + self.clients.remove(self) + + def open(self): + """ + Invoked when a new WebSocket is opened. + """ + + log.info("Websocket client {} connected".format(self.session_id)) + self.clients.add(self) + + def on_message(self, message): + """ + Handles incoming messages. + + :param message: message received over the Websocket + """ + + log.debug("Received Websocket message: {}".format(message)) + + try: + frame = stomp_frame.Frame.parse_frame(message) + except Exception: + self.stomp_error("Malformed STOMP frame") + return + + if frame.cmd == stomp_protocol.CMD_STOMP or frame.cmd == stomp_protocol.CMD_CONNECT: + self.stomp_handle_connect(frame) + + elif frame.cmd == stomp_protocol.CMD_SEND: + self.stomp_handle_send(frame) + + elif frame.cmd == stomp_protocol.CMD_DISCONNECT: + self.stomp_handle_disconnect(frame) + + else: + self.stomp_error("STOMP frame not implemented") + + def on_close(self): + """ + Invoked when the WebSocket is closed. + """ + + log.info("Websocket client {} disconnected".format(self.session_id)) + self.clients.remove(self) diff --git a/gns3server/plugins/base.py b/gns3server/version.py similarity index 62% rename from gns3server/plugins/base.py rename to gns3server/version.py index 6160fd1a..cd882e72 100644 --- a/gns3server/plugins/base.py +++ b/gns3server/version.py @@ -15,16 +15,13 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . +# __version__ is a human-readable version number. -class IPlugin(object): - """Plugin interface - """ +# __version_info__ is a four-tuple for programmatic comparison. The first +# three numbers are the components of the version number. The fourth +# is zero for an official release, positive for a development branch, +# or negative for a release candidate or beta (after the base version +# number has been incremented) - def __init__(self): - pass - - def setup(self): - """Called before the plugin is asked to do anything - """ - - raise NotImplementedError() +__version__ = "0.1.dev" +__version_info__ = (0, 1, 0, -99) diff --git a/requirements.txt b/requirements.txt index ed626c89..9c49a8d2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,2 @@ tornado -jsonschema -networkx +pyzmq diff --git a/setup.py b/setup.py index edeef1c1..e375cef7 100644 --- a/setup.py +++ b/setup.py @@ -42,10 +42,11 @@ setup( cmdclass={"test": Tox}, author="Jeremy Grossmann", author_email="package-maintainer@gns3.net", - description="GNS3 server with HTTP REST API to manage emulators", + description="GNS3 server to asynchronously to manage emulators", long_description=open("README.rst", "r").read(), install_requires=[ "tornado >= 2.0", + "pyzmq", ], entry_points={ "console_scripts": [ diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 00000000..f9ae1cf7 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,16 @@ +import sys +import os +import pytest +import subprocess +import time + + +@pytest.fixture(scope="session", autouse=True) +def server(request): + + cwd = os.path.dirname(os.path.abspath(__file__)) + server_script = os.path.join(cwd, "../gns3server/main.py") + process = subprocess.Popen([sys.executable, server_script, "--port=8000"]) + time.sleep(0.1) # give some time for the process to start + request.addfinalizer(process.kill) + return process diff --git a/tests/test_stomp.py b/tests/test_stomp.py new file mode 100644 index 00000000..ac793107 --- /dev/null +++ b/tests/test_stomp.py @@ -0,0 +1,95 @@ +import uuid +from tornado.testing import AsyncTestCase +from tornado.escape import json_encode, json_decode +from ws4py.client.tornadoclient import TornadoWebSocketClient +from gns3server.stomp import frame as stomp_frame +from gns3server.stomp import protocol as stomp_protocol + + +class Stomp(AsyncTestCase): + + URL = "ws://127.0.0.1:8000/" + + def setUp(self): + + self.stomp = stomp_protocol.clientProtocol() + AsyncTestCase.setUp(self) + + def test_connect(self): + + request = self.stomp.connect("localhost") + AsyncWSRequest(self.URL, self.io_loop, self.stop, request) + response = self.wait() + assert response + frame = stomp_frame.Frame.parse_frame(response.decode("utf-8")) + assert frame.cmd == stomp_protocol.CMD_CONNECTED + + def test_protocol_negotiation_failure(self): + + request = self.stomp.connect("localhost", accept_version='1.0') + AsyncWSRequest(self.URL, self.io_loop, self.stop, request) + response = self.wait() + assert response + frame = stomp_frame.Frame.parse_frame(response.decode("utf-8")) + assert frame.cmd == stomp_protocol.CMD_ERROR + + def test_malformed_frame(self): + + request = b"" + AsyncWSRequest(self.URL, self.io_loop, self.stop, request) + response = self.wait() + assert response + frame = stomp_frame.Frame.parse_frame(response.decode("utf-8")) + assert frame.cmd == stomp_protocol.CMD_ERROR + + def test_send(self): + + destination = "dynamips/echo" + message = {"ping": "test"} + request = self.stomp.send(destination, json_encode(message), "application/json") + AsyncWSRequest(self.URL, self.io_loop, self.stop, request) + response = self.wait() + assert response + frame = stomp_frame.Frame.parse_frame(response.decode("utf-8")) + assert frame.cmd == stomp_protocol.CMD_MESSAGE + assert frame.headers[stomp_protocol.HDR_DESTINATION] == destination + json_reply = json_decode(frame.body) + assert message == json_reply + + def test_unimplemented_frame(self): + + frame = stomp_frame.Frame(stomp_protocol.CMD_BEGIN) + request = frame.encode() + AsyncWSRequest(self.URL, self.io_loop, self.stop, request) + response = self.wait() + assert response + frame = stomp_frame.Frame.parse_frame(response.decode("utf-8")) + assert frame.cmd == stomp_protocol.CMD_ERROR + + def test_disconnect(self): + + myid = str(uuid.uuid4()) + request = self.stomp.disconnect(myid) + AsyncWSRequest(self.URL, self.io_loop, self.stop, request) + response = self.wait() + assert response + frame = stomp_frame.Frame.parse_frame(response.decode("utf-8")) + assert frame.cmd == stomp_protocol.CMD_RECEIPT + assert frame.headers[stomp_protocol.HDR_RECEIPT_ID] == myid + + +class AsyncWSRequest(TornadoWebSocketClient): + + def __init__(self, url, io_loop, callback, message): + TornadoWebSocketClient.__init__(self, url, io_loop=io_loop) + self._callback = callback + self._message = message + self.connect() + + def opened(self): + self.send(self._message, binary=False) + + def received_message(self, message): + self.close() + if self._callback: + self._callback(message.data) diff --git a/tox.ini b/tox.ini index cc5a7779..8bfea5a7 100644 --- a/tox.ini +++ b/tox.ini @@ -1,8 +1,7 @@ [tox] -envlist = py27, pypy, py33 +envlist = py27, py33 [testenv] commands = py.test [] -s tests -deps = - pytest - tornado +deps = -rdev-requirements.txt +