Use asyncio.ensure_future() instead of asyncio.async() with conservative approach to support Python < 3.4.4. Fixes https://github.com/GNS3/gns3-gui/issues/2566

pull/1399/head
grossmj 6 years ago
parent a3d1e865a8
commit 3560cda06c

@ -30,6 +30,7 @@ log = logging.getLogger(__name__)
from uuid import UUID, uuid4
from gns3server.utils.interfaces import is_interface_up
from gns3server.utils.asyncio import asyncio_ensure_future
from ..config import Config
from ..utils.asyncio import wait_run_in_executor
from ..utils import force_unix_path
@ -127,7 +128,7 @@ class BaseManager:
tasks = []
for node_id in self._nodes.keys():
tasks.append(asyncio.async(self.close_node(node_id)))
tasks.append(asyncio_ensure_future(self.close_node(node_id)))
if tasks:
done, _ = yield from asyncio.wait(tasks)

@ -29,6 +29,7 @@ import os
from gns3server.utils.asyncio.telnet_server import AsyncioTelnetServer
from gns3server.utils.asyncio.raw_command_server import AsyncioRawCommandServer
from gns3server.utils.asyncio import wait_for_file_creation
from gns3server.utils.asyncio import asyncio_ensure_future
from gns3server.utils.get_resource import get_resource
from gns3server.ubridge.ubridge_error import UbridgeError, UbridgeNamespaceError
@ -567,7 +568,7 @@ class DockerVM(BaseNode):
output_stream.feed_data(self.name.encode() + b" console is now available... Press RETURN to get started.\r\n")
asyncio.async(self._read_console_output(self._console_websocket, output_stream))
asyncio_ensure_future(self._read_console_output(self._console_websocket, output_stream))
@asyncio.coroutine
def _read_console_output(self, ws, out):

@ -36,6 +36,7 @@ log = logging.getLogger(__name__)
from gns3server.utils.interfaces import interfaces, is_interface_up
from gns3server.utils.asyncio import wait_run_in_executor
from gns3server.utils import parse_version
from gns3server.utils.asyncio import asyncio_ensure_future
from uuid import uuid4
from ..base_manager import BaseManager
from ..port_manager import PortManager
@ -172,7 +173,7 @@ class Dynamips(BaseManager):
tasks = []
for device in self._devices.values():
tasks.append(asyncio.async(device.hypervisor.stop()))
tasks.append(asyncio_ensure_future(device.hypervisor.stop()))
if tasks:
done, _ = yield from asyncio.wait(tasks)
@ -196,7 +197,7 @@ class Dynamips(BaseManager):
tasks = []
for device in self._devices.values():
if device.project.id == project.id:
tasks.append(asyncio.async(device.delete()))
tasks.append(asyncio_ensure_future(device.delete()))
if tasks:
done, _ = yield from asyncio.wait(tasks)

@ -40,7 +40,7 @@ from ..nios.nio_udp import NIOUDP
from gns3server.utils.file_watcher import FileWatcher
from gns3server.utils.asyncio import wait_run_in_executor, monitor_process
from gns3server.utils.asyncio import wait_run_in_executor, monitor_process, asyncio_ensure_future
from gns3server.utils.images import md5sum
@ -196,7 +196,7 @@ class Router(BaseNode):
"""
Called when the NVRAM file has changed
"""
asyncio.async(self.save_configs())
asyncio_ensure_future(self.save_configs())
@property
def dynamips_id(self):

@ -29,7 +29,7 @@ from uuid import UUID, uuid4
from .port_manager import PortManager
from .notification_manager import NotificationManager
from ..config import Config
from ..utils.asyncio import wait_run_in_executor
from ..utils.asyncio import wait_run_in_executor, asyncio_ensure_future
from ..utils.path import check_path_allowed, get_default_project_directory
import logging
@ -346,7 +346,7 @@ class Project:
tasks = []
for node in self._nodes:
tasks.append(asyncio.async(node.manager.close_node(node.id)))
tasks.append(asyncio_ensure_future(node.manager.close_node(node.id)))
if tasks:
done, _ = yield from asyncio.wait(tasks)

