# -*- coding: utf-8 -*- # # Copyright (C) 2015 GNS3 Technologies Inc. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . """ Interface for Dynamips hypervisor management module ("hypervisor") http://github.com/GNS3/dynamips/blob/master/README.hypervisor#L46 """ import re import time import logging import asyncio from .dynamips_error import DynamipsError log = logging.getLogger(__name__) class DynamipsHypervisor: """ Creates a new connection to a Dynamips server (also called hypervisor) :param working_dir: working directory :param host: the hostname or ip address string of the Dynamips server :param port: the tcp port integer (defaults to 7200) :param timeout: timeout integer for how long to wait for a response to commands sent to the hypervisor (defaults to 30 seconds) """ # Used to parse Dynamips response codes error_re = re.compile(r"""^2[0-9]{2}-""") success_re = re.compile(r"""^1[0-9]{2}\s{1}""") def __init__(self, working_dir, host, port=7200, timeout=30.0): self._host = host self._port = port self._devices = [] self._working_dir = working_dir self._version = "N/A" self._timeout = timeout self._reader = None self._writer = None self._io_lock = asyncio.Lock() async def connect(self, timeout=10): """ Connects to the hypervisor. """ # connect to a local address by default # if listening to all addresses (IPv4 or IPv6) if self._host == "0.0.0.0": host = "127.0.0.1" elif self._host == "::": host = "::1" else: host = self._host begin = time.time() connection_success = False last_exception = None while time.time() - begin < timeout: await asyncio.sleep(0.01) try: self._reader, self._writer = await asyncio.wait_for(asyncio.open_connection(host, self._port), timeout=1) except (asyncio.TimeoutError, OSError) as e: last_exception = e continue connection_success = True break if not connection_success: raise DynamipsError("Couldn't connect to hypervisor on {}:{} :{}".format(host, self._port, last_exception)) else: log.info("Connected to Dynamips hypervisor on {}:{} after {:.4f} seconds".format(host, self._port, time.time() - begin)) try: version = await self.send("hypervisor version") self._version = version[0].split("-", 1)[0] log.info("Dynamips version {} detected".format(self._version)) except IndexError: log.warning("Dynamips version could not be detected") self._version = "Unknown" # this forces to send the working dir to Dynamips await self.set_working_dir(self._working_dir) @property def version(self): """ Returns Dynamips version. :returns: version string """ return self._version async def close(self): """ Closes the connection to this hypervisor (but leave it running). """ await self.send("hypervisor close") self._writer.close() self._reader, self._writer = None async def stop(self): """ Stops this hypervisor (will no longer run). """ try: # try to properly stop the hypervisor await self.send("hypervisor stop") except DynamipsError: pass try: if self._writer is not None: await self._writer.drain() self._writer.close() except OSError as e: log.debug("Stopping hypervisor {}:{} {}".format(self._host, self._port, e)) self._reader = self._writer = None async def reset(self): """ Resets this hypervisor (used to get an empty configuration). """ await self.send("hypervisor reset") async def set_working_dir(self, working_dir): """ Sets the working directory for this hypervisor. :param working_dir: path to the working directory """ # encase working_dir in quotes to protect spaces in the path await self.send('hypervisor working_dir "{}"'.format(working_dir)) self._working_dir = working_dir log.debug("Working directory set to {}".format(self._working_dir)) @property def working_dir(self): """ Returns current working directory :returns: path to the working directory """ return self._working_dir @property def devices(self): """ Returns the list of devices managed by this hypervisor instance. :returns: a list of device instances """ return self._devices @property def port(self): """ Returns the port used to start the hypervisor. :returns: port number (integer) """ return self._port @port.setter def port(self, port): """ Sets the port used to start the hypervisor. :param port: port number (integer) """ self._port = port @property def host(self): """ Returns the host (binding) used to start the hypervisor. :returns: host/address (string) """ return self._host @host.setter def host(self, host): """ Sets the host (binding) used to start the hypervisor. :param host: host/address (string) """ self._host = host async def send(self, command): """ Sends commands to this hypervisor. :param command: a Dynamips hypervisor command :returns: results as a list """ # Dynamips responses are of the form: # 1xx yyyyyy\r\n # 1xx yyyyyy\r\n # ... # 100-yyyy\r\n # or # 2xx-yyyy\r\n # # Where 1xx is a code from 100-199 for a success or 200-299 for an error # The result might be multiple lines and might be less than the buffer size # but still have more data. The only thing we know for sure is the last line # will begin with '100-' or a '2xx-' and end with '\r\n' async with self._io_lock: if self._writer is None or self._reader is None: raise DynamipsError("Not connected") try: command = command.strip() + '\n' log.debug("sending {}".format(command)) self._writer.write(command.encode()) await self._writer.drain() except OSError as e: raise DynamipsError("Could not send Dynamips command '{command}' to {host}:{port}: {error}, process running: {run}" .format(command=command.strip(), host=self._host, port=self._port, error=e, run=self.is_running())) # Now retrieve the result data = [] buf = '' retries = 0 max_retries = 10 while True: try: try: # line = await self._reader.readline() # this can lead to ValueError: Line is too long chunk = await self._reader.read(1024) # match to Dynamips' buffer size except asyncio.CancelledError: # task has been canceled but continue to read # any remaining data sent by the hypervisor continue except ConnectionResetError as e: # Sometimes WinError 64 (ERROR_NETNAME_DELETED) is returned here on Windows. # These happen if connection reset is received before IOCP could complete # a previous operation. Ignore and try again.... log.warning("Connection reset received while reading Dynamips response: {}".format(e)) continue if not chunk: if retries > max_retries: raise DynamipsError("No data returned from {host}:{port}, Dynamips process running: {run}" .format(host=self._host, port=self._port, run=self.is_running())) else: retries += 1 await asyncio.sleep(0.1) continue retries = 0 buf += chunk.decode("utf-8", errors="ignore") except OSError as e: raise DynamipsError("Could not read response for '{command}' from {host}:{port}: {error}, process running: {run}" .format(command=command.strip(), host=self._host, port=self._port, error=e, run=self.is_running())) # If the buffer doesn't end in '\n' then we can't be done try: if buf[-1] != '\n': continue except IndexError: raise DynamipsError("Could not communicate with {host}:{port}, Dynamips process running: {run}" .format(host=self._host, port=self._port, run=self.is_running())) data += buf.split('\r\n') if data[-1] == '': data.pop() buf = '' # Does it contain an error code? if self.error_re.search(data[-1]): raise DynamipsError("Dynamips error when running command '{}': {}".format(command, data[-1][4:])) # Or does the last line begin with '100-'? Then we are done! if data[-1][:4] == '100-': data[-1] = data[-1][4:] if data[-1] == 'OK': data.pop() break # Remove success responses codes for index in range(len(data)): if self.success_re.search(data[index]): data[index] = data[index][4:] log.debug("returned result {}".format(data)) return data