diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml index 6763ce51..4d1f5390 100644 --- a/.github/workflows/testing.yml +++ b/.github/workflows/testing.yml @@ -16,7 +16,7 @@ jobs: runs-on: ubuntu-20.04 # Downgrade Ubuntu to 20.04 to fix missing Python 3.6 strategy: matrix: - python-version: ["3.6", "3.7", "3.8", "3.9", "3.10", "3.11", "3.12"] + python-version: ["3.7", "3.8", "3.9", "3.10", "3.11", "3.12"] steps: - uses: actions/checkout@v3 diff --git a/gns3server/compute/base_node.py b/gns3server/compute/base_node.py index 639a5d41..d7652c47 100644 --- a/gns3server/compute/base_node.py +++ b/gns3server/compute/base_node.py @@ -32,6 +32,7 @@ from gns3server.utils.interfaces import interfaces from ..compute.port_manager import PortManager from ..utils.asyncio import wait_run_in_executor, locking from ..utils.asyncio.telnet_server import AsyncioTelnetServer +from ..utils.asyncio.sftelnetproxymuxer import SFTelnetProxyMuxer from ..ubridge.hypervisor import Hypervisor from ..ubridge.ubridge_error import UbridgeError from .nios.nio_udp import NIOUDP @@ -374,32 +375,9 @@ class BaseNode: if not self._wrap_console or self._console_type != "telnet": return - remaining_trial = 60 - while True: - try: - (self._wrap_console_reader, self._wrap_console_writer) = await asyncio.open_connection( - host="127.0.0.1", - port=self._internal_console_port - ) - break - except (OSError, ConnectionRefusedError) as e: - if remaining_trial <= 0: - raise e - await asyncio.sleep(0.1) - remaining_trial -= 1 - await AsyncioTelnetServer.write_client_intro(self._wrap_console_writer, echo=True) - server = AsyncioTelnetServer( - reader=self._wrap_console_reader, - writer=self._wrap_console_writer, - binary=True, - echo=True - ) - # warning: this will raise OSError exception if there is a problem... - self._wrapper_telnet_server = await asyncio.start_server( - server.run, - self._manager.port_manager.console_host, - self.console - ) + #) + server = SFTelnetProxyMuxer(binary=True, echo=True, remote_port=self._internal_console_port, listen_port=self.console) + self._wrapper_telnet_server = await server.start_proxy() async def stop_wrap_console(self): """ @@ -407,15 +385,8 @@ class BaseNode: """ if self._wrapper_telnet_server: - self._wrap_console_writer.close() - if sys.version_info >= (3, 7, 0): - try: - await self._wrap_console_writer.wait_closed() - except ConnectionResetError: - pass - self._wrapper_telnet_server.close() - await self._wrapper_telnet_server.wait_closed() - self._wrapper_telnet_server = None + await self._wrapper_telnet_server.shutdown() + #self._wrapper_telnet_server = None async def reset_wrap_console(self): """ diff --git a/gns3server/utils/asyncio/sftelnetproxymuxer.py b/gns3server/utils/asyncio/sftelnetproxymuxer.py new file mode 100644 index 00000000..7ee74640 --- /dev/null +++ b/gns3server/utils/asyncio/sftelnetproxymuxer.py @@ -0,0 +1,268 @@ +import socket +import asyncio +import telnetlib3 +import logging +import pdb +log = logging.getLogger(__name__) + +class SFTelnetProxyMuxer: + def __init__(self, remote_ip=None, remote_port=None, listen_ip=None, listen_port=None, reader=None, writer=None, heartbeattimer=None): + if remote_ip == None: + remote_ip = '127.0.0.1' + self.remote_ip = remote_ip + self.remote_port = remote_port + # make the remote_info look like the same format as client_info later from sock('peername') + self.remote_info = f"('{self.remote_ip}', {self.remote_port})" + if listen_ip == None: + listen_ip = '0.0.0.0' + self.listen_ip = listen_ip + self.listen_port = listen_port + # use reader for remote server info + if reader: + self.reader = reader + # use writer for remote server info + if writer: + self.writer = writer + self.clients = set() + self.server = None + self.remote_reader = None + self.remote_writer = None + self.lock = asyncio.Lock() # Lock for coordinating access to the remote server + # Telnet protocol constants + self.IAC = b"\xff" # Interpret as Command + # Telnet NOP command. Will be used as a heartbeat to clients. + self.NOP = b"\xf1" + # Telnet Are You There. + self.AYT = b"\xf6" + if not remote_port: + raise ValueError("remote_port is a required value") + if not listen_port: + raise ValueError("listen_port is a required value") + # how often do we check the remote telnet server is up and each telnet client connected to gns3 is up. + self.heartbeattimer = heartbeattimer + if not heartbeattimer: + self.heartbeattimer = 30 + self.isshutdown = False + + async def handle_client(self, reader, writer): + client_info = writer.get_extra_info('peername') + sock = writer.get_extra_info('socket') + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + log.debug(f"New client connected: {client_info}") + self.clients.add(writer) + + try: + await asyncio.sleep(1) + while True and not self.isshutdown: + try: + # Set a timeout for the read operation, without should() the socket closes after timeout. + data = await asyncio.shield(asyncio.wait_for(reader.read((4*1024*1024)), timeout=self.heartbeattimer)) + if not data: + log.debug(f"No data from socket read, start over read loop.") + continue + if reader.at_eof(): + log.debug(f"Client {client_info} closed tcp session with eof.") + writer.close() + self.clients.discard(writer) + break + + async with self.lock: + if self.remote_writer is not None: + log.debug(f"Sending data from from client {client_info} to server {self.remote_info}") + self.remote_writer.write(data) + await self.remote_writer.drain() + continue + + except asyncio.TimeoutError: + log.debug(f"No data read from {client_info}, send heartbeat to test client socket.") + try: + log.debug(f"Heatbeat: Are you there {client_info}?") + writer.send_iac(self.IAC + self.NOP) + await writer.drain() + continue + except asyncio.TimeoutError: + log.debug(f"Heatbeat: No reply from {client_info}, closing socket.") + writer.close() + self.clients.discard(writer) + break + except Exception as e: + log.debug(f"Heateat: Unknown error from {client_info}, closing socket. Exeption {e}") + writer.close() + self.clients.discard(writer) + break + finally: + log.debug(f"Heatbeat: {client_info} Yes I am.") + except Exception as e: + log.debyg(f"Error in handling data from client {client_info}:") + writer.close() + self.clients.discard(writer) + break + + except Exception as e: + log.debug(f"Error in managing client {client_info}: {e}") + + finally: + # Safely remove the writer from clients set and close the connection + writer.close() + self.clients.discard(writer) + log.debug(f"Client {client_info} disconnected. Remaining clients: {len(list(self.clients))}") + log.debug(f"Connection with client {client_info} closed.") + + + async def broadcast_to_clients(self, data): + if not self.clients: + log.debug(f"Warning: No clients connected, ignoring data.") + return + + for writer in set(self.clients): + client_info = writer.get_extra_info('peername') + try: + #log.debug(f"Clients connected: {writer}, sending data: {data}") + writer.write(data) + await asyncio.wait_for(writer.drain(), timeout=2.0) + except Exception as e: + log.debug(f"Lost connection to client {client_info}") + writer.close() + self.clients.discard(writer) + + async def handle_remote_server(self): + log.debug("Start handler for remote server") + while True and not self.isshutdown: + log.debug("main run loop") + try: + if self.remote_ip and self.remote_port: + log.debug(f"Looks like we're running a server {self.listen_ip}.") + self.remote_reader, self.remote_writer = await telnetlib3.open_connection( + host=self.remote_ip, port=self.remote_port + ) + sock = self.remote_writer.get_extra_info('socket') + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + elif self.remote_reader and self.remote_writer and not self.remote_ip and not self.remote_port: + log.debug(f"Looks like we're running with reader and writer.") + if self.remote_reader.at_eof() or self.remote_writer.at_eof(): + log.debug(f"reader/writer EOFed.") + break + else: + raise ValueError("Server state incorrect. self.remote_reader or self.remote_writer close (eof).") + + while True and not self.isshutdown: + + try: + #data = await self.remote_reader.read((4*1024*1024)) + data = await asyncio.shield(asyncio.wait_for(self.remote_reader.read((4*1024*1024)), timeout=self.heartbeattimer)) + if not data: + log.debug(f"No data from remote telnet server {self.remote_info}.") + continue + if self.remote_reader.at_eof(): + log.debug(f"Remote server {self.remote_info} closed tcp session with eof.") + break + except asyncio.TimeoutError: + log.debug(f"No data from server {self.remote_info}, send heartbeat to test socket.") + try: + log.debug(f"Heatbeat: Are you there {self.remote_info}?") + # NOP and AYT cause QEMU to spam everyone's console with junk. + # This causes everyone to close the session and eof tcp which makes me sad. + # Will need to research more... or did i call this wrong and just fix it? + #self.remote_writer.send_iac(self.IAC + self.NOP) + await self.remote_writer.drain() + continue + except Exception as e: + log.debug(f"Heateat: Unknown error from {self.remote_info}, closing socket. Exeption {e}") + self.remote_writer.close() + break + finally: + log.debug(f"Heatbeat: {self.remote_info} Yes I am.") + + except Exception as e: + log.debug("Failed to read socket data exception: {e}") + break + #if not self.clients: + # log.debug("No clients connected, but console data found. Skipping.") + # continue + #log.debug("Sending data to clients data: {data}") + await self.broadcast_to_clients(data) + except ConnectionRefusedError as e: + error_msg = f"Warning: Connection to remote server {self.remote_info} refused." + log.debug(error_msg) + await self.broadcast_to_clients(f"\r{error_msg}\n\r") + + except TimeoutError as e: + error_msg = f"Warning: Connection to remote server {self.remote_info} timedout." + log.debug(error_msg) + await self.broadcast_to_clients(f"\r{error_msg}\n\r") + + except Exception as e: + error_msg = f"Warning: Connection to remote server {self.remote_info} unknown error: {e}." + log.debug(error_msg) + await self.broadcast_to_clients(f"\r{error_msg}\n\r") + + log.debug("end run loop") + + async def start_proxy(self): + log.debug("Starting telnet proxy.") + asyncio.create_task(self.handle_remote_server()) + self.server = await telnetlib3.create_server( + host=self.listen_ip, port=self.listen_port, + shell=self.handle_client + ) + #async with self.server: + # log.debug("Startup of telnet proxy complete.") + # await self.server.wait_closed() + return self + + async def shutdown(self): + log.debug(f"Set shutdown") + self.isshutdown = True + if self.server: + try: + log.debug(f"Shuting down tcp listen port {self.remote_port}") + self.server.close() + await self.server.wait_closed() + except Exception as e: + log.debug(f"Failed to shutdown listen port: {self.remote_port} {e}") + + for client in self.clients: + try: + try: + client_info = client.get_extra_info('peername') + except: + client_info = "Unknown" + log.debug("Shuting down tcp session to {client_info}") + client.close() + await client.wait_closed() + except Exception as e: + log.debug(f"Closing client connect {client_info} failed {e}") + if self.remote_writer: + try: + self.remote_writer.close() + #await self.remote_writer.wait_closed() + except Exception as e: + log.debug(f"Failed to shutdown listen port: {self.remote_info} {e}") + + log.debug("No remaining work to do for shutdown.") + + + +if __name__ == "__main__": + + async def main(): + logging.basicConfig( + level=logging.DEBUG, + format='%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s' + ) + #pdb.set_trace() + ## Example usage + log.debug("Start proxy") + + try: + #await asyncio.sleep(0) + server = SFTelnetProxyMuxer(remote_ip="10.1.18.100", remote_port=5003, listen_ip="0.0.0.0", listen_port=5000) + _wrapper_telnet_server = await server.start_proxy() + #await proxy.start_proxy() + except OSError as e: + log.debug(f"Can't start proxy: {e}") + + # To shut down the proxy + # asyncio.run(proxy.shutdown()) + + asyncio.run(main()) diff --git a/requirements.txt b/requirements.txt index df39d266..e097043f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,3 +17,4 @@ importlib-resources>=1.3; python_version < '3.9' truststore>=0.8.0; python_version >= '3.10' setuptools>=60.8.1; python_version >= '3.7' setuptools==59.6.0; python_version < '3.7' # v59.6.0 is the last version to support Python 3.6 +telnetlib3>=2.0.4; python_version >= '3.7' # 2.0.0 only support 3.7 and higher.