diff --git a/gns3server/compute/docker/__init__.py b/gns3server/compute/docker/__init__.py index 4d8af0f6..81ff8bb2 100644 --- a/gns3server/compute/docker/__init__.py +++ b/gns3server/compute/docker/__init__.py @@ -93,6 +93,8 @@ class Docker(BaseManager): if self._connected: if self._connector and not self._connector.closed: self._connector.close() + if self._session and not self._session.closed: + await self._session.close() async def query(self, method, path, data={}, params={}): """ @@ -106,6 +108,7 @@ class Docker(BaseManager): response = await self.http_query(method, path, data=data, params=params) body = await response.read() + response.close() if body and len(body): if response.headers['CONTENT-TYPE'] == 'application/json': body = json.loads(body.decode("utf-8")) @@ -140,14 +143,12 @@ class Docker(BaseManager): if self._session is None or self._session.closed: connector = self.connector() self._session = aiohttp.ClientSession(connector=connector) - response = await self._session.request( - method, - url, - params=params, - data=data, - headers={"content-type": "application/json", }, - timeout=timeout - ) + response = await self._session.request(method, + url, + params=params, + data=data, + headers={"content-type": "application/json", }, + timeout=timeout) except (aiohttp.ClientResponseError, aiohttp.ClientOSError) as e: raise DockerError("Docker has returned an error: {}".format(str(e))) except (asyncio.TimeoutError): @@ -177,9 +178,7 @@ class Docker(BaseManager): """ url = "http://docker/v" + self._api_version + "/" + path - connection = await self._session.ws_connect(url, - origin="http://docker", - autoping=True) + connection = await self._session.ws_connect(url, origin="http://docker", autoping=True) return connection @locking diff --git a/gns3server/controller/__init__.py b/gns3server/controller/__init__.py index fb7f2a60..cf303e90 100644 --- a/gns3server/controller/__init__.py +++ b/gns3server/controller/__init__.py @@ -67,50 +67,46 @@ class Controller: @locking async def download_appliance_templates(self): - session = aiohttp.ClientSession() try: headers = {} if self._appliance_templates_etag: log.info("Checking if appliance templates are up-to-date (ETag {})".format(self._appliance_templates_etag)) headers["If-None-Match"] = self._appliance_templates_etag - response = await session.get('https://api.github.com/repos/GNS3/gns3-registry/contents/appliances', headers=headers) - if response.status == 304: - log.info("Appliance templates are already up-to-date (ETag {})".format(self._appliance_templates_etag)) - return - elif response.status != 200: - raise aiohttp.web.HTTPConflict(text="Could not retrieve appliance templates on GitHub due to HTTP error code {}".format(response.status)) - etag = response.headers.get("ETag") - if etag: - self._appliance_templates_etag = etag - self.save() - json_data = await response.json() - response.close() - appliances_dir = get_resource('appliances') - for appliance in json_data: - if appliance["type"] == "file": - appliance_name = appliance["name"] - log.info("Download appliance template file from '{}'".format(appliance["download_url"])) - response = await session.get(appliance["download_url"]) - if response.status != 200: - log.warning("Could not download '{}' due to HTTP error code {}".format(appliance["download_url"], response.status)) - continue - try: - appliance_data = await response.read() - except asyncio.TimeoutError: - log.warning("Timeout while downloading '{}'".format(appliance["download_url"])) - continue - path = os.path.join(appliances_dir, appliance_name) - - try: - log.info("Saving {} file to {}".format(appliance_name, path)) - with open(path, 'wb') as f: - f.write(appliance_data) - except OSError as e: - raise aiohttp.web.HTTPConflict(text="Could not write appliance template file '{}': {}".format(path, e)) + async with aiohttp.ClientSession() as session: + async with session.get('https://api.github.com/repos/GNS3/gns3-registry/contents/appliances', headers=headers) as response: + if response.status == 304: + log.info("Appliance templates are already up-to-date (ETag {})".format(self._appliance_templates_etag)) + return + elif response.status != 200: + raise aiohttp.web.HTTPConflict(text="Could not retrieve appliance templates on GitHub due to HTTP error code {}".format(response.status)) + etag = response.headers.get("ETag") + if etag: + self._appliance_templates_etag = etag + self.save() + json_data = await response.json() + appliances_dir = get_resource('appliances') + for appliance in json_data: + if appliance["type"] == "file": + appliance_name = appliance["name"] + log.info("Download appliance template file from '{}'".format(appliance["download_url"])) + async with session.get(appliance["download_url"]) as response: + if response.status != 200: + log.warning("Could not download '{}' due to HTTP error code {}".format(appliance["download_url"], response.status)) + continue + try: + appliance_data = await response.read() + except asyncio.TimeoutError: + log.warning("Timeout while downloading '{}'".format(appliance["download_url"])) + continue + path = os.path.join(appliances_dir, appliance_name) + try: + log.info("Saving {} file to {}".format(appliance_name, path)) + with open(path, 'wb') as f: + f.write(appliance_data) + except OSError as e: + raise aiohttp.web.HTTPConflict(text="Could not write appliance template file '{}': {}".format(path, e)) except ValueError as e: raise aiohttp.web.HTTPConflict(text="Could not read appliance templates information from GitHub: {}".format(e)) - finally: - session.close() def load_appliance_templates(self): diff --git a/gns3server/controller/compute.py b/gns3server/controller/compute.py index 61532a68..8946de71 100644 --- a/gns3server/controller/compute.py +++ b/gns3server/controller/compute.py @@ -18,6 +18,7 @@ import ipaddress import aiohttp import asyncio +import async_timeout import socket import json import uuid @@ -52,22 +53,6 @@ class ComputeConflict(aiohttp.web.HTTPConflict): self.response = response -class Timeout(aiohttp.Timeout): - """ - Could be removed with aiohttp 0.22 that support None timeout - """ - - def __enter__(self): - if self._timeout: - return super().__enter__() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - if self._timeout: - return super().__exit__(exc_type, exc_val, exc_tb) - return self - - class Compute: """ A GNS3 compute. @@ -101,12 +86,8 @@ class Compute: "node_types": [] } self.name = name - # Websocket for notifications - self._ws = None - # Cache of interfaces on remote host self._interfaces_cache = None - self._connection_failure = 0 def _session(self): @@ -114,9 +95,10 @@ class Compute: self._http_session = aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=None, force_close=True)) return self._http_session - def __del__(self): - if self._http_session: - self._http_session.close() + #def __del__(self): + # pass + # if self._http_session: + # self._http_session.close() def _set_auth(self, user, password): """ @@ -162,19 +144,16 @@ class Compute: # It's important to set user and password at the same time if "user" in kwargs or "password" in kwargs: self._set_auth(kwargs.get("user", self._user), kwargs.get("password", self._password)) - if self._http_session: - self._http_session.close() + if self._http_session and not self._http_session.closed: + await self._http_session.close() self._connected = False self._controller.notification.controller_emit("compute.updated", self.__json__()) self._controller.save() async def close(self): self._connected = False - if self._http_session: - self._http_session.close() - if self._ws: - await self._ws.close() - self._ws = None + if self._http_session and not self._http_session.closed: + await self._http_session.close() self._closed = True @property @@ -474,35 +453,27 @@ class Compute: """ Connect to the notification stream """ - try: - self._ws = await self._session().ws_connect(self._getUrl("/notifications/ws"), auth=self._auth) - except (aiohttp.WSServerHandshakeError, aiohttp.ClientResponseError): - self._ws = None - while self._ws is not None: - try: - response = await self._ws.receive() - except aiohttp.WSServerHandshakeError: - self._ws = None - break - if response.tp == aiohttp.WSMsgType.closed or response.tp == aiohttp.WSMsgType.error or response.data is None: - self._connected = False - break - msg = json.loads(response.data) - action = msg.pop("action") - event = msg.pop("event") - if action == "ping": - self._cpu_usage_percent = event["cpu_usage_percent"] - self._memory_usage_percent = event["memory_usage_percent"] - self._controller.notification.controller_emit("compute.updated", self.__json__()) - else: - await self._controller.notification.dispatch(action, event, compute_id=self.id) - if self._ws: - await self._ws.close() + + async with self._session().ws_connect(self._getUrl("/notifications/ws"), auth=self._auth) as ws: + async for response in ws: + if response.type == aiohttp.WSMsgType.TEXT and response.data: + msg = json.loads(response.data) + action = msg.pop("action") + event = msg.pop("event") + if action == "ping": + self._cpu_usage_percent = event["cpu_usage_percent"] + self._memory_usage_percent = event["memory_usage_percent"] + self._controller.notification.controller_emit("compute.updated", self.__json__()) + else: + await self._controller.notification.dispatch(action, event, compute_id=self.id) + elif response.type == aiohttp.WSMsgType.CLOSED or response.type == aiohttp.WSMsgType.ERROR or response.data is None: + self._connected = False + break # Try to reconnect after 1 seconds if server unavailable only if not during tests (otherwise we create a ressources usage bomb) if not hasattr(sys, "_called_from_test") or not sys._called_from_test: asyncio.get_event_loop().call_later(1, lambda: asyncio.ensure_future(self.connect())) - self._ws = None + self._cpu_usage_percent = None self._memory_usage_percent = None self._controller.notification.controller_emit("compute.updated", self.__json__()) @@ -527,7 +498,7 @@ class Compute: return self._getUrl(path) async def _run_http_query(self, method, path, data=None, timeout=20, raw=False): - with Timeout(timeout): + with async_timeout.timeout(timeout): url = self._getUrl(path) headers = {} headers['content-type'] = 'application/json' diff --git a/gns3server/controller/gns3vm/virtualbox_gns3_vm.py b/gns3server/controller/gns3vm/virtualbox_gns3_vm.py index 8ef060e5..c1ee98e8 100644 --- a/gns3server/controller/gns3vm/virtualbox_gns3_vm.py +++ b/gns3server/controller/gns3vm/virtualbox_gns3_vm.py @@ -261,29 +261,22 @@ class VirtualBoxGNS3VM(BaseGNS3VM): """ remaining_try = 300 while remaining_try > 0: - json_data = None - session = aiohttp.ClientSession() - try: - resp = None - resp = await session.get('http://127.0.0.1:{}/v2/compute/network/interfaces'.format(api_port)) - except (OSError, aiohttp.ClientError, TimeoutError, asyncio.TimeoutError): - pass - - if resp: - if resp.status < 300: - try: - json_data = await resp.json() - except ValueError: - pass - resp.close() - - session.close() - - if json_data: - for interface in json_data: - if "name" in interface and interface["name"] == "eth{}".format(hostonly_interface_number - 1): - if "ip_address" in interface and len(interface["ip_address"]) > 0: - return interface["ip_address"] + async with aiohttp.ClientSession() as session: + try: + async with session.get('http://127.0.0.1:{}/v2/compute/network/interfaces'.format(api_port)) as resp: + if resp.status < 300: + try: + json_data = await resp.json() + if json_data: + for interface in json_data: + if "name" in interface and interface["name"] == "eth{}".format( + hostonly_interface_number - 1): + if "ip_address" in interface and len(interface["ip_address"]) > 0: + return interface["ip_address"] + except ValueError: + pass + except (OSError, aiohttp.ClientError, TimeoutError, asyncio.TimeoutError): + pass remaining_try -= 1 await asyncio.sleep(1) raise GNS3VMError("Could not get the GNS3 VM ip make sure the VM receive an IP from VirtualBox") diff --git a/gns3server/handlers/api/compute/notification_handler.py b/gns3server/handlers/api/compute/notification_handler.py index b8157921..7446a6ce 100644 --- a/gns3server/handlers/api/compute/notification_handler.py +++ b/gns3server/handlers/api/compute/notification_handler.py @@ -42,15 +42,17 @@ class NotificationHandler: ws = WebSocketResponse() await ws.prepare(request) + request.app['websockets'].add(ws) asyncio.ensure_future(process_websocket(ws)) - with notifications.queue() as queue: while True: try: notification = await queue.get_json(1) + if ws.closed: + break + await ws.send_str(notification) except asyncio.futures.CancelledError: break - if ws.closed: - break - ws.send_str(notification) + finally: + request.app['websockets'].discard(ws) return ws diff --git a/gns3server/handlers/api/controller/notification_handler.py b/gns3server/handlers/api/controller/notification_handler.py index 7e7fd320..b81b2f23 100644 --- a/gns3server/handlers/api/controller/notification_handler.py +++ b/gns3server/handlers/api/controller/notification_handler.py @@ -69,14 +69,17 @@ class NotificationHandler: ws = aiohttp.web.WebSocketResponse() await ws.prepare(request) + request.app['websockets'].add(ws) asyncio.ensure_future(process_websocket(ws)) with controller.notification.controller_queue() as queue: while True: try: notification = await queue.get_json(5) + if ws.closed: + break + await ws.send_str(notification) except asyncio.futures.CancelledError: break - if ws.closed: - break - ws.send_str(notification) + finally: + request.app['websockets'].discard(ws) return ws diff --git a/gns3server/handlers/api/controller/project_handler.py b/gns3server/handlers/api/controller/project_handler.py index 2e68f288..394999db 100644 --- a/gns3server/handlers/api/controller/project_handler.py +++ b/gns3server/handlers/api/controller/project_handler.py @@ -261,17 +261,19 @@ class ProjectHandler: ws = aiohttp.web.WebSocketResponse() await ws.prepare(request) + request.app['websockets'].add(ws) asyncio.ensure_future(process_websocket(ws)) - with controller.notification.project_queue(project) as queue: while True: try: notification = await queue.get_json(5) - except asyncio.futures.CancelledError as e: - break - if ws.closed: + if ws.closed: + break + await ws.send_str(notification) + except asyncio.futures.CancelledError: break - ws.send_str(notification) + finally: + request.app['websockets'].discard(ws) if project.auto_close: # To avoid trouble with client connecting disconnecting we sleep few seconds before checking diff --git a/gns3server/web/response.py b/gns3server/web/response.py index 9a13ab87..8ef199b5 100644 --- a/gns3server/web/response.py +++ b/gns3server/web/response.py @@ -51,6 +51,7 @@ class Response(aiohttp.web.Response): super().enable_chunked_encoding() async def prepare(self, request): + if log.getEffectiveLevel() == logging.DEBUG: log.info("%s %s", request.method, request.path_qs) log.debug("%s", dict(request.headers)) diff --git a/gns3server/web/web_server.py b/gns3server/web/web_server.py index 76f39660..e115fa5a 100644 --- a/gns3server/web/web_server.py +++ b/gns3server/web/web_server.py @@ -28,6 +28,7 @@ import aiohttp_cors import functools import time import atexit +import weakref # Import encoding now, to avoid implicit import later. # Implicit import within threads may cause LookupError when standard library is in a ZIP @@ -48,8 +49,8 @@ import gns3server.handlers import logging log = logging.getLogger(__name__) -if not (aiohttp.__version__.startswith("2.2") or aiohttp.__version__.startswith("2.3")): - raise RuntimeError("aiohttp 2.2.x or 2.3.x is required to run the GNS3 server") +if not (aiohttp.__version__.startswith("3.")): + raise RuntimeError("aiohttp 3.x is required to run the GNS3 server") class WebServer: @@ -100,18 +101,17 @@ class WebServer: log.warning("Close is already in progress") return + # close websocket connections + for ws in set(self._app['websockets']): + await ws.close(code=aiohttp.WSCloseCode.GOING_AWAY, message='Server shutdown') + if self._server: self._server.close() await self._server.wait_closed() if self._app: await self._app.shutdown() if self._handler: - try: - # aiohttp < 2.3 - await self._handler.finish_connections(2) # Parameter is timeout - except AttributeError: - # aiohttp >= 2.3 - await self._handler.shutdown(2) # Parameter is timeout + await self._handler.shutdown(2) # Parameter is timeout if self._app: await self._app.cleanup() @@ -254,6 +254,10 @@ class WebServer: log.debug("ENV %s=%s", key, val) self._app = aiohttp.web.Application() + + # Keep a list of active websocket connections + self._app['websockets'] = weakref.WeakSet() + # Background task started with the server self._app.on_startup.append(self._on_startup) diff --git a/requirements.txt b/requirements.txt index 25710c78..d93bc735 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,12 +1,10 @@ jsonschema>=2.4.0 -aiohttp>=2.3.3,<2.4.0 # pyup: ignore -aiohttp-cors>=0.5.3,<0.6.0 # pyup: ignore -yarl>=0.11 +aiohttp==3.2.1 +aiohttp-cors==0.7.0 Jinja2>=2.7.3 raven>=5.23.0 psutil>=3.0.0 zipstream>=1.1.4 -typing>=3.5.3.0 # Otherwise yarl fails with python 3.4 prompt-toolkit==1.0.15 -async-timeout<3.0.0 # pyup: ignore; 3.0 drops support for python 3.4 +async-timeout==3.0.1 distro>=1.3.0 \ No newline at end of file diff --git a/setup.py b/setup.py index bdda2ff6..f6bb40db 100644 --- a/setup.py +++ b/setup.py @@ -19,9 +19,9 @@ import sys from setuptools import setup, find_packages from setuptools.command.test import test as TestCommand -# we only support Python 3 version >= 3.4 -if len(sys.argv) >= 2 and sys.argv[1] == "install" and sys.version_info < (3, 4): - raise SystemExit("Python 3.4 or higher is required") +# we only support Python 3 version >= 3.5.3 +if len(sys.argv) >= 2 and sys.argv[1] == "install" and sys.version_info < (3, 5, 3): + raise SystemExit("Python 3.5.3 or higher is required") class PyTest(TestCommand):