1
0
mirror of https://github.com/GNS3/gns3-server synced 2024-11-25 01:38:08 +00:00

Merge pull request #1274 from GNS3/asyncio-ensure-future

Conservative approach to supported versions of Python 3.4 and asyncio…
This commit is contained in:
ziajka 2018-01-29 12:21:41 +01:00 committed by GitHub
commit 7fe6508e73
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 60 additions and 39 deletions

View File

@ -30,6 +30,7 @@ log = logging.getLogger(__name__)
from uuid import UUID, uuid4 from uuid import UUID, uuid4
from gns3server.utils.interfaces import is_interface_up from gns3server.utils.interfaces import is_interface_up
from gns3server.utils.asyncio import asyncio_ensure_future
from ..config import Config from ..config import Config
from ..utils.asyncio import wait_run_in_executor from ..utils.asyncio import wait_run_in_executor
from ..utils import force_unix_path from ..utils import force_unix_path
@ -127,7 +128,7 @@ class BaseManager:
tasks = [] tasks = []
for node_id in self._nodes.keys(): for node_id in self._nodes.keys():
tasks.append(asyncio.ensure_future(self.close_node(node_id))) tasks.append(asyncio_ensure_future(self.close_node(node_id)))
if tasks: if tasks:
done, _ = yield from asyncio.wait(tasks) done, _ = yield from asyncio.wait(tasks)

View File

@ -28,7 +28,7 @@ import os
from gns3server.utils.asyncio.telnet_server import AsyncioTelnetServer from gns3server.utils.asyncio.telnet_server import AsyncioTelnetServer
from gns3server.utils.asyncio.raw_command_server import AsyncioRawCommandServer from gns3server.utils.asyncio.raw_command_server import AsyncioRawCommandServer
from gns3server.utils.asyncio import wait_for_file_creation from gns3server.utils.asyncio import wait_for_file_creation, asyncio_ensure_future
from gns3server.utils.get_resource import get_resource from gns3server.utils.get_resource import get_resource
from gns3server.ubridge.ubridge_error import UbridgeError, UbridgeNamespaceError from gns3server.ubridge.ubridge_error import UbridgeError, UbridgeNamespaceError
@ -511,7 +511,7 @@ class DockerVM(BaseNode):
output_stream.feed_data(self.name.encode() + b" console is now available... Press RETURN to get started.\r\n") output_stream.feed_data(self.name.encode() + b" console is now available... Press RETURN to get started.\r\n")
asyncio.ensure_future(self._read_console_output(self._console_websocket, output_stream)) asyncio_ensure_future(self._read_console_output(self._console_websocket, output_stream))
@asyncio.coroutine @asyncio.coroutine
def _read_console_output(self, ws, out): def _read_console_output(self, ws, out):

View File

@ -34,7 +34,7 @@ import re
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
from gns3server.utils.interfaces import interfaces, is_interface_up from gns3server.utils.interfaces import interfaces, is_interface_up
from gns3server.utils.asyncio import wait_run_in_executor from gns3server.utils.asyncio import wait_run_in_executor, asyncio_ensure_future
from gns3server.utils import parse_version from gns3server.utils import parse_version
from uuid import uuid4 from uuid import uuid4
from ..base_manager import BaseManager from ..base_manager import BaseManager
@ -172,7 +172,7 @@ class Dynamips(BaseManager):
tasks = [] tasks = []
for device in self._devices.values(): for device in self._devices.values():
tasks.append(asyncio.ensure_future(device.hypervisor.stop())) tasks.append(asyncio_ensure_future(device.hypervisor.stop()))
if tasks: if tasks:
done, _ = yield from asyncio.wait(tasks) done, _ = yield from asyncio.wait(tasks)
@ -196,7 +196,7 @@ class Dynamips(BaseManager):
tasks = [] tasks = []
for device in self._devices.values(): for device in self._devices.values():
if device.project.id == project.id: if device.project.id == project.id:
tasks.append(asyncio.ensure_future(device.delete())) tasks.append(asyncio_ensure_future(device.delete()))
if tasks: if tasks:
done, _ = yield from asyncio.wait(tasks) done, _ = yield from asyncio.wait(tasks)

View File