@ -32,6 +32,7 @@ import shlex
from collections import OrderedDict
from gns3server.utils.interfaces import interfaces
from gns3server.utils.asyncio import subprocess_check_output
from gns3server.utils.asyncio import asyncio_ensure_future
from gns3server.utils import parse_version
log = logging.getLogger(__name__)
@ -738,4 +739,4 @@ if __name__ == '__main__':
loop = asyncio.get_event_loop()
vmware = VMware.instance()
print("=> Check version")
loop.run_until_complete(asyncio.async(vmware.check_vmware_version()))
loop.run_until_complete(asyncio_ensure_future(vmware.check_vmware_version()))

@ -27,7 +27,7 @@ from operator import itemgetter
from ..utils import parse_version
from ..utils.images import list_images
from ..utils.asyncio import locked_coroutine
from ..utils.asyncio import locked_coroutine, asyncio_ensure_future
from ..controller.controller_error import ControllerError
from ..version import __version__, __version_info__
@ -418,7 +418,7 @@ class Compute:
if self._connection_failure == 5:
log.warning("Cannot connect to compute '{}': {}".format(self._id, e))
yield from self._controller.close_compute_projects(self)
asyncio.get_event_loop().call_later(2, lambda: asyncio.async(self._try_reconnect()))
asyncio.get_event_loop().call_later(2, lambda: asyncio_ensure_future(self._try_reconnect()))
return
except aiohttp.web.HTTPNotFound:
raise aiohttp.web.HTTPConflict(text="The server {} is not a GNS3 server or it's a 1.X server".format(self._id))
@ -494,7 +494,7 @@ class Compute:
# Try to reconnect after 1 seconds if server unavailable only if not during tests (otherwise we create a ressources usage bomb)
if not hasattr(sys, "_called_from_test") or not sys._called_from_test:
asyncio.get_event_loop().call_later(1, lambda: asyncio.async(self.connect()))
asyncio.get_event_loop().call_later(1, lambda: asyncio_ensure_future(self.connect()))
self._ws = None
self._cpu_usage_percent = None
self._memory_usage_percent = None

@ -21,7 +21,7 @@ import asyncio
import aiohttp
import ipaddress
from ...utils.asyncio import locked_coroutine
from ...utils.asyncio import locked_coroutine, asyncio_ensure_future
from .vmware_gns3_vm import VMwareGNS3VM
from .virtualbox_gns3_vm import VirtualBoxGNS3VM
from .remote_gns3_vm import RemoteGNS3VM
@ -302,7 +302,7 @@ class GNS3VM:
# check if the VM is in the same subnet as the local server, start 10 seconds later to give
# some time for the compute in the VM to be ready for requests
asyncio.get_event_loop().call_later(10, lambda: asyncio.async(self._check_network(compute)))
asyncio.get_event_loop().call_later(10, lambda: asyncio_ensure_future(self._check_network(compute)))
@asyncio.coroutine
def _check_network(self, compute):

@ -22,6 +22,8 @@ import html
import asyncio
import aiohttp
from gns3server.utils.asyncio import asyncio_ensure_future
import logging
log = logging.getLogger(__name__)
@ -296,7 +298,7 @@ class Link:
self._capturing = True
self._capture_file_name = capture_file_name
self._streaming_pcap = asyncio.async(self._start_streaming_pcap())
self._streaming_pcap = asyncio_ensure_future(self._start_streaming_pcap())
self._project.controller.notification.emit("link.updated", self.__json__())
@asyncio.coroutine

@ -38,6 +38,7 @@ from ..utils.path import check_path_allowed, get_default_project_directory
from ..utils.asyncio.pool import Pool
from ..utils.asyncio import locked_coroutine
from ..utils.asyncio import wait_run_in_executor
from ..utils.asyncio import asyncio_ensure_future
from .export_project import export_project
from .import_project import import_project
@ -887,7 +888,7 @@ class Project:
# Start all in the background without waiting for completion
# we ignore errors because we want to let the user open
# their project and fix it
asyncio.async(self.start_all())
asyncio_ensure_future(self.start_all())
@asyncio.coroutine
def wait_loaded(self):

@ -20,6 +20,7 @@ import aiohttp
from aiohttp.web import WebSocketResponse
from gns3server.web.route import Route
from gns3server.compute.notification_manager import NotificationManager
from gns3server.utils.asyncio import asyncio_ensure_future
@asyncio.coroutine
@ -43,7 +44,7 @@ class NotificationHandler:
ws = WebSocketResponse()
yield from ws.prepare(request)
asyncio.async(process_websocket(ws))
asyncio_ensure_future(process_websocket(ws))
with notifications.queue() as queue:
while True:

@ -25,6 +25,7 @@ from gns3server.controller import Controller
from gns3server.controller.import_project import import_project
from gns3server.controller.export_project import export_project
from gns3server.config import Config
from gns3server.utils.asyncio import asyncio_ensure_future
from gns3server.schemas.project import (
@ -246,7 +247,7 @@ class ProjectHandler:
ws = aiohttp.web.WebSocketResponse()
yield from ws.prepare(request)
asyncio.async(process_websocket(ws))
asyncio_ensure_future(process_websocket(ws))
with controller.notification.queue(project) as queue:
while True:

@ -20,6 +20,7 @@ from gns3server.config import Config
from gns3server.controller import Controller
from gns3server.schemas.version import VERSION_SCHEMA
from gns3server.version import __version__
from gns3server.utils.asyncio import asyncio_ensure_future
from aiohttp.web import HTTPConflict, HTTPForbidden
@ -57,7 +58,7 @@ class ServerHandler:
tasks = []
for project in projects:
tasks.append(asyncio.async(project.close()))
tasks.append(asyncio_ensure_future(project.close()))
if tasks:
done, _ = yield from asyncio.wait(tasks)
@ -71,7 +72,7 @@ class ServerHandler:
# then shutdown the server itself
from gns3server.web.web_server import WebServer
server = WebServer.instance()
asyncio.async(server.shutdown_server())
asyncio_ensure_future(server.shutdown_server())
response.set_status(201)
@Route.get(

@ -108,7 +108,7 @@ def _check_process(process, termination_callback):
def monitor_process(process, termination_callback):
"""Call termination_callback when a process dies"""
asyncio.async(_check_process(process, termination_callback))
asyncio_ensure_future(_check_process(process, termination_callback))
@asyncio.coroutine
@ -158,3 +158,10 @@ def locked_coroutine(f):
return (yield from f(*args, **kwargs))
return new_function
#FIXME: conservative approach to supported versions, please remove it when we drop the support to Python < 3.4.4
try:
from asyncio import ensure_future
asyncio_ensure_future = asyncio.ensure_future
except ImportError:
asyncio_ensure_future = getattr(asyncio, 'async')

@ -20,6 +20,8 @@ import copy
import asyncio
import asyncio.subprocess
from gns3server.utils.asyncio import asyncio_ensure_future
import logging
log = logging.getLogger(__name__)
@ -69,8 +71,8 @@ class AsyncioRawCommandServer:
else:
replaces.append((replace[0], replace[1], ))
network_read = asyncio.async(network_reader.read(READ_SIZE))
reader_read = asyncio.async(process_reader.read(READ_SIZE))
network_read = asyncio_ensure_future(network_reader.read(READ_SIZE))
reader_read = asyncio_ensure_future(process_reader.read(READ_SIZE))
timeout = 30
while True:
@ -89,7 +91,7 @@ class AsyncioRawCommandServer:
if network_reader.at_eof():
raise ConnectionResetError()
network_read = asyncio.async(network_reader.read(READ_SIZE))
network_read = asyncio_ensure_future(network_reader.read(READ_SIZE))
process_writer.write(data)
yield from process_writer.drain()
@ -97,7 +99,7 @@ class AsyncioRawCommandServer:
if process_reader.at_eof():
raise ConnectionResetError()
reader_read = asyncio.async(process_reader.read(READ_SIZE))
reader_read = asyncio_ensure_future(process_reader.read(READ_SIZE))
for replace in replaces:
data = data.replace(replace[0], replace[1])

@ -20,6 +20,8 @@ import asyncio
import asyncio.subprocess
import struct
from gns3server.utils.asyncio import asyncio_ensure_future
import logging
log = logging.getLogger(__name__)
@ -212,9 +214,12 @@ class AsyncioTelnetServer:
@asyncio.coroutine
def close(self):
for writer, connection in self._connections.items():
writer.write_eof()
yield from writer.drain()
try:
writer.write_eof()
yield from writer.drain()
except ConnectionResetError:
continue
@asyncio.coroutine
def client_connected_hook(self):
pass
@ -229,13 +234,13 @@ class AsyncioTelnetServer:
self._reader_process = network_reader
if self._reader:
if self._reader_process == network_reader:
self._current_read = asyncio.async(self._reader.read(READ_SIZE))
self._current_read = asyncio_ensure_future(self._reader.read(READ_SIZE))
return self._current_read
return None
@asyncio.coroutine
def _process(self, network_reader, network_writer, connection):
network_read = asyncio.async(network_reader.read(READ_SIZE))
network_read = asyncio_ensure_future(network_reader.read(READ_SIZE))
reader_read = yield from self._get_reader(network_reader)
while True:
@ -261,7 +266,7 @@ class AsyncioTelnetServer:
if network_reader.at_eof():
raise ConnectionResetError()
network_read = asyncio.async(network_reader.read(READ_SIZE))
network_read = asyncio_ensure_future(network_reader.read(READ_SIZE))
if IAC in data:
data = yield from self._IAC_parser(data, network_reader, network_writer, connection)
@ -418,10 +423,10 @@ if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
loop = asyncio.get_event_loop()
process = loop.run_until_complete(asyncio.async(asyncio.subprocess.create_subprocess_exec("/bin/sh", "-i",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
stdin=asyncio.subprocess.PIPE)))
process = loop.run_until_complete(asyncio_ensure_future(asyncio.subprocess.create_subprocess_exec("/bin/sh", "-i",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
stdin=asyncio.subprocess.PIPE)))
server = AsyncioTelnetServer(reader=process.stdout, writer=process.stdin, binary=False, echo=False)
coro = asyncio.start_server(server.run, '127.0.0.1', 4444, loop=loop)

@ -35,6 +35,7 @@ from ..compute import MODULES
from ..compute.port_manager import PortManager
from ..compute.qemu import Qemu
from ..controller import Controller
from ..utils.asyncio import asyncio_ensure_future
# do not delete this import
import gns3server.handlers
@ -136,7 +137,7 @@ class WebServer:
def signal_handler(signame, *args):
log.warning("Server has got signal {}, exiting...".format(signame))
asyncio.async(self.shutdown_server())
asyncio_ensure_future(self.shutdown_server())
signals = ["SIGTERM", "SIGINT"]
if sys.platform.startswith("win"):
@ -203,7 +204,7 @@ class WebServer:
# Because with a large image collection
# without md5sum already computed we start the
# computing with server start
asyncio.async(Qemu.instance().list_images())
asyncio_ensure_future(Qemu.instance().list_images())
def run(self):
"""
@ -283,7 +284,7 @@ class WebServer:
self._exit_handling()
if server_config.getboolean("shell"):
asyncio.async(self.start_shell())
asyncio_ensure_future(self.start_shell())
try:
self._loop.run_forever()

Loading…
Cancel
Save