pull/2346/merge
John Fleming 3 months ago committed by GitHub
commit 3fe23d0a3e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -16,7 +16,7 @@ jobs:
runs-on: ubuntu-20.04 # Downgrade Ubuntu to 20.04 to fix missing Python 3.6
strategy:
matrix:
python-version: ["3.6", "3.7", "3.8", "3.9", "3.10", "3.11", "3.12"]
python-version: ["3.7", "3.8", "3.9", "3.10", "3.11", "3.12"]
steps:
- uses: actions/checkout@v3

@ -32,6 +32,7 @@ from gns3server.utils.interfaces import interfaces
from ..compute.port_manager import PortManager
from ..utils.asyncio import wait_run_in_executor, locking
from ..utils.asyncio.telnet_server import AsyncioTelnetServer
from ..utils.asyncio.sftelnetproxymuxer import SFTelnetProxyMuxer
from ..ubridge.hypervisor import Hypervisor
from ..ubridge.ubridge_error import UbridgeError
from .nios.nio_udp import NIOUDP
@ -374,32 +375,9 @@ class BaseNode:
if not self._wrap_console or self._console_type != "telnet":
return
remaining_trial = 60
while True:
try:
(self._wrap_console_reader, self._wrap_console_writer) = await asyncio.open_connection(
host="127.0.0.1",
port=self._internal_console_port
)
break
except (OSError, ConnectionRefusedError) as e:
if remaining_trial <= 0:
raise e
await asyncio.sleep(0.1)
remaining_trial -= 1
await AsyncioTelnetServer.write_client_intro(self._wrap_console_writer, echo=True)
server = AsyncioTelnetServer(
reader=self._wrap_console_reader,
writer=self._wrap_console_writer,
binary=True,
echo=True
)
# warning: this will raise OSError exception if there is a problem...
self._wrapper_telnet_server = await asyncio.start_server(
server.run,
self._manager.port_manager.console_host,
self.console
)
#)
server = SFTelnetProxyMuxer(binary=True, echo=True, remote_port=self._internal_console_port, listen_port=self.console)
self._wrapper_telnet_server = await server.start_proxy()
async def stop_wrap_console(self):
"""
@ -407,15 +385,8 @@ class BaseNode:
"""
if self._wrapper_telnet_server:
self._wrap_console_writer.close()
if sys.version_info >= (3, 7, 0):
try:
await self._wrap_console_writer.wait_closed()
except ConnectionResetError:
pass
self._wrapper_telnet_server.close()
await self._wrapper_telnet_server.wait_closed()
self._wrapper_telnet_server = None
await self._wrapper_telnet_server.shutdown()
#self._wrapper_telnet_server = None
async def reset_wrap_console(self):
"""

@ -0,0 +1,268 @@
import socket
import asyncio
import telnetlib3
import logging
import pdb
log = logging.getLogger(__name__)
class SFTelnetProxyMuxer:
def __init__(self, remote_ip=None, remote_port=None, listen_ip=None, listen_port=None, reader=None, writer=None, heartbeattimer=None):
if remote_ip == None:
remote_ip = '127.0.0.1'
self.remote_ip = remote_ip
self.remote_port = remote_port
# make the remote_info look like the same format as client_info later from sock('peername')
self.remote_info = f"('{self.remote_ip}', {self.remote_port})"
if listen_ip == None:
listen_ip = '0.0.0.0'
self.listen_ip = listen_ip
self.listen_port = listen_port
# use reader for remote server info
if reader:
self.reader = reader
# use writer for remote server info
if writer:
self.writer = writer
self.clients = set()
self.server = None
self.remote_reader = None
self.remote_writer = None
self.lock = asyncio.Lock() # Lock for coordinating access to the remote server
# Telnet protocol constants
self.IAC = b"\xff" # Interpret as Command
# Telnet NOP command. Will be used as a heartbeat to clients.
self.NOP = b"\xf1"
# Telnet Are You There.
self.AYT = b"\xf6"
if not remote_port:
raise ValueError("remote_port is a required value")
if not listen_port:
raise ValueError("listen_port is a required value")
# how often do we check the remote telnet server is up and each telnet client connected to gns3 is up.
self.heartbeattimer = heartbeattimer
if not heartbeattimer:
self.heartbeattimer = 30
self.isshutdown = False
async def handle_client(self, reader, writer):
client_info = writer.get_extra_info('peername')
sock = writer.get_extra_info('socket')
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
log.debug(f"New client connected: {client_info}")
self.clients.add(writer)
try:
await asyncio.sleep(1)
while True and not self.isshutdown:
try:
# Set a timeout for the read operation, without should() the socket closes after timeout.
data = await asyncio.shield(asyncio.wait_for(reader.read((4*1024*1024)), timeout=self.heartbeattimer))
if not data:
log.debug(f"No data from socket read, start over read loop.")
continue
if reader.at_eof():
log.debug(f"Client {client_info} closed tcp session with eof.")
writer.close()
self.clients.discard(writer)
break
async with self.lock:
if self.remote_writer is not None:
log.debug(f"Sending data from from client {client_info} to server {self.remote_info}")
self.remote_writer.write(data)
await self.remote_writer.drain()
continue
except asyncio.TimeoutError:
log.debug(f"No data read from {client_info}, send heartbeat to test client socket.")
try:
log.debug(f"Heatbeat: Are you there {client_info}?")
writer.send_iac(self.IAC + self.NOP)
await writer.drain()
continue
except asyncio.TimeoutError:
log.debug(f"Heatbeat: No reply from {client_info}, closing socket.")
writer.close()
self.clients.discard(writer)
break
except Exception as e:
log.debug(f"Heateat: Unknown error from {client_info}, closing socket. Exeption {e}")
writer.close()
self.clients.discard(writer)
break
finally:
log.debug(f"Heatbeat: {client_info} Yes I am.")
except Exception as e:
log.debyg(f"Error in handling data from client {client_info}:")
writer.close()
self.clients.discard(writer)
break
except Exception as e:
log.debug(f"Error in managing client {client_info}: {e}")
finally:
# Safely remove the writer from clients set and close the connection
writer.close()
self.clients.discard(writer)
log.debug(f"Client {client_info} disconnected. Remaining clients: {len(list(self.clients))}")
log.debug(f"Connection with client {client_info} closed.")
async def broadcast_to_clients(self, data):
if not self.clients:
log.debug(f"Warning: No clients connected, ignoring data.")
return
for writer in set(self.clients):
client_info = writer.get_extra_info('peername')
try:
#log.debug(f"Clients connected: {writer}, sending data: {data}")
writer.write(data)
await asyncio.wait_for(writer.drain(), timeout=2.0)
except Exception as e:
log.debug(f"Lost connection to client {client_info}")
writer.close()
self.clients.discard(writer)
async def handle_remote_server(self):
log.debug("Start handler for remote server")
while True and not self.isshutdown:
log.debug("main run loop")
try:
if self.remote_ip and self.remote_port:
log.debug(f"Looks like we're running a server {self.listen_ip}.")
self.remote_reader, self.remote_writer = await telnetlib3.open_connection(
host=self.remote_ip, port=self.remote_port
)
sock = self.remote_writer.get_extra_info('socket')
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
elif self.remote_reader and self.remote_writer and not self.remote_ip and not self.remote_port:
log.debug(f"Looks like we're running with reader and writer.")
if self.remote_reader.at_eof() or self.remote_writer.at_eof():
log.debug(f"reader/writer EOFed.")
break
else:
raise ValueError("Server state incorrect. self.remote_reader or self.remote_writer close (eof).")
while True and not self.isshutdown:
try:
#data = await self.remote_reader.read((4*1024*1024))
data = await asyncio.shield(asyncio.wait_for(self.remote_reader.read((4*1024*1024)), timeout=self.heartbeattimer))
if not data:
log.debug(f"No data from remote telnet server {self.remote_info}.")
continue
if self.remote_reader.at_eof():
log.debug(f"Remote server {self.remote_info} closed tcp session with eof.")
break
except asyncio.TimeoutError:
log.debug(f"No data from server {self.remote_info}, send heartbeat to test socket.")
try:
log.debug(f"Heatbeat: Are you there {self.remote_info}?")
# NOP and AYT cause QEMU to spam everyone's console with junk.
# This causes everyone to close the session and eof tcp which makes me sad.
# Will need to research more... or did i call this wrong and just fix it?
#self.remote_writer.send_iac(self.IAC + self.NOP)
await self.remote_writer.drain()
continue
except Exception as e:
log.debug(f"Heateat: Unknown error from {self.remote_info}, closing socket. Exeption {e}")
self.remote_writer.close()
break
finally:
log.debug(f"Heatbeat: {self.remote_info} Yes I am.")
except Exception as e:
log.debug("Failed to read socket data exception: {e}")
break
#if not self.clients:
# log.debug("No clients connected, but console data found. Skipping.")
# continue
#log.debug("Sending data to clients data: {data}")
await self.broadcast_to_clients(data)
except ConnectionRefusedError as e:
error_msg = f"Warning: Connection to remote server {self.remote_info} refused."
log.debug(error_msg)
await self.broadcast_to_clients(f"\r{error_msg}\n\r")
except TimeoutError as e:
error_msg = f"Warning: Connection to remote server {self.remote_info} timedout."
log.debug(error_msg)
await self.broadcast_to_clients(f"\r{error_msg}\n\r")
except Exception as e:
error_msg = f"Warning: Connection to remote server {self.remote_info} unknown error: {e}."
log.debug(error_msg)
await self.broadcast_to_clients(f"\r{error_msg}\n\r")
log.debug("end run loop")
async def start_proxy(self):
log.debug("Starting telnet proxy.")
asyncio.create_task(self.handle_remote_server())
self.server = await telnetlib3.create_server(
host=self.listen_ip, port=self.listen_port,
shell=self.handle_client
)
#async with self.server:
# log.debug("Startup of telnet proxy complete.")
# await self.server.wait_closed()
return self
async def shutdown(self):
log.debug(f"Set shutdown")
self.isshutdown = True
if self.server:
try:
log.debug(f"Shuting down tcp listen port {self.remote_port}")
self.server.close()
await self.server.wait_closed()
except Exception as e:
log.debug(f"Failed to shutdown listen port: {self.remote_port} {e}")
for client in self.clients:
try:
try:
client_info = client.get_extra_info('peername')
except:
client_info = "Unknown"
log.debug("Shuting down tcp session to {client_info}")
client.close()
await client.wait_closed()
except Exception as e:
log.debug(f"Closing client connect {client_info} failed {e}")
if self.remote_writer:
try:
self.remote_writer.close()
#await self.remote_writer.wait_closed()
except Exception as e:
log.debug(f"Failed to shutdown listen port: {self.remote_info} {e}")
log.debug("No remaining work to do for shutdown.")
if __name__ == "__main__":
async def main():
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s'
)
#pdb.set_trace()
## Example usage
log.debug("Start proxy")
try:
#await asyncio.sleep(0)
server = SFTelnetProxyMuxer(remote_ip="10.1.18.100", remote_port=5003, listen_ip="0.0.0.0", listen_port=5000)
_wrapper_telnet_server = await server.start_proxy()
#await proxy.start_proxy()
except OSError as e:
log.debug(f"Can't start proxy: {e}")
# To shut down the proxy
# asyncio.run(proxy.shutdown())
asyncio.run(main())

@ -17,3 +17,4 @@ importlib-resources>=1.3; python_version < '3.9'
truststore>=0.8.0; python_version >= '3.10'
setuptools>=60.8.1; python_version >= '3.7'
setuptools==59.6.0; python_version < '3.7' # v59.6.0 is the last version to support Python 3.6
telnetlib3>=2.0.4; python_version >= '3.7' # 2.0.0 only support 3.7 and higher.

Loading…
Cancel
Save