@ -40,7 +40,7 @@ from ..nios.nio_udp import NIOUDP
from gns3server.utils.file_watcher import FileWatcher from gns3server.utils.file_watcher import FileWatcher
from gns3server.utils.asyncio import wait_run_in_executor, monitor_process from gns3server.utils.asyncio import wait_run_in_executor, monitor_process, asyncio_ensure_future
from gns3server.utils.images import md5sum from gns3server.utils.images import md5sum
@ -194,7 +194,7 @@ class Router(BaseNode):
""" """
Called when the NVRAM file has changed Called when the NVRAM file has changed
""" """
asyncio.ensure_future(self.save_configs()) asyncio_ensure_future(self.save_configs())
@property @property
def dynamips_id(self): def dynamips_id(self):

View File

@ -28,7 +28,7 @@ from uuid import UUID, uuid4
from .port_manager import PortManager from .port_manager import PortManager
from .notification_manager import NotificationManager from .notification_manager import NotificationManager
from ..config import Config from ..config import Config
from ..utils.asyncio import wait_run_in_executor from ..utils.asyncio import wait_run_in_executor, asyncio_ensure_future
from ..utils.path import check_path_allowed, get_default_project_directory from ..utils.path import check_path_allowed, get_default_project_directory
@ -325,7 +325,7 @@ class Project:
tasks = [] tasks = []
for node in self._nodes: for node in self._nodes:
tasks.append(asyncio.ensure_future(node.manager.close_node(node.id))) tasks.append(asyncio_ensure_future(node.manager.close_node(node.id)))
if tasks: if tasks:
done, _ = yield from asyncio.wait(tasks) done, _ = yield from asyncio.wait(tasks)

View File

@ -39,6 +39,7 @@ log = logging.getLogger(__name__)
from gns3server.compute.base_manager import BaseManager from gns3server.compute.base_manager import BaseManager
from gns3server.compute.vmware.vmware_vm import VMwareVM from gns3server.compute.vmware.vmware_vm import VMwareVM
from gns3server.compute.vmware.vmware_error import VMwareError from gns3server.compute.vmware.vmware_error import VMwareError
from gns3server.utils.asyncio import asyncio_ensure_future
class VMware(BaseManager): class VMware(BaseManager):
@ -738,4 +739,4 @@ if __name__ == '__main__':
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
vmware = VMware.instance() vmware = VMware.instance()
print("=> Check version") print("=> Check version")
loop.run_until_complete(asyncio.ensure_future(vmware.check_vmware_version())) loop.run_until_complete(asyncio_ensure_future(vmware.check_vmware_version()))

View File

@ -27,7 +27,7 @@ from operator import itemgetter
from ..utils import parse_version from ..utils import parse_version
from ..utils.images import list_images from ..utils.images import list_images
from ..utils.asyncio import locked_coroutine from ..utils.asyncio import locked_coroutine, asyncio_ensure_future
from ..controller.controller_error import ControllerError from ..controller.controller_error import ControllerError
from ..version import __version__ from ..version import __version__
@ -415,7 +415,7 @@ class Compute:
if self._connection_failure == 5: if self._connection_failure == 5:
log.warning("Cannot connect to compute '{}': {}".format(self._id, e)) log.warning("Cannot connect to compute '{}': {}".format(self._id, e))
yield from self._controller.close_compute_projects(self) yield from self._controller.close_compute_projects(self)
asyncio.get_event_loop().call_later(2, lambda: asyncio.ensure_future(self._try_reconnect())) asyncio.get_event_loop().call_later(2, lambda: asyncio_ensure_future(self._try_reconnect()))
return return
except aiohttp.web.HTTPNotFound: except aiohttp.web.HTTPNotFound:
raise aiohttp.web.HTTPConflict(text="The server {} is not a GNS3 server or it's a 1.X server".format(self._id)) raise aiohttp.web.HTTPConflict(text="The server {} is not a GNS3 server or it's a 1.X server".format(self._id))
@ -472,7 +472,7 @@ class Compute:
# Try to reconnect after 1 seconds if server unavailable only if not during tests (otherwise we create a ressources usage bomb) # Try to reconnect after 1 seconds if server unavailable only if not during tests (otherwise we create a ressources usage bomb)
if not hasattr(sys, "_called_from_test") or not sys._called_from_test: if not hasattr(sys, "_called_from_test") or not sys._called_from_test:
asyncio.get_event_loop().call_later(1, lambda: asyncio.ensure_future(self.connect())) asyncio.get_event_loop().call_later(1, lambda: asyncio_ensure_future(self.connect()))
self._ws = None self._ws = None
self._cpu_usage_percent = None self._cpu_usage_percent = None
self._memory_usage_percent = None self._memory_usage_percent = None

