diff --git a/gns3server/compute/base_node.py b/gns3server/compute/base_node.py index 02af6c02..9d01af14 100644 --- a/gns3server/compute/base_node.py +++ b/gns3server/compute/base_node.py @@ -18,8 +18,6 @@ import sys import os import stat -import logging -import aiohttp import shutil import asyncio import tempfile @@ -27,7 +25,7 @@ import psutil import platform import re -from aiohttp.web import WebSocketResponse +from fastapi import WebSocketDisconnect from gns3server.utils.interfaces import interfaces from gns3server.compute.compute_error import ComputeError from ..compute.port_manager import PortManager @@ -38,7 +36,7 @@ from ..ubridge.ubridge_error import UbridgeError from .nios.nio_udp import NIOUDP from .error import NodeError - +import logging log = logging.getLogger(__name__) @@ -414,7 +412,7 @@ class BaseNode: await self.stop_wrap_console() await self.start_wrap_console() - async def start_websocket_console(self, request): + async def start_websocket_console(self, websocket): """ Connect to console using Websocket. @@ -428,47 +426,45 @@ class BaseNode: raise NodeError("Node {} console type is not telnet".format(self.name)) try: - (telnet_reader, telnet_writer) = await asyncio.open_connection(self._manager.port_manager.console_host, self.console) + (telnet_reader, telnet_writer) = await asyncio.open_connection(self._manager.port_manager.console_host, + self.console) except ConnectionError as e: raise NodeError("Cannot connect to node {} telnet server: {}".format(self.name, e)) log.info("Connected to Telnet server") - ws = WebSocketResponse() - await ws.prepare(request) - request.app['websockets'].add(ws) - - log.info("New client has connected to console WebSocket") + await websocket.accept() + log.info(f"New client {websocket.client.host}:{websocket.client.port} has connected to compute" + f" console WebSocket") async def ws_forward(telnet_writer): - async for msg in ws: - if msg.type == aiohttp.WSMsgType.TEXT: - telnet_writer.write(msg.data.encode()) - await telnet_writer.drain() - elif msg.type == aiohttp.WSMsgType.BINARY: - await telnet_writer.write(msg.data) - await telnet_writer.drain() - elif msg.type == aiohttp.WSMsgType.ERROR: - log.debug("Websocket connection closed with exception {}".format(ws.exception())) + try: + while True: + data = await websocket.receive_text() + if data: + telnet_writer.write(data.encode()) + await telnet_writer.drain() + except WebSocketDisconnect: + log.info(f"Client {websocket.client.host}:{websocket.client.port} has disconnected from compute" + f" console WebSocket") async def telnet_forward(telnet_reader): - while not ws.closed and not telnet_reader.at_eof(): + while not telnet_reader.at_eof(): data = await telnet_reader.read(1024) if data: - await ws.send_bytes(data) + await websocket.send_bytes(data) - try: - # keep forwarding websocket data in both direction - await asyncio.wait([ws_forward(telnet_writer), telnet_forward(telnet_reader)], return_when=asyncio.FIRST_COMPLETED) - finally: - log.info("Client has disconnected from console WebSocket") - if not ws.closed: - await ws.close() - request.app['websockets'].discard(ws) + # keep forwarding WebSocket data in both direction + done, pending = await asyncio.wait([ws_forward(telnet_writer), telnet_forward(telnet_reader)], + return_when=asyncio.FIRST_COMPLETED) + for task in done: + if task.exception(): + log.warning(f"Exception while forwarding WebSocket data to Telnet server {task.exception()}") - return ws + for task in pending: + task.cancel() @property def aux(self): diff --git a/gns3server/endpoints/compute/atm_switch_nodes.py b/gns3server/endpoints/compute/atm_switch_nodes.py index cf3621fe..01fefef8 100644 --- a/gns3server/endpoints/compute/atm_switch_nodes.py +++ b/gns3server/endpoints/compute/atm_switch_nodes.py @@ -47,7 +47,7 @@ async def dep_node(project_id: UUID, node_id: UUID): return node -@router.post("/", +@router.post("", response_model=schemas.ATMSwitch, status_code=status.HTTP_201_CREATED, responses={409: {"model": schemas.ErrorMessage, "description": "Could not create ATM switch node"}}) diff --git a/gns3server/endpoints/compute/cloud_nodes.py b/gns3server/endpoints/compute/cloud_nodes.py index 2fb0713b..af22cfa3 100644 --- a/gns3server/endpoints/compute/cloud_nodes.py +++ b/gns3server/endpoints/compute/cloud_nodes.py @@ -48,7 +48,7 @@ def dep_node(project_id: UUID, node_id: UUID): return node -@router.post("/", +@router.post("", response_model=schemas.Cloud, status_code=status.HTTP_201_CREATED, responses={409: {"model": schemas.ErrorMessage, "description": "Could not create cloud node"}}) diff --git a/gns3server/endpoints/compute/docker_nodes.py b/gns3server/endpoints/compute/docker_nodes.py index 21a8cbcc..199c1664 100644 --- a/gns3server/endpoints/compute/docker_nodes.py +++ b/gns3server/endpoints/compute/docker_nodes.py @@ -21,7 +21,7 @@ API endpoints for Docker nodes. import os -from fastapi import APIRouter, Depends, Body, status +from fastapi import APIRouter, WebSocket, Depends, Body, status from fastapi.encoders import jsonable_encoder from fastapi.responses import StreamingResponse from uuid import UUID @@ -47,7 +47,7 @@ def dep_node(project_id: UUID, node_id: UUID): return node -@router.post("/", +@router.post("", response_model=schemas.Docker, status_code=status.HTTP_201_CREATED, responses={409: {"model": schemas.ErrorMessage, "description": "Could not create Docker node"}}) @@ -290,14 +290,6 @@ async def stop_capture(adapter_number: int, port_number: int, node: DockerVM = D await node.stop_capture(adapter_number) -@router.post("/{node_id}/console/reset", - status_code=status.HTTP_204_NO_CONTENT, - responses=responses) -async def reset_console(node: DockerVM = Depends(dep_node)): - - await node.reset_console() - - @router.get("/{node_id}/adapters/{adapter_number}/ports/{port_number}/pcap", responses=responses) async def stream_pcap_file(adapter_number: int, port_number: int, node: DockerVM = Depends(dep_node)): @@ -310,18 +302,19 @@ async def stream_pcap_file(adapter_number: int, port_number: int, node: DockerVM stream = Docker.instance().stream_pcap_file(nio, node.project.id) return StreamingResponse(stream, media_type="application/vnd.tcpdump.pcap") -# @Route.get( -# r"/projects/{project_id}/docker/nodes/{node_id}/console/ws", -# description="WebSocket for console", -# parameters={ -# "project_id": "Project UUID", -# "node_id": "Node UUID", -# }) -# async def console_ws(request, response): -# -# docker_manager = Docker.instance() -# container = docker_manager.get_node(request.match_info["node_id"], project_id=request.match_info["project_id"]) -# return await container.start_websocket_console(request) + +@router.websocket("/{node_id}/console/ws") +async def console_ws(websocket: WebSocket, node: DockerVM = Depends(dep_node)): + """ + Console WebSocket. + """ + + await node.start_websocket_console(websocket) +@router.post("/{node_id}/console/reset", + status_code=status.HTTP_204_NO_CONTENT, + responses=responses) +async def reset_console(node: DockerVM = Depends(dep_node)): + await node.reset_console() diff --git a/gns3server/endpoints/compute/dynamips_nodes.py b/gns3server/endpoints/compute/dynamips_nodes.py index 86db8131..bee788ba 100644 --- a/gns3server/endpoints/compute/dynamips_nodes.py +++ b/gns3server/endpoints/compute/dynamips_nodes.py @@ -22,7 +22,7 @@ API endpoints for Dynamips nodes. import os import sys -from fastapi import APIRouter, Depends, status +from fastapi import APIRouter, WebSocket, Depends, status from fastapi.encoders import jsonable_encoder from fastapi.responses import StreamingResponse from typing import List @@ -56,7 +56,7 @@ def dep_node(project_id: UUID, node_id: UUID): return node -@router.post("/", +@router.post("", response_model=schemas.Dynamips, status_code=status.HTTP_201_CREATED, responses={409: {"model": schemas.ErrorMessage, "description": "Could not create Dynamips node"}}) @@ -299,18 +299,13 @@ async def duplicate_router(destination_node_id: UUID, node: Router = Depends(dep return new_node.__json__() -# @Route.get( -# r"/projects/{project_id}/dynamips/nodes/{node_id}/console/ws", -# description="WebSocket for console", -# parameters={ -# "project_id": "Project UUID", -# "node_id": "Node UUID", -# }) -# async def console_ws(request, response): -# -# dynamips_manager = Dynamips.instance() -# vm = dynamips_manager.get_node(request.match_info["node_id"], project_id=request.match_info["project_id"]) -# return await vm.start_websocket_console(request) +@router.websocket("/{node_id}/console/ws") +async def console_ws(websocket: WebSocket, node: Router = Depends(dep_node)): + """ + Console WebSocket. + """ + + await node.start_websocket_console(websocket) @router.post("/{node_id}/console/reset", diff --git a/gns3server/endpoints/compute/ethernet_hub_nodes.py b/gns3server/endpoints/compute/ethernet_hub_nodes.py index 796de758..bfb7aef1 100644 --- a/gns3server/endpoints/compute/ethernet_hub_nodes.py +++ b/gns3server/endpoints/compute/ethernet_hub_nodes.py @@ -47,7 +47,7 @@ def dep_node(project_id: UUID, node_id: UUID): return node -@router.post("/", +@router.post("", response_model=schemas.EthernetHub, status_code=status.HTTP_201_CREATED, responses={409: {"model": schemas.ErrorMessage, "description": "Could not create Ethernet hub node"}}) diff --git a/gns3server/endpoints/compute/ethernet_switch_nodes.py b/gns3server/endpoints/compute/ethernet_switch_nodes.py index a11fb4da..be7e37aa 100644 --- a/gns3server/endpoints/compute/ethernet_switch_nodes.py +++ b/gns3server/endpoints/compute/ethernet_switch_nodes.py @@ -47,7 +47,7 @@ def dep_node(project_id: UUID, node_id: UUID): return node -@router.post("/", +@router.post("", response_model=schemas.EthernetSwitch, status_code=status.HTTP_201_CREATED, responses={409: {"model": schemas.ErrorMessage, "description": "Could not create Ethernet switch node"}}) diff --git a/gns3server/endpoints/compute/frame_relay_switch_nodes.py b/gns3server/endpoints/compute/frame_relay_switch_nodes.py index fc0a1821..911c6f08 100644 --- a/gns3server/endpoints/compute/frame_relay_switch_nodes.py +++ b/gns3server/endpoints/compute/frame_relay_switch_nodes.py @@ -47,7 +47,7 @@ def dep_node(project_id: UUID, node_id: UUID): return node -@router.post("/", +@router.post("", response_model=schemas.FrameRelaySwitch, status_code=status.HTTP_201_CREATED, responses={409: {"model": schemas.ErrorMessage, "description": "Could not create Frame Relay switch node"}}) diff --git a/gns3server/endpoints/compute/iou_nodes.py b/gns3server/endpoints/compute/iou_nodes.py index 3ac22777..667bb9e4 100644 --- a/gns3server/endpoints/compute/iou_nodes.py +++ b/gns3server/endpoints/compute/iou_nodes.py @@ -21,7 +21,7 @@ API endpoints for IOU nodes. import os -from fastapi import APIRouter, Depends, Body, status +from fastapi import APIRouter, WebSocket, Depends, Body, status from fastapi.encoders import jsonable_encoder from fastapi.responses import StreamingResponse from typing import Union @@ -48,7 +48,7 @@ def dep_node(project_id: UUID, node_id: UUID): return node -@router.post("/", +@router.post("", response_model=schemas.IOU, status_code=status.HTTP_201_CREATED, responses={409: {"model": schemas.ErrorMessage, "description": "Could not create IOU node"}}) @@ -275,23 +275,18 @@ async def stream_pcap_file(adapter_number: int, port_number: int, node: IOUVM = return StreamingResponse(stream, media_type="application/vnd.tcpdump.pcap") +@router.websocket("/{node_id}/console/ws") +async def console_ws(websocket: WebSocket, node: IOUVM = Depends(dep_node)): + """ + Console WebSocket. + """ + + await node.start_websocket_console(websocket) + + @router.post("/{node_id}/console/reset", status_code=status.HTTP_204_NO_CONTENT, responses=responses) async def reset_console(node: IOUVM = Depends(dep_node)): await node.reset_console() - - -# @Route.get( -# r"/projects/{project_id}/iou/nodes/{node_id}/console/ws", -# description="WebSocket for console", -# parameters={ -# "project_id": "Project UUID", -# "node_id": "Node UUID", -# }) -# async def console_ws(request, response): -# -# iou_manager = IOU.instance() -# vm = iou_manager.get_node(request.match_info["node_id"], project_id=request.match_info["project_id"]) -# return await vm.start_websocket_console(request) diff --git a/gns3server/endpoints/compute/nat_nodes.py b/gns3server/endpoints/compute/nat_nodes.py index 2748cded..a2c40f89 100644 --- a/gns3server/endpoints/compute/nat_nodes.py +++ b/gns3server/endpoints/compute/nat_nodes.py @@ -48,7 +48,7 @@ def dep_node(project_id: UUID, node_id: UUID): return node -@router.post("/", +@router.post("", response_model=schemas.NAT, status_code=status.HTTP_201_CREATED, responses={409: {"model": schemas.ErrorMessage, "description": "Could not create NAT node"}}) diff --git a/gns3server/endpoints/compute/notifications.py b/gns3server/endpoints/compute/notifications.py index 059e91f5..7ab3ed20 100644 --- a/gns3server/endpoints/compute/notifications.py +++ b/gns3server/endpoints/compute/notifications.py @@ -19,11 +19,10 @@ API endpoints for compute notifications. """ -from fastapi import APIRouter, WebSocket, WebSocketDisconnect -from websockets.exceptions import WebSocketException -from typing import List - +import asyncio +from fastapi import APIRouter, WebSocket from gns3server.compute.notification_manager import NotificationManager +from starlette.endpoints import WebSocketEndpoint import logging log = logging.getLogger(__name__) @@ -31,48 +30,63 @@ log = logging.getLogger(__name__) router = APIRouter() -class ConnectionManager: - def __init__(self): - self.active_connections: List[WebSocket] = [] +@router.websocket_route("/notifications/ws") +class ComputeWebSocketNotifications(WebSocketEndpoint): + """ + Receive compute notifications about the controller from WebSocket stream. + """ + + async def on_connect(self, websocket: WebSocket) -> None: - async def connect(self, websocket: WebSocket): await websocket.accept() - self.active_connections.append(websocket) + log.info(f"New client {websocket.client.host}:{websocket.client.port} has connected to compute WebSocket") + self._notification_task = asyncio.ensure_future(self._stream_notifications(websocket)) - def disconnect(self, websocket: WebSocket): + async def on_disconnect(self, websocket: WebSocket, close_code: int) -> None: - self.active_connections.remove(websocket) + self._notification_task.cancel() + log.info(f"Client {websocket.client.host}:{websocket.client.port} has disconnected from controller WebSocket" + f" with close code {close_code}") - async def close_active_connections(self): + async def _stream_notifications(self, websocket: WebSocket) -> None: - for websocket in self.active_connections: - await websocket.close() - - async def send_text(self, message: str, websocket: WebSocket): - await websocket.send_text(message) - - async def broadcast(self, message: str): - for connection in self.active_connections: - await connection.send_text(message) - - -manager = ConnectionManager() - - -@router.websocket("/notifications/ws") -async def compute_notifications(websocket: WebSocket): - - log.info("Client has disconnected from compute WebSocket") - notifications = NotificationManager.instance() - await manager.connect(websocket) - try: - log.info("New client has connected to compute WebSocket") - with notifications.queue() as queue: + with NotificationManager.instance().queue() as queue: while True: notification = await queue.get_json(5) - await manager.send_text(notification, websocket) - except (WebSocketException, WebSocketDisconnect) as e: - log.info("Client has disconnected from compute WebSocket: {}".format(e)) - finally: - await websocket.close() - manager.disconnect(websocket) + await websocket.send_text(notification) + + +if __name__ == '__main__': + + import uvicorn + from fastapi import FastAPI + from starlette.responses import HTMLResponse + + app = FastAPI() + app.include_router(router) + + html = """ + + + + + + + + """ + + @app.get("/") + async def get() -> HTMLResponse: + return HTMLResponse(html) + + uvicorn.run(app, host="localhost", port=8000) diff --git a/gns3server/endpoints/compute/projects.py b/gns3server/endpoints/compute/projects.py index 76dec4c2..8d392da9 100644 --- a/gns3server/endpoints/compute/projects.py +++ b/gns3server/endpoints/compute/projects.py @@ -19,8 +19,6 @@ API endpoints for projects. """ -import shutil -import aiohttp import os import logging @@ -108,6 +106,7 @@ async def close_project(project: Project = Depends(dep_project)): Close a project on the compute. """ + # FIXME if _notifications_listening.setdefault(project.id, 0) <= 1: await project.close() ProjectManager.instance().remove_project(project.id) @@ -234,6 +233,6 @@ async def write_file(file_path: str, request: Request, project: Project = Depend pass # FIXME except FileNotFoundError: - raise aiohttp.web.HTTPNotFound() + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) except PermissionError: - raise aiohttp.web.HTTPForbidden() + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) diff --git a/gns3server/endpoints/compute/qemu_nodes.py b/gns3server/endpoints/compute/qemu_nodes.py index f88d3391..222ac001 100644 --- a/gns3server/endpoints/compute/qemu_nodes.py +++ b/gns3server/endpoints/compute/qemu_nodes.py @@ -22,14 +22,13 @@ API endpoints for Qemu nodes. import os import sys -from fastapi import APIRouter, Depends, Body, status +from fastapi import APIRouter, WebSocket, Depends, Body, status from fastapi.encoders import jsonable_encoder from fastapi.responses import StreamingResponse from uuid import UUID from gns3server.endpoints import schemas from gns3server.compute.project_manager import ProjectManager -from gns3server.compute.compute_error import ComputeError from gns3server.compute.qemu import Qemu from gns3server.compute.qemu.qemu_vm import QemuVM @@ -50,7 +49,7 @@ def dep_node(project_id: UUID, node_id: UUID): return node -@router.post("/", +@router.post("", response_model=schemas.Qemu, status_code=status.HTTP_201_CREATED, responses={409: {"model": schemas.ErrorMessage, "description": "Could not create Qemu node"}}) @@ -281,14 +280,6 @@ async def stop_capture(adapter_number: int, port_number: int, node: QemuVM = Dep await node.stop_capture(adapter_number) -@router.post("/{node_id}/console/reset", - status_code=status.HTTP_204_NO_CONTENT, - responses=responses) -async def reset_console(node: QemuVM = Depends(dep_node)): - - await node.reset_console() - - @router.get("/{node_id}/adapters/{adapter_number}/ports/{port_number}/pcap", responses=responses) async def stream_pcap_file(adapter_number: int, port_number: int, node: QemuVM = Depends(dep_node)): @@ -302,16 +293,18 @@ async def stream_pcap_file(adapter_number: int, port_number: int, node: QemuVM = return StreamingResponse(stream, media_type="application/vnd.tcpdump.pcap") -# @Route.get( -# r"/projects/{project_id}/qemu/nodes/{node_id}/console/ws", -# description="WebSocket for console", -# parameters={ -# "project_id": "Project UUID", -# "node_id": "Node UUID", -# }) -# async def console_ws(request, response): -# -# qemu_manager = Qemu.instance() -# vm = qemu_manager.get_node(request.match_info["node_id"], project_id=request.match_info["project_id"]) -# return await vm.start_websocket_console(request) +@router.websocket("/{node_id}/console/ws") +async def console_ws(websocket: WebSocket, node: QemuVM = Depends(dep_node)): + """ + Console WebSocket. + """ + await node.start_websocket_console(websocket) + + +@router.post("/{node_id}/console/reset", + status_code=status.HTTP_204_NO_CONTENT, + responses=responses) +async def reset_console(node: QemuVM = Depends(dep_node)): + + await node.reset_console() diff --git a/gns3server/endpoints/compute/virtualbox_nodes.py b/gns3server/endpoints/compute/virtualbox_nodes.py index 01890866..661560cf 100644 --- a/gns3server/endpoints/compute/virtualbox_nodes.py +++ b/gns3server/endpoints/compute/virtualbox_nodes.py @@ -21,7 +21,7 @@ API endpoints for VirtualBox nodes. import os -from fastapi import APIRouter, Depends, status +from fastapi import APIRouter, WebSocket, Depends, status from fastapi.encoders import jsonable_encoder from fastapi.responses import StreamingResponse from uuid import UUID @@ -49,7 +49,7 @@ def dep_node(project_id: UUID, node_id: UUID): return node -@router.post("/", +@router.post("", response_model=schemas.VirtualBox, status_code=status.HTTP_201_CREATED, responses={409: {"model": schemas.ErrorMessage, "description": "Could not create VirtualBox node"}}) @@ -288,14 +288,6 @@ async def stop_capture(adapter_number: int, port_number: int, node: VirtualBoxVM await node.stop_capture(adapter_number) -@router.post("/{node_id}/console/reset", - status_code=status.HTTP_204_NO_CONTENT, - responses=responses) -async def reset_console(node: VirtualBoxVM = Depends(dep_node)): - - await node.reset_console() - - @router.get("/{node_id}/adapters/{adapter_number}/ports/{port_number}/pcap", responses=responses) async def stream_pcap_file(adapter_number: int, port_number: int, node: VirtualBoxVM = Depends(dep_node)): @@ -309,15 +301,18 @@ async def stream_pcap_file(adapter_number: int, port_number: int, node: VirtualB return StreamingResponse(stream, media_type="application/vnd.tcpdump.pcap") -# @Route.get( -# r"/projects/{project_id}/virtualbox/nodes/{node_id}/console/ws", -# description="WebSocket for console", -# parameters={ -# "project_id": "Project UUID", -# "node_id": "Node UUID", -# }) -# async def console_ws(request, response): -# -# virtualbox_manager = VirtualBox.instance() -# vm = virtualbox_manager.get_node(request.match_info["node_id"], project_id=request.match_info["project_id"]) -# return await vm.start_websocket_console(request) +@router.websocket("/{node_id}/console/ws") +async def console_ws(websocket: WebSocket, node: VirtualBoxVM = Depends(dep_node)): + """ + Console WebSocket. + """ + + await node.start_websocket_console(websocket) + + +@router.post("/{node_id}/console/reset", + status_code=status.HTTP_204_NO_CONTENT, + responses=responses) +async def reset_console(node: VirtualBoxVM = Depends(dep_node)): + + await node.reset_console() diff --git a/gns3server/endpoints/compute/vmware_nodes.py b/gns3server/endpoints/compute/vmware_nodes.py index 989ba70f..b456ca0c 100644 --- a/gns3server/endpoints/compute/vmware_nodes.py +++ b/gns3server/endpoints/compute/vmware_nodes.py @@ -21,7 +21,7 @@ API endpoints for VMware nodes. import os -from fastapi import APIRouter, Depends, status +from fastapi import APIRouter, WebSocket, Depends, status from fastapi.encoders import jsonable_encoder from fastapi.responses import StreamingResponse from uuid import UUID @@ -48,7 +48,7 @@ def dep_node(project_id: UUID, node_id: UUID): return node -@router.post("/", +@router.post("", response_model=schemas.VMware, status_code=status.HTTP_201_CREATED, responses={409: {"model": schemas.ErrorMessage, "description": "Could not create VMware node"}}) @@ -253,14 +253,6 @@ async def stop_capture(adapter_number: int, port_number: int, node: VMwareVM = D await node.stop_capture(adapter_number) -@router.post("/{node_id}/console/reset", - status_code=status.HTTP_204_NO_CONTENT, - responses=responses) -async def reset_console(node: VMwareVM = Depends(dep_node)): - - await node.reset_console() - - @router.get("/{node_id}/adapters/{adapter_number}/ports/{port_number}/pcap", responses=responses) async def stream_pcap_file(adapter_number: int, port_number: int, node: VMwareVM = Depends(dep_node)): @@ -289,16 +281,18 @@ def allocate_vmnet(node: VMwareVM = Depends(dep_node)) -> dict: return {"vmnet": vmnet} -# @Route.get( -# r"/projects/{project_id}/vmware/nodes/{node_id}/console/ws", -# description="WebSocket for console", -# parameters={ -# "project_id": "Project UUID", -# "node_id": "Node UUID", -# }) -# async def console_ws(request, response): -# -# vmware_manager = VMware.instance() -# vm = vmware_manager.get_node(request.match_info["node_id"], project_id=request.match_info["project_id"]) -# return await vm.start_websocket_console(request) -# +@router.websocket("/{node_id}/console/ws") +async def console_ws(websocket: WebSocket, node: VMwareVM = Depends(dep_node)): + """ + Console WebSocket. + """ + + await node.start_websocket_console(websocket) + + +@router.post("/{node_id}/console/reset", + status_code=status.HTTP_204_NO_CONTENT, + responses=responses) +async def reset_console(node: VMwareVM = Depends(dep_node)): + + await node.reset_console() diff --git a/gns3server/endpoints/compute/vpcs_nodes.py b/gns3server/endpoints/compute/vpcs_nodes.py index 064a3d21..abd64930 100644 --- a/gns3server/endpoints/compute/vpcs_nodes.py +++ b/gns3server/endpoints/compute/vpcs_nodes.py @@ -21,14 +21,13 @@ API endpoints for VPCS nodes. import os -from fastapi import APIRouter, Depends, Body, status +from fastapi import APIRouter, WebSocket, Depends, Body, status from fastapi.encoders import jsonable_encoder from fastapi.responses import StreamingResponse from uuid import UUID from gns3server.endpoints import schemas from gns3server.compute.vpcs import VPCS -from gns3server.compute.project_manager import ProjectManager from gns3server.compute.vpcs.vpcs_vm import VPCSVM router = APIRouter() @@ -48,7 +47,7 @@ def dep_node(project_id: UUID, node_id: UUID): return node -@router.post("/", +@router.post("", response_model=schemas.VPCS, status_code=status.HTTP_201_CREATED, responses={409: {"model": schemas.ErrorMessage, "description": "Could not create VMware node"}}) @@ -258,15 +257,10 @@ async def stream_pcap_file(adapter_number: int, port_number: int, node: VPCSVM = return StreamingResponse(stream, media_type="application/vnd.tcpdump.pcap") -# @Route.get( -# r"/projects/{project_id}/vpcs/nodes/{node_id}/console/ws", -# description="WebSocket for console", -# parameters={ -# "project_id": "Project UUID", -# "node_id": "Node UUID", -# }) -# async def console_ws(request, response): -# -# vpcs_manager = VPCS.instance() -# vm = vpcs_manager.get_node(request.match_info["node_id"], project_id=request.match_info["project_id"]) -# return await vm.start_websocket_console(request) +@router.websocket("/{node_id}/console/ws") +async def console_ws(websocket: WebSocket, node: VPCSVM = Depends(dep_node)): + """ + Console WebSocket. + """ + + await node.start_websocket_console(websocket) diff --git a/gns3server/endpoints/controller/appliances.py b/gns3server/endpoints/controller/appliances.py index 9a1abc44..3f10d0f3 100644 --- a/gns3server/endpoints/controller/appliances.py +++ b/gns3server/endpoints/controller/appliances.py @@ -25,7 +25,7 @@ from typing import Optional router = APIRouter() -@router.get("/") +@router.get("") async def get_appliances(update: Optional[bool] = None, symbol_theme: Optional[str] = "Classic"): """ Return all appliances known by the controller. diff --git a/gns3server/endpoints/controller/computes.py b/gns3server/endpoints/controller/computes.py index 4cd6285a..8a76b239 100644 --- a/gns3server/endpoints/controller/computes.py +++ b/gns3server/endpoints/controller/computes.py @@ -35,7 +35,7 @@ responses = { } -@router.post("/", +@router.post("", status_code=status.HTTP_201_CREATED, response_model=schemas.Compute, responses={404: {"model": ErrorMessage, "description": "Could not connect to compute"}, @@ -64,7 +64,7 @@ def get_compute(compute_id: Union[str, UUID]): return compute.__json__() -@router.get("/", +@router.get("", response_model=List[schemas.Compute], response_model_exclude_unset=True) async def get_computes(): diff --git a/gns3server/endpoints/controller/drawings.py b/gns3server/endpoints/controller/drawings.py index 75e5a246..fbb3967c 100644 --- a/gns3server/endpoints/controller/drawings.py +++ b/gns3server/endpoints/controller/drawings.py @@ -35,7 +35,7 @@ responses = { } -@router.get("/", +@router.get("", response_model=List[Drawing], response_model_exclude_unset=True) async def get_drawings(project_id: UUID): @@ -47,7 +47,7 @@ async def get_drawings(project_id: UUID): return [v.__json__() for v in project.drawings.values()] -@router.post("/", +@router.post("", status_code=status.HTTP_201_CREATED, response_model=Drawing, responses=responses) diff --git a/gns3server/endpoints/controller/gns3vm.py b/gns3server/endpoints/controller/gns3vm.py index 4351cceb..16669350 100644 --- a/gns3server/endpoints/controller/gns3vm.py +++ b/gns3server/endpoints/controller/gns3vm.py @@ -48,8 +48,7 @@ async def get_vms(engine: str): return vms -@router.get("/", - response_model=GNS3VM) +@router.get("", response_model=GNS3VM) async def get_gns3vm_settings(): """ Return the GNS3 VM settings. @@ -58,9 +57,7 @@ async def get_gns3vm_settings(): return Controller.instance().gns3vm.__json__() -@router.put("/", - response_model=GNS3VM, - response_model_exclude_unset=True) +@router.put("", response_model=GNS3VM, response_model_exclude_unset=True) async def update_gns3vm_settings(gns3vm_data: GNS3VM): """ Update the GNS3 VM settings. diff --git a/gns3server/endpoints/controller/links.py b/gns3server/endpoints/controller/links.py index 46b8424f..3b52b8b9 100644 --- a/gns3server/endpoints/controller/links.py +++ b/gns3server/endpoints/controller/links.py @@ -48,7 +48,7 @@ async def dep_link(project_id: UUID, link_id: UUID): return link -@router.get("/", +@router.get("", response_model=List[schemas.Link], response_model_exclude_unset=True) async def get_links(project_id: UUID): @@ -60,7 +60,7 @@ async def get_links(project_id: UUID): return [v.__json__() for v in project.links.values()] -@router.post("/", +@router.post("", status_code=status.HTTP_201_CREATED, response_model=schemas.Link, responses={404: {"model": ErrorMessage, "description": "Could not find project"}, @@ -214,4 +214,4 @@ async def reset_link(link: Link = Depends(dep_link)): # break # await proxied_response.write(data) # -# #return StreamingResponse(file_like, media_type="video/mp4")) \ No newline at end of file +# #return StreamingResponse(file_like, media_type="video/mp4")) diff --git a/gns3server/endpoints/controller/nodes.py b/gns3server/endpoints/controller/nodes.py index 90efa1c5..0f65043e 100644 --- a/gns3server/endpoints/controller/nodes.py +++ b/gns3server/endpoints/controller/nodes.py @@ -19,9 +19,10 @@ API endpoints for nodes. """ +import aiohttp import asyncio -from fastapi import APIRouter, Depends, Request, Response, status +from fastapi import APIRouter, Depends, WebSocket, WebSocketDisconnect, Request, Response, status from fastapi.encoders import jsonable_encoder from fastapi.routing import APIRoute from typing import List, Callable @@ -35,6 +36,9 @@ from gns3server.controller.controller_error import ControllerForbiddenError from gns3server.endpoints.schemas.common import ErrorMessage from gns3server.endpoints import schemas +import logging +log = logging.getLogger(__name__) + node_locks = {} @@ -97,7 +101,7 @@ async def dep_node(node_id: UUID, project: Project = Depends(dep_project)): return node -@router.post("/", +@router.post("", status_code=status.HTTP_201_CREATED, response_model=schemas.Node, responses={404: {"model": ErrorMessage, "description": "Could not find project"}, @@ -117,7 +121,7 @@ async def create_node(node_data: schemas.Node, project: Project = Depends(dep_pr return node.__json__() -@router.get("/", +@router.get("", response_model=List[schemas.Node], response_model_exclude_unset=True) async def get_nodes(project: Project = Depends(dep_project)): @@ -367,59 +371,47 @@ async def post_file(file_path: str, request: Request, node: Node = Depends(dep_n raw=True) -# @Route.get( -# r"/projects/{project_id}/nodes/{node_id}/console/ws", -# parameters={ -# "project_id": "Project UUID", -# "node_id": "Node UUID" -# }, -# description="Connect to WebSocket console", -# status_codes={ -# 200: "File returned", -# 403: "Permission denied", -# 404: "The file doesn't exist" -# }) -# async def ws_console(request, response): -# -# project = await Controller.instance().get_loaded_project(request.match_info["project_id"]) -# node = project.get_node(request.match_info["node_id"]) -# compute = node.compute -# ws = aiohttp.web.WebSocketResponse() -# await ws.prepare(request) -# request.app['websockets'].add(ws) -# -# ws_console_compute_url = "ws://{compute_host}:{compute_port}/v2/compute/projects/{project_id}/{node_type}/nodes/{node_id}/console/ws".format(compute_host=compute.host, -# compute_port=compute.port, -# project_id=project.id, -# node_type=node.node_type, -# node_id=node.id) -# -# async def ws_forward(ws_client): -# async for msg in ws: -# if msg.type == aiohttp.WSMsgType.TEXT: -# await ws_client.send_str(msg.data) -# elif msg.type == aiohttp.WSMsgType.BINARY: -# await ws_client.send_bytes(msg.data) -# elif msg.type == aiohttp.WSMsgType.ERROR: -# break -# -# try: -# async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=None, force_close=True)) as session: -# async with session.ws_connect(ws_console_compute_url) as ws_client: -# asyncio.ensure_future(ws_forward(ws_client)) -# async for msg in ws_client: -# if msg.type == aiohttp.WSMsgType.TEXT: -# await ws.send_str(msg.data) -# elif msg.type == aiohttp.WSMsgType.BINARY: -# await ws.send_bytes(msg.data) -# elif msg.type == aiohttp.WSMsgType.ERROR: -# break -# finally: -# if not ws.closed: -# await ws.close() -# request.app['websockets'].discard(ws) -# -# return ws +@router.websocket("/{node_id}/console/ws") +async def ws_console(websocket: WebSocket, node: Node = Depends(dep_node)): + """ + WebSocket console. + """ + + compute = node.compute + await websocket.accept() + log.info(f"New client {websocket.client.host}:{websocket.client.port} has connected to controller console WebSocket") + ws_console_compute_url = f"ws://{compute.host}:{compute.port}/v2/compute/projects/" \ + f"{node.project.id}/{node.node_type}/nodes/{node.id}/console/ws" + + async def ws_receive(ws_console_compute): + """ + Receive WebSocket data from client and forward to compute console WebSocket. + """ + + try: + while True: + data = await websocket.receive_text() + if data: + await ws_console_compute.send_str(data) + except WebSocketDisconnect: + await ws_console_compute.close() + log.info(f"Client {websocket.client.host}:{websocket.client.port} has disconnected from controller" + f" console WebSocket") + + try: + # receive WebSocket data from compute console WebSocket and forward to client. + async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=None, force_close=True)) as session: + async with session.ws_connect(ws_console_compute_url) as ws_console_compute: + asyncio.ensure_future(ws_receive(ws_console_compute)) + async for msg in ws_console_compute: + if msg.type == aiohttp.WSMsgType.TEXT: + await websocket.send_text(msg.data) + elif msg.type == aiohttp.WSMsgType.BINARY: + await websocket.send_bytes(msg.data) + elif msg.type == aiohttp.WSMsgType.ERROR: + break + except aiohttp.client_exceptions.ClientResponseError as e: + log.error(f"Client response error received when forwarding to compute console WebSocket: {e}") @router.post("/console/reset", diff --git a/gns3server/endpoints/controller/notifications.py b/gns3server/endpoints/controller/notifications.py index e71c75a3..171e52aa 100644 --- a/gns3server/endpoints/controller/notifications.py +++ b/gns3server/endpoints/controller/notifications.py @@ -19,51 +19,58 @@ API endpoints for controller notifications. """ +import asyncio + +from fastapi import APIRouter, WebSocket +from fastapi.responses import StreamingResponse +from starlette.endpoints import WebSocketEndpoint -from fastapi import APIRouter, Request, Response, WebSocket, WebSocketDisconnect -from websockets.exceptions import WebSocketException from gns3server.controller import Controller -router = APIRouter() - import logging log = logging.getLogger(__name__) - -# @router.get("/") -# async def notification(request: Request): -# """ -# Receive notifications about the controller from HTTP -# """ -# -# controller = Controller.instance() -# -# await response.prepare(request) -# response = Response(content, media_type="application/json") -# -# with controller.notification.controller_queue() as queue: -# while True: -# msg = await queue.get_json(5) -# await response.write(("{}\n".format(msg)).encode("utf-8")) -# -# -# await response(scope, receive, send) +router = APIRouter() -@router.websocket("/ws") -async def notification_ws(websocket: WebSocket): +@router.get("") +async def http_notification(): """ - Receive notifications about the controller from a Websocket + Receive controller notifications about the controller from HTTP stream. """ - controller = Controller.instance() - await websocket.accept() - log.info("New client has connected to controller WebSocket") - try: - with controller.notification.controller_queue() as queue: + async def event_stream(): + + with Controller.instance().notification.controller_queue() as queue: + while True: + msg = await queue.get_json(5) + yield ("{}\n".format(msg)).encode("utf-8") + + return StreamingResponse(event_stream(), media_type="application/json") + + +@router.websocket_route("/ws") +class ControllerWebSocketNotifications(WebSocketEndpoint): + """ + Receive controller notifications about the controller from WebSocket stream. + """ + + async def on_connect(self, websocket: WebSocket) -> None: + + await websocket.accept() + log.info(f"New client {websocket.client.host}:{websocket.client.port} has connected to controller WebSocket") + + self._notification_task = asyncio.ensure_future(self._stream_notifications(websocket=websocket)) + + async def on_disconnect(self, websocket: WebSocket, close_code: int) -> None: + + self._notification_task.cancel() + log.info(f"Client {websocket.client.host}:{websocket.client.port} has disconnected from controller WebSocket" + f" with close code {close_code}") + + async def _stream_notifications(self, websocket: WebSocket) -> None: + + with Controller.instance().notifications.queue() as queue: while True: notification = await queue.get_json(5) await websocket.send_text(notification) - except (WebSocketException, WebSocketDisconnect): - log.info("Client has disconnected from controller WebSocket") - await websocket.close() diff --git a/gns3server/endpoints/controller/projects.py b/gns3server/endpoints/controller/projects.py index a09af4ee..d0dda64a 100644 --- a/gns3server/endpoints/controller/projects.py +++ b/gns3server/endpoints/controller/projects.py @@ -32,7 +32,7 @@ log = logging.getLogger() from fastapi import APIRouter, Depends, Request, Body, HTTPException, status, WebSocket, WebSocketDisconnect from fastapi.encoders import jsonable_encoder from fastapi.responses import StreamingResponse, FileResponse -from websockets.exceptions import WebSocketException +from websockets.exceptions import ConnectionClosed, WebSocketException from typing import List from uuid import UUID @@ -66,7 +66,19 @@ def dep_project(project_id: UUID): CHUNK_SIZE = 1024 * 8 # 8KB -@router.post("/", +@router.get("", + response_model=List[schemas.Project], + response_model_exclude_unset=True) +def get_projects(): + """ + Return all projects. + """ + + controller = Controller.instance() + return [p.__json__() for p in controller.projects.values()] + + +@router.post("", status_code=status.HTTP_201_CREATED, response_model=schemas.Project, response_model_exclude_unset=True, @@ -78,22 +90,9 @@ async def create_project(project_data: schemas.ProjectCreate): controller = Controller.instance() project = await controller.add_project(**jsonable_encoder(project_data, exclude_unset=True)) - print(project.__json__()["variables"]) return project.__json__() -@router.get("/", - response_model=List[schemas.Project], - response_model_exclude_unset=True) -def get_projects(): - """ - Return all projects. - """ - - controller = Controller.instance() - return [p.__json__() for p in controller.projects.values()] - - @router.get("/{project_id}", response_model=schemas.Project, responses=responses) @@ -193,52 +192,57 @@ async def load_project(path: str = Body(..., embed=True)): return project.__json__() -# @router.get("/projects/{project_id}/notifications", -# summary="Receive notifications about projects", -# responses={404: {"model": ErrorMessage, "description": "Could not find project"}}) -# async def notification(project_id: UUID): -# -# controller = Controller.instance() -# project = controller.get_project(str(project_id)) -# #response.content_type = "application/json" -# #response.set_status(200) -# #response.enable_chunked_encoding() -# #await response.prepare(request) -# log.info("New client has connected to the notification stream for project ID '{}' (HTTP long-polling method)".format(project.id)) -# -# try: -# with controller.notification.project_queue(project.id) as queue: -# while True: -# msg = await queue.get_json(5) -# await response.write(("{}\n".format(msg)).encode("utf-8")) -# finally: -# log.info("Client has disconnected from notification for project ID '{}' (HTTP long-polling method)".format(project.id)) -# if project.auto_close: -# # To avoid trouble with client connecting disconnecting we sleep few seconds before checking -# # if someone else is not connected -# await asyncio.sleep(5) -# if not controller.notification.project_has_listeners(project.id): -# log.info("Project '{}' is automatically closing due to no client listening".format(project.id)) -# await project.close() +@router.get("/{project_id}/notifications") +async def notification(project_id: UUID): + """ + Receive project notifications about the controller from HTTP stream. + """ + + controller = Controller.instance() + project = controller.get_project(str(project_id)) + + log.info("New client has connected to the notification stream for project ID '{}' (HTTP steam method)".format(project.id)) + + async def event_stream(): + + try: + with controller.notification.project_queue(project.id) as queue: + while True: + msg = await queue.get_json(5) + yield ("{}\n".format(msg)).encode("utf-8") + finally: + log.info("Client has disconnected from notification for project ID '{}' (HTTP stream method)".format(project.id)) + if project.auto_close: + # To avoid trouble with client connecting disconnecting we sleep few seconds before checking + # if someone else is not connected + await asyncio.sleep(5) + if not controller.notification.project_has_listeners(project.id): + log.info("Project '{}' is automatically closing due to no client listening".format(project.id)) + await project.close() + + return StreamingResponse(event_stream(), media_type="application/json") @router.websocket("/{project_id}/notifications/ws") async def notification_ws(project_id: UUID, websocket: WebSocket): + """ + Receive project notifications about the controller from WebSocket. + """ controller = Controller.instance() project = controller.get_project(str(project_id)) await websocket.accept() - #request.app['websockets'].add(ws) - #asyncio.ensure_future(process_websocket(ws)) log.info("New client has connected to the notification stream for project ID '{}' (WebSocket method)".format(project.id)) try: with controller.notification.project_queue(project.id) as queue: while True: notification = await queue.get_json(5) await websocket.send_text(notification) - except (WebSocketException, WebSocketDisconnect): + except (ConnectionClosed, WebSocketDisconnect): log.info("Client has disconnected from notification stream for project ID '{}' (WebSocket method)".format(project.id)) + except WebSocketException as e: + log.warning("Error while sending to project event to WebSocket client: '{}'".format(e)) finally: await websocket.close() if project.auto_close: diff --git a/gns3server/endpoints/controller/snapshots.py b/gns3server/endpoints/controller/snapshots.py index f3104066..c7c57725 100644 --- a/gns3server/endpoints/controller/snapshots.py +++ b/gns3server/endpoints/controller/snapshots.py @@ -19,7 +19,6 @@ API endpoints for snapshots. """ - import logging log = logging.getLogger() @@ -48,7 +47,7 @@ def dep_project(project_id: UUID): return project -@router.post("/", +@router.post("", status_code=status.HTTP_201_CREATED, response_model=schemas.Snapshot, responses=responses) @@ -61,7 +60,7 @@ async def create_snapshot(snapshot_data: schemas.SnapshotCreate, project: Projec return snapshot.__json__() -@router.get("/", +@router.get("", response_model=List[schemas.Snapshot], response_model_exclude_unset=True, responses=responses) diff --git a/gns3server/endpoints/controller/symbols.py b/gns3server/endpoints/controller/symbols.py index aa662f84..d6555143 100644 --- a/gns3server/endpoints/controller/symbols.py +++ b/gns3server/endpoints/controller/symbols.py @@ -35,7 +35,7 @@ log = logging.getLogger(__name__) router = APIRouter() -@router.get("/") +@router.get("") def get_symbols(): controller = Controller.instance() diff --git a/gns3server/endpoints/schemas/projects.py b/gns3server/endpoints/schemas/projects.py index 07eb6690..9fc6b771 100644 --- a/gns3server/endpoints/schemas/projects.py +++ b/gns3server/endpoints/schemas/projects.py @@ -94,9 +94,10 @@ class ProjectUpdate(ProjectBase): class Project(ProjectBase): + project_id: UUID name: Optional[str] = None - project_id = UUID status: Optional[ProjectStatus] = None + filename: Optional[str] = None class ProjectFile(BaseModel):