View File

@ -21,7 +21,7 @@ import asyncio
import aiohttp import aiohttp
import ipaddress import ipaddress
from ...utils.asyncio import locked_coroutine from ...utils.asyncio import locked_coroutine, asyncio_ensure_future
from .vmware_gns3_vm import VMwareGNS3VM from .vmware_gns3_vm import VMwareGNS3VM
from .virtualbox_gns3_vm import VirtualBoxGNS3VM from .virtualbox_gns3_vm import VirtualBoxGNS3VM
from .remote_gns3_vm import RemoteGNS3VM from .remote_gns3_vm import RemoteGNS3VM
@ -302,7 +302,7 @@ class GNS3VM:
# check if the VM is in the same subnet as the local server, start 10 seconds later to give # check if the VM is in the same subnet as the local server, start 10 seconds later to give
# some time for the compute in the VM to be ready for requests # some time for the compute in the VM to be ready for requests
asyncio.get_event_loop().call_later(10, lambda: asyncio.ensure_future(self._check_network(compute))) asyncio.get_event_loop().call_later(10, lambda: asyncio_ensure_future(self._check_network(compute)))
@asyncio.coroutine @asyncio.coroutine
def _check_network(self, compute): def _check_network(self, compute):

View File

@ -22,6 +22,8 @@ import html
import asyncio import asyncio
import aiohttp import aiohttp
from gns3server.utils.asyncio import asyncio_ensure_future
import logging import logging
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -288,7 +290,7 @@ class Link:
self._capturing = True self._capturing = True
self._capture_file_name = capture_file_name self._capture_file_name = capture_file_name
self._streaming_pcap = asyncio.ensure_future(self._start_streaming_pcap()) self._streaming_pcap = asyncio_ensure_future(self._start_streaming_pcap())
self._project.controller.notification.emit("link.updated", self.__json__()) self._project.controller.notification.emit("link.updated", self.__json__())
@asyncio.coroutine @asyncio.coroutine

View File

@ -36,7 +36,7 @@ from .udp_link import UDPLink
from ..config import Config from ..config import Config
from ..utils.path import check_path_allowed, get_default_project_directory from ..utils.path import check_path_allowed, get_default_project_directory
from ..utils.asyncio.pool import Pool from ..utils.asyncio.pool import Pool
from ..utils.asyncio import locked_coroutine from ..utils.asyncio import locked_coroutine, asyncio_ensure_future
from .export_project import export_project from .export_project import export_project
from .import_project import import_project from .import_project import import_project
from ..compute.iou.utils.application_id import get_next_application_id from ..compute.iou.utils.application_id import get_next_application_id
@ -867,7 +867,7 @@ class Project:
# Start all in the background without waiting for completion # Start all in the background without waiting for completion
# we ignore errors because we want to let the user open # we ignore errors because we want to let the user open
# their project and fix it # their project and fix it
asyncio.ensure_future(self.start_all()) asyncio_ensure_future(self.start_all())
@asyncio.coroutine @asyncio.coroutine
def wait_loaded(self): def wait_loaded(self):

View File

@ -20,6 +20,7 @@ import aiohttp
from aiohttp.web import WebSocketResponse from aiohttp.web import WebSocketResponse
from gns3server.web.route import Route from gns3server.web.route import Route
from gns3server.compute.notification_manager import NotificationManager from gns3server.compute.notification_manager import NotificationManager
from gns3server.utils.asyncio import asyncio_ensure_future
@asyncio.coroutine @asyncio.coroutine
@ -43,7 +44,7 @@ class NotificationHandler:
ws = WebSocketResponse() ws = WebSocketResponse()
yield from ws.prepare(request) yield from ws.prepare(request)
asyncio.ensure_future(process_websocket(ws)) asyncio_ensure_future(process_websocket(ws))
with notifications.queue() as queue: with notifications.queue() as queue:
while True: while True:

View File

@ -25,7 +25,7 @@ from gns3server.controller import Controller
from gns3server.controller.import_project import import_project from gns3server.controller.import_project import import_project
from gns3server.controller.export_project import export_project from gns3server.controller.export_project import export_project
from gns3server.config import Config from gns3server.config import Config
from gns3server.utils.asyncio import asyncio_ensure_future
from gns3server.schemas.project import ( from gns3server.schemas.project import (
PROJECT_OBJECT_SCHEMA, PROJECT_OBJECT_SCHEMA,
@ -246,7 +246,7 @@ class ProjectHandler:
ws = aiohttp.web.WebSocketResponse() ws = aiohttp.web.WebSocketResponse()
yield from ws.prepare(request) yield from ws.prepare(request)
asyncio.ensure_future(process_websocket(ws)) asyncio_ensure_future(process_websocket(ws))
with controller.notification.queue(project) as queue: with controller.notification.queue(project) as queue:
while True: while True:

View File

@ -20,6 +20,7 @@ from gns3server.config import Config
from gns3server.controller import Controller from gns3server.controller import Controller
from gns3server.schemas.version import VERSION_SCHEMA from gns3server.schemas.version import VERSION_SCHEMA
from gns3server.version import __version__ from gns3server.version import __version__
from gns3server.utils.asyncio import asyncio_ensure_future
from aiohttp.web import HTTPConflict, HTTPForbidden from aiohttp.web import HTTPConflict, HTTPForbidden
@ -57,7 +58,7 @@ class ServerHandler:
tasks = [] tasks = []
for project in projects: for project in projects:
tasks.append(asyncio.ensure_future(project.close())) tasks.append(asyncio_ensure_future(project.close()))
if tasks: if tasks:
done, _ = yield from asyncio.wait(tasks) done, _ = yield from asyncio.wait(tasks)
@ -71,7 +72,7 @@ class ServerHandler:
# then shutdown the server itself # then shutdown the server itself
from gns3server.web.web_server import WebServer from gns3server.web.web_server import WebServer
server = WebServer.instance() server = WebServer.instance()
asyncio.ensure_future(server.shutdown_server()) asyncio_ensure_future(server.shutdown_server())
response.set_status(201) response.set_status(201)
@Route.get( @Route.get(

View File

@ -223,9 +223,9 @@ def run():
if server_config.getboolean("local"): if server_config.getboolean("local"):
log.warning("Local mode is enabled. Beware, clients will have full control on your filesystem") log.warning("Local mode is enabled. Beware, clients will have full control on your filesystem")
# we only support Python 3 version >= 3.4.4 # we only support Python 3 version >= 3.4
if sys.version_info < (3, 4, 4): if sys.version_info < (3, 4):
raise SystemExit("Python 3.4.4 or higher is required") raise SystemExit("Python 3.4 or higher is required")
user_log.info("Running with Python {major}.{minor}.{micro} and has PID {pid}".format( user_log.info("Running with Python {major}.{minor}.{micro} and has PID {pid}".format(
major=sys.version_info[0], minor=sys.version_info[1], major=sys.version_info[0], minor=sys.version_info[1],

View File

@ -108,7 +108,7 @@ def _check_process(process, termination_callback):
def monitor_process(process, termination_callback): def monitor_process(process, termination_callback):
"""Call termination_callback when a process dies""" """Call termination_callback when a process dies"""
asyncio.ensure_future(_check_process(process, termination_callback)) asyncio_ensure_future(_check_process(process, termination_callback))
@asyncio.coroutine @asyncio.coroutine
@ -158,3 +158,12 @@ def locked_coroutine(f):
return (yield from f(*args, **kwargs)) return (yield from f(*args, **kwargs))
return new_function return new_function
# It's conservative approach to supported versions, please remove it when we drop the support to Python < 3.4.4
if hasattr(asyncio, 'ensure_future'):
# python>=3.4.4
asyncio_ensure_future = asyncio.ensure_future
else:
# deprecated
asyncio_ensure_future = asyncio.async

View File

@ -20,6 +20,8 @@ import copy
import asyncio import asyncio
import asyncio.subprocess import asyncio.subprocess
from gns3server.utils.asyncio import asyncio_ensure_future
import logging import logging
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -69,8 +71,8 @@ class AsyncioRawCommandServer:
else: else:
replaces.append((replace[0], replace[1], )) replaces.append((replace[0], replace[1], ))
network_read = asyncio.ensure_future(network_reader.read(READ_SIZE)) network_read = asyncio_ensure_future(network_reader.read(READ_SIZE))
reader_read = asyncio.ensure_future(process_reader.read(READ_SIZE)) reader_read = asyncio_ensure_future(process_reader.read(READ_SIZE))
timeout = 30 timeout = 30
while True: while True:
@ -89,7 +91,7 @@ class AsyncioRawCommandServer:
if network_reader.at_eof(): if network_reader.at_eof():
raise ConnectionResetError() raise ConnectionResetError()
network_read = asyncio.ensure_future(network_reader.read(READ_SIZE)) network_read = asyncio_ensure_future(network_reader.read(READ_SIZE))
process_writer.write(data) process_writer.write(data)
yield from process_writer.drain() yield from process_writer.drain()
@ -97,7 +99,7 @@ class AsyncioRawCommandServer:
if process_reader.at_eof(): if process_reader.at_eof():
raise ConnectionResetError() raise ConnectionResetError()
reader_read = asyncio.ensure_future(process_reader.read(READ_SIZE)) reader_read = asyncio_ensure_future(process_reader.read(READ_SIZE))
for replace in replaces: for replace in replaces:
data = data.replace(replace[0], replace[1]) data = data.replace(replace[0], replace[1])

View File

@ -20,6 +20,8 @@ import asyncio
import asyncio.subprocess import asyncio.subprocess
import struct import struct
from gns3server.utils.asyncio import asyncio_ensure_future
import logging import logging
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -229,13 +231,13 @@ class AsyncioTelnetServer:
self._reader_process = network_reader self._reader_process = network_reader
if self._reader: if self._reader:
if self._reader_process == network_reader: if self._reader_process == network_reader:
self._current_read = asyncio.ensure_future(self._reader.read(READ_SIZE)) self._current_read = asyncio_ensure_future(self._reader.read(READ_SIZE))
return self._current_read return self._current_read
return None return None
@asyncio.coroutine @asyncio.coroutine
def _process(self, network_reader, network_writer, connection): def _process(self, network_reader, network_writer, connection):
network_read = asyncio.ensure_future(network_reader.read(READ_SIZE)) network_read = asyncio_ensure_future(network_reader.read(READ_SIZE))
reader_read = yield from self._get_reader(network_reader) reader_read = yield from self._get_reader(network_reader)
while True: while True:
@ -261,7 +263,7 @@ class AsyncioTelnetServer:
if network_reader.at_eof(): if network_reader.at_eof():
raise ConnectionResetError() raise ConnectionResetError()
network_read = asyncio.ensure_future(network_reader.read(READ_SIZE)) network_read = asyncio_ensure_future(network_reader.read(READ_SIZE))
if IAC in data: if IAC in data:
data = yield from self._IAC_parser(data, network_reader, network_writer, connection) data = yield from self._IAC_parser(data, network_reader, network_writer, connection)
@ -418,7 +420,7 @@ if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
process = loop.run_until_complete(asyncio.ensure_future(asyncio.subprocess.create_subprocess_exec("/bin/sh", "-i", process = loop.run_until_complete(asyncio_ensure_future(asyncio.subprocess.create_subprocess_exec("/bin/sh", "-i",
stdout=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT, stderr=asyncio.subprocess.STDOUT,
stdin=asyncio.subprocess.PIPE))) stdin=asyncio.subprocess.PIPE)))

View File

@ -36,6 +36,8 @@ from ..compute.port_manager import PortManager
from ..compute.qemu import Qemu from ..compute.qemu import Qemu
from ..controller import Controller from ..controller import Controller
from gns3server.utils.asyncio import asyncio_ensure_future
# do not delete this import # do not delete this import
import gns3server.handlers import gns3server.handlers
@ -136,7 +138,7 @@ class WebServer:
def signal_handler(signame, *args): def signal_handler(signame, *args):
log.warning("Server has got signal {}, exiting...".format(signame)) log.warning("Server has got signal {}, exiting...".format(signame))
asyncio.ensure_future(self.shutdown_server()) asyncio_ensure_future(self.shutdown_server())
signals = ["SIGTERM", "SIGINT"] signals = ["SIGTERM", "SIGINT"]
if sys.platform.startswith("win"): if sys.platform.startswith("win"):
@ -203,7 +205,7 @@ class WebServer:
# Because with a large image collection # Because with a large image collection
# without md5sum already computed we start the # without md5sum already computed we start the
# computing with server start # computing with server start
asyncio.ensure_future(Qemu.instance().list_images()) asyncio_ensure_future(Qemu.instance().list_images())
def run(self): def run(self):
""" """
@ -283,7 +285,7 @@ class WebServer:
self._exit_handling() self._exit_handling()
if server_config.getboolean("shell"): if server_config.getboolean("shell"):
asyncio.ensure_future(self.start_shell()) asyncio_ensure_future(self.start_shell())
try: try:
self._loop.run_forever() self._loop.run_forever()