1
0
mirror of https://github.com/GNS3/gns3-server synced 2024-11-24 17:28:08 +00:00

Refactor WebSocket console code to work with FastAPI.

Fix endpoint routes.
This commit is contained in:
grossmj 2020-10-19 15:00:41 +10:30
parent 5341ccdbd6
commit bd8565b2b9
27 changed files with 338 additions and 370 deletions

View File

@ -18,8 +18,6 @@
import sys
import os
import stat
import logging
import aiohttp
import shutil
import asyncio
import tempfile
@ -27,7 +25,7 @@ import psutil
import platform
import re
from aiohttp.web import WebSocketResponse
from fastapi import WebSocketDisconnect
from gns3server.utils.interfaces import interfaces
from gns3server.compute.compute_error import ComputeError
from ..compute.port_manager import PortManager
@ -38,7 +36,7 @@ from ..ubridge.ubridge_error import UbridgeError
from .nios.nio_udp import NIOUDP
from .error import NodeError
import logging
log = logging.getLogger(__name__)
@ -414,7 +412,7 @@ class BaseNode:
await self.stop_wrap_console()
await self.start_wrap_console()
async def start_websocket_console(self, request):
async def start_websocket_console(self, websocket):
"""
Connect to console using Websocket.
@ -428,47 +426,45 @@ class BaseNode:
raise NodeError("Node {} console type is not telnet".format(self.name))
try:
(telnet_reader, telnet_writer) = await asyncio.open_connection(self._manager.port_manager.console_host, self.console)
(telnet_reader, telnet_writer) = await asyncio.open_connection(self._manager.port_manager.console_host,
self.console)
except ConnectionError as e:
raise NodeError("Cannot connect to node {} telnet server: {}".format(self.name, e))
log.info("Connected to Telnet server")
ws = WebSocketResponse()
await ws.prepare(request)
request.app['websockets'].add(ws)
log.info("New client has connected to console WebSocket")
await websocket.accept()
log.info(f"New client {websocket.client.host}:{websocket.client.port} has connected to compute"
f" console WebSocket")
async def ws_forward(telnet_writer):
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
telnet_writer.write(msg.data.encode())
await telnet_writer.drain()
elif msg.type == aiohttp.WSMsgType.BINARY:
await telnet_writer.write(msg.data)
await telnet_writer.drain()
elif msg.type == aiohttp.WSMsgType.ERROR:
log.debug("Websocket connection closed with exception {}".format(ws.exception()))
try:
while True:
data = await websocket.receive_text()
if data:
telnet_writer.write(data.encode())
await telnet_writer.drain()
except WebSocketDisconnect:
log.info(f"Client {websocket.client.host}:{websocket.client.port} has disconnected from compute"
f" console WebSocket")
async def telnet_forward(telnet_reader):
while not ws.closed and not telnet_reader.at_eof():
while not telnet_reader.at_eof():
data = await telnet_reader.read(1024)
if data:
await ws.send_bytes(data)
await websocket.send_bytes(data)
try:
# keep forwarding websocket data in both direction
await asyncio.wait([ws_forward(telnet_writer), telnet_forward(telnet_reader)], return_when=asyncio.FIRST_COMPLETED)
finally:
log.info("Client has disconnected from console WebSocket")
if not ws.closed:
await ws.close()
request.app['websockets'].discard(ws)
# keep forwarding WebSocket data in both direction
done, pending = await asyncio.wait([ws_forward(telnet_writer), telnet_forward(telnet_reader)],
return_when=asyncio.FIRST_COMPLETED)
for task in done:
if task.exception():
log.warning(f"Exception while forwarding WebSocket data to Telnet server {task.exception()}")
return ws
for task in pending:
task.cancel()
@property
def aux(self):

View File

@ -47,7 +47,7 @@ async def dep_node(project_id: UUID, node_id: UUID):
return node
@router.post("/",
@router.post("",
response_model=schemas.ATMSwitch,
status_code=status.HTTP_201_CREATED,
responses={409: {"model": schemas.ErrorMessage, "description": "Could not create ATM switch node"}})

View File

@ -48,7 +48,7 @@ def dep_node(project_id: UUID, node_id: UUID):
return node
@router.post("/",
@router.post("",
response_model=schemas.Cloud,
status_code=status.HTTP_201_CREATED,
responses={409: {"model": schemas.ErrorMessage, "description": "Could not create cloud node"}})

View File

@ -21,7 +21,7 @@ API endpoints for Docker nodes.
import os
from fastapi import APIRouter, Depends, Body, status
from fastapi import APIRouter, WebSocket, Depends, Body, status
from fastapi.encoders import jsonable_encoder
from fastapi.responses import StreamingResponse
from uuid import UUID
@ -47,7 +47,7 @@ def dep_node(project_id: UUID, node_id: UUID):
return node
@router.post("/",
@router.post("",
response_model=schemas.Docker,
status_code=status.HTTP_201_CREATED,
responses={409: {"model": schemas.ErrorMessage, "description": "Could not create Docker node"}})
@ -290,14 +290,6 @@ async def stop_capture(adapter_number: int, port_number: int, node: DockerVM = D
await node.stop_capture(adapter_number)
@router.post("/{node_id}/console/reset",
status_code=status.HTTP_204_NO_CONTENT,
responses=responses)
async def reset_console(node: DockerVM = Depends(dep_node)):
await node.reset_console()
@router.get("/{node_id}/adapters/{adapter_number}/ports/{port_number}/pcap",
responses=responses)
async def stream_pcap_file(adapter_number: int, port_number: int, node: DockerVM = Depends(dep_node)):
@ -310,18 +302,19 @@ async def stream_pcap_file(adapter_number: int, port_number: int, node: DockerVM
stream = Docker.instance().stream_pcap_file(nio, node.project.id)
return StreamingResponse(stream, media_type="application/vnd.tcpdump.pcap")
# @Route.get(
# r"/projects/{project_id}/docker/nodes/{node_id}/console/ws",
# description="WebSocket for console",
# parameters={
# "project_id": "Project UUID",
# "node_id": "Node UUID",
# })
# async def console_ws(request, response):
#
# docker_manager = Docker.instance()
# container = docker_manager.get_node(request.match_info["node_id"], project_id=request.match_info["project_id"])
# return await container.start_websocket_console(request)
@router.websocket("/{node_id}/console/ws")
async def console_ws(websocket: WebSocket, node: DockerVM = Depends(dep_node)):
"""
Console WebSocket.
"""
await node.start_websocket_console(websocket)
@router.post("/{node_id}/console/reset",
status_code=status.HTTP_204_NO_CONTENT,
responses=responses)
async def reset_console(node: DockerVM = Depends(dep_node)):
await node.reset_console()

View File

@ -22,7 +22,7 @@ API endpoints for Dynamips nodes.
import os
import sys
from fastapi import APIRouter, Depends, status
from fastapi import APIRouter, WebSocket, Depends, status
from fastapi.encoders import jsonable_encoder
from fastapi.responses import StreamingResponse
from typing import List
@ -56,7 +56,7 @@ def dep_node(project_id: UUID, node_id: UUID):
return node
@router.post("/",
@router.post("",
response_model=schemas.Dynamips,
status_code=status.HTTP_201_CREATED,
responses={409: {"model": schemas.ErrorMessage, "description": "Could not create Dynamips node"}})
@ -299,18 +299,13 @@ async def duplicate_router(destination_node_id: UUID, node: Router = Depends(dep
return new_node.__json__()
# @Route.get(
# r"/projects/{project_id}/dynamips/nodes/{node_id}/console/ws",
# description="WebSocket for console",
# parameters={
# "project_id": "Project UUID",
# "node_id": "Node UUID",
# })
# async def console_ws(request, response):
#
# dynamips_manager = Dynamips.instance()
# vm = dynamips_manager.get_node(request.match_info["node_id"], project_id=request.match_info["project_id"])
# return await vm.start_websocket_console(request)
@router.websocket("/{node_id}/console/ws")
async def console_ws(websocket: WebSocket, node: Router = Depends(dep_node)):
"""
Console WebSocket.
"""
await node.start_websocket_console(websocket)
@router.post("/{node_id}/console/reset",

View File

@ -47,7 +47,7 @@ def dep_node(project_id: UUID, node_id: UUID):
return node
@router.post("/",
@router.post("",
response_model=schemas.EthernetHub,
status_code=status.HTTP_201_CREATED,
responses={409: {"model": schemas.ErrorMessage, "description": "Could not create Ethernet hub node"}})

View File

@ -47,7 +47,7 @@ def dep_node(project_id: UUID, node_id: UUID):
return node
@router.post("/",
@router.post("",
response_model=schemas.EthernetSwitch,
status_code=status.HTTP_201_CREATED,
responses={409: {"model": schemas.ErrorMessage, "description": "Could not create Ethernet switch node"}})

View File

@ -47,7 +47,7 @@ def dep_node(project_id: UUID, node_id: UUID):
return node
@router.post("/",
@router.post("",
response_model=schemas.FrameRelaySwitch,
status_code=status.HTTP_201_CREATED,
responses={409: {"model": schemas.ErrorMessage, "description": "Could not create Frame Relay switch node"}})

View File

@ -21,7 +21,7 @@ API endpoints for IOU nodes.
import os
from fastapi import APIRouter, Depends, Body, status
from fastapi import APIRouter, WebSocket, Depends, Body, status
from fastapi.encoders import jsonable_encoder
from fastapi.responses import StreamingResponse
from typing import Union
@ -48,7 +48,7 @@ def dep_node(project_id: UUID, node_id: UUID):
return node
@router.post("/",
@router.post("",
response_model=schemas.IOU,
status_code=status.HTTP_201_CREATED,
responses={409: {"model": schemas.ErrorMessage, "description": "Could not create IOU node"}})
@ -275,23 +275,18 @@ async def stream_pcap_file(adapter_number: int, port_number: int, node: IOUVM =
return StreamingResponse(stream, media_type="application/vnd.tcpdump.pcap")
@router.websocket("/{node_id}/console/ws")
async def console_ws(websocket: WebSocket, node: IOUVM = Depends(dep_node)):
"""
Console WebSocket.
"""
await node.start_websocket_console(websocket)
@router.post("/{node_id}/console/reset",
status_code=status.HTTP_204_NO_CONTENT,
responses=responses)
async def reset_console(node: IOUVM = Depends(dep_node)):
await node.reset_console()
# @Route.get(
# r"/projects/{project_id}/iou/nodes/{node_id}/console/ws",
# description="WebSocket for console",
# parameters={
# "project_id": "Project UUID",
# "node_id": "Node UUID",
# })
# async def console_ws(request, response):
#
# iou_manager = IOU.instance()
# vm = iou_manager.get_node(request.match_info["node_id"], project_id=request.match_info["project_id"])
# return await vm.start_websocket_console(request)

View File

@ -48,7 +48,7 @@ def dep_node(project_id: UUID, node_id: UUID):
return node
@router.post("/",
@router.post("",
response_model=schemas.NAT,
status_code=status.HTTP_201_CREATED,
responses={409: {"model": schemas.ErrorMessage, "description": "Could not create NAT node"}})

View File

@ -19,11 +19,10 @@
API endpoints for compute notifications.
"""
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from websockets.exceptions import WebSocketException
from typing import List
import asyncio
from fastapi import APIRouter, WebSocket
from gns3server.compute.notification_manager import NotificationManager
from starlette.endpoints import WebSocketEndpoint
import logging
log = logging.getLogger(__name__)
@ -31,48 +30,63 @@ log = logging.getLogger(__name__)
router = APIRouter()
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
@router.websocket_route("/notifications/ws")
class ComputeWebSocketNotifications(WebSocketEndpoint):
"""
Receive compute notifications about the controller from WebSocket stream.
"""
async def on_connect(self, websocket: WebSocket) -> None:
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
log.info(f"New client {websocket.client.host}:{websocket.client.port} has connected to compute WebSocket")
self._notification_task = asyncio.ensure_future(self._stream_notifications(websocket))
def disconnect(self, websocket: WebSocket):
async def on_disconnect(self, websocket: WebSocket, close_code: int) -> None:
self.active_connections.remove(websocket)
self._notification_task.cancel()
log.info(f"Client {websocket.client.host}:{websocket.client.port} has disconnected from controller WebSocket"
f" with close code {close_code}")
async def close_active_connections(self):
async def _stream_notifications(self, websocket: WebSocket) -> None:
for websocket in self.active_connections:
await websocket.close()
async def send_text(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
@router.websocket("/notifications/ws")
async def compute_notifications(websocket: WebSocket):
log.info("Client has disconnected from compute WebSocket")
notifications = NotificationManager.instance()
await manager.connect(websocket)
try:
log.info("New client has connected to compute WebSocket")
with notifications.queue() as queue:
with NotificationManager.instance().queue() as queue:
while True:
notification = await queue.get_json(5)
await manager.send_text(notification, websocket)
except (WebSocketException, WebSocketDisconnect) as e:
log.info("Client has disconnected from compute WebSocket: {}".format(e))
finally:
await websocket.close()
manager.disconnect(websocket)
await websocket.send_text(notification)
if __name__ == '__main__':
import uvicorn
from fastapi import FastAPI
from starlette.responses import HTMLResponse
app = FastAPI()
app.include_router(router)
html = """
<!DOCTYPE html>
<html>
<body>
<ul id='messages'>
</ul>
<script>
var ws = new WebSocket("ws://localhost:8000/notifications/ws");
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
</script>
</body>
</html>
"""
@app.get("/")
async def get() -> HTMLResponse:
return HTMLResponse(html)
uvicorn.run(app, host="localhost", port=8000)

View File

@ -19,8 +19,6 @@
API endpoints for projects.
"""
import shutil
import aiohttp
import os
import logging
@ -108,6 +106,7 @@ async def close_project(project: Project = Depends(dep_project)):
Close a project on the compute.
"""
# FIXME
if _notifications_listening.setdefault(project.id, 0) <= 1:
await project.close()
ProjectManager.instance().remove_project(project.id)
@ -234,6 +233,6 @@ async def write_file(file_path: str, request: Request, project: Project = Depend
pass # FIXME
except FileNotFoundError:
raise aiohttp.web.HTTPNotFound()
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
except PermissionError:
raise aiohttp.web.HTTPForbidden()
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)

View File

@ -22,14 +22,13 @@ API endpoints for Qemu nodes.
import os
import sys
from fastapi import APIRouter, Depends, Body, status
from fastapi import APIRouter, WebSocket, Depends, Body, status
from fastapi.encoders import jsonable_encoder
from fastapi.responses import StreamingResponse
from uuid import UUID
from gns3server.endpoints import schemas
from gns3server.compute.project_manager import ProjectManager
from gns3server.compute.compute_error import ComputeError
from gns3server.compute.qemu import Qemu
from gns3server.compute.qemu.qemu_vm import QemuVM
@ -50,7 +49,7 @@ def dep_node(project_id: UUID, node_id: UUID):
return node
@router.post("/",
@router.post("",
response_model=schemas.Qemu,
status_code=status.HTTP_201_CREATED,
responses={409: {"model": schemas.ErrorMessage, "description": "Could not create Qemu node"}})
@ -281,14 +280,6 @@ async def stop_capture(adapter_number: int, port_number: int, node: QemuVM = Dep
await node.stop_capture(adapter_number)
@router.post("/{node_id}/console/reset",
status_code=status.HTTP_204_NO_CONTENT,
responses=responses)
async def reset_console(node: QemuVM = Depends(dep_node)):
await node.reset_console()
@router.get("/{node_id}/adapters/{adapter_number}/ports/{port_number}/pcap",
responses=responses)
async def stream_pcap_file(adapter_number: int, port_number: int, node: QemuVM = Depends(dep_node)):
@ -302,16 +293,18 @@ async def stream_pcap_file(adapter_number: int, port_number: int, node: QemuVM =
return StreamingResponse(stream, media_type="application/vnd.tcpdump.pcap")
# @Route.get(
# r"/projects/{project_id}/qemu/nodes/{node_id}/console/ws",
# description="WebSocket for console",
# parameters={
# "project_id": "Project UUID",
# "node_id": "Node UUID",
# })
# async def console_ws(request, response):
#
# qemu_manager = Qemu.instance()
# vm = qemu_manager.get_node(request.match_info["node_id"], project_id=request.match_info["project_id"])
# return await vm.start_websocket_console(request)
@router.websocket("/{node_id}/console/ws")
async def console_ws(websocket: WebSocket, node: QemuVM = Depends(dep_node)):
"""
Console WebSocket.
"""
await node.start_websocket_console(websocket)
@router.post("/{node_id}/console/reset",
status_code=status.HTTP_204_NO_CONTENT,
responses=responses)
async def reset_console(node: QemuVM = Depends(dep_node)):
await node.reset_console()

View File

@ -21,7 +21,7 @@ API endpoints for VirtualBox nodes.
import os
from fastapi import APIRouter, Depends, status
from fastapi import APIRouter, WebSocket, Depends, status
from fastapi.encoders import jsonable_encoder
from fastapi.responses import StreamingResponse
from uuid import UUID
@ -49,7 +49,7 @@ def dep_node(project_id: UUID, node_id: UUID):
return node
@router.post("/",
@router.post("",
response_model=schemas.VirtualBox,
status_code=status.HTTP_201_CREATED,
responses={409: {"model": schemas.ErrorMessage, "description": "Could not create VirtualBox node"}})
@ -288,14 +288,6 @@ async def stop_capture(adapter_number: int, port_number: int, node: VirtualBoxVM
await node.stop_capture(adapter_number)
@router.post("/{node_id}/console/reset",
status_code=status.HTTP_204_NO_CONTENT,
responses=responses)
async def reset_console(node: VirtualBoxVM = Depends(dep_node)):
await node.reset_console()
@router.get("/{node_id}/adapters/{adapter_number}/ports/{port_number}/pcap",
responses=responses)
async def stream_pcap_file(adapter_number: int, port_number: int, node: VirtualBoxVM = Depends(dep_node)):
@ -309,15 +301,18 @@ async def stream_pcap_file(adapter_number: int, port_number: int, node: VirtualB
return StreamingResponse(stream, media_type="application/vnd.tcpdump.pcap")
# @Route.get(
# r"/projects/{project_id}/virtualbox/nodes/{node_id}/console/ws",
# description="WebSocket for console",
# parameters={
# "project_id": "Project UUID",
# "node_id": "Node UUID",
# })
# async def console_ws(request, response):
#
# virtualbox_manager = VirtualBox.instance()
# vm = virtualbox_manager.get_node(request.match_info["node_id"], project_id=request.match_info["project_id"])
# return await vm.start_websocket_console(request)
@router.websocket("/{node_id}/console/ws")
async def console_ws(websocket: WebSocket, node: VirtualBoxVM = Depends(dep_node)):
"""
Console WebSocket.
"""
await node.start_websocket_console(websocket)
@router.post("/{node_id}/console/reset",
status_code=status.HTTP_204_NO_CONTENT,
responses=responses)
async def reset_console(node: VirtualBoxVM = Depends(dep_node)):
await node.reset_console()

View File

@ -21,7 +21,7 @@ API endpoints for VMware nodes.
import os
from fastapi import APIRouter, Depends, status
from fastapi import APIRouter, WebSocket, Depends, status
from fastapi.encoders import jsonable_encoder
from fastapi.responses import StreamingResponse
from uuid import UUID
@ -48,7 +48,7 @@ def dep_node(project_id: UUID, node_id: UUID):
return node
@router.post("/",
@router.post("",
response_model=schemas.VMware,
status_code=status.HTTP_201_CREATED,
responses={409: {"model": schemas.ErrorMessage, "description": "Could not create VMware node"}})
@ -253,14 +253,6 @@ async def stop_capture(adapter_number: int, port_number: int, node: VMwareVM = D
await node.stop_capture(adapter_number)
@router.post("/{node_id}/console/reset",
status_code=status.HTTP_204_NO_CONTENT,
responses=responses)
async def reset_console(node: VMwareVM = Depends(dep_node)):
await node.reset_console()
@router.get("/{node_id}/adapters/{adapter_number}/ports/{port_number}/pcap",
responses=responses)
async def stream_pcap_file(adapter_number: int, port_number: int, node: VMwareVM = Depends(dep_node)):
@ -289,16 +281,18 @@ def allocate_vmnet(node: VMwareVM = Depends(dep_node)) -> dict:
return {"vmnet": vmnet}
# @Route.get(
# r"/projects/{project_id}/vmware/nodes/{node_id}/console/ws",
# description="WebSocket for console",
# parameters={
# "project_id": "Project UUID",
# "node_id": "Node UUID",
# })
# async def console_ws(request, response):
#
# vmware_manager = VMware.instance()
# vm = vmware_manager.get_node(request.match_info["node_id"], project_id=request.match_info["project_id"])
# return await vm.start_websocket_console(request)
#
@router.websocket("/{node_id}/console/ws")
async def console_ws(websocket: WebSocket, node: VMwareVM = Depends(dep_node)):
"""
Console WebSocket.
"""
await node.start_websocket_console(websocket)
@router.post("/{node_id}/console/reset",
status_code=status.HTTP_204_NO_CONTENT,
responses=responses)
async def reset_console(node: VMwareVM = Depends(dep_node)):
await node.reset_console()

View File

@ -21,14 +21,13 @@ API endpoints for VPCS nodes.
import os
from fastapi import APIRouter, Depends, Body, status
from fastapi import APIRouter, WebSocket, Depends, Body, status
from fastapi.encoders import jsonable_encoder
from fastapi.responses import StreamingResponse
from uuid import UUID
from gns3server.endpoints import schemas
from gns3server.compute.vpcs import VPCS
from gns3server.compute.project_manager import ProjectManager
from gns3server.compute.vpcs.vpcs_vm import VPCSVM
router = APIRouter()
@ -48,7 +47,7 @@ def dep_node(project_id: UUID, node_id: UUID):
return node
@router.post("/",
@router.post("",
response_model=schemas.VPCS,
status_code=status.HTTP_201_CREATED,
responses={409: {"model": schemas.ErrorMessage, "description": "Could not create VMware node"}})
@ -258,15 +257,10 @@ async def stream_pcap_file(adapter_number: int, port_number: int, node: VPCSVM =
return StreamingResponse(stream, media_type="application/vnd.tcpdump.pcap")
# @Route.get(
# r"/projects/{project_id}/vpcs/nodes/{node_id}/console/ws",
# description="WebSocket for console",
# parameters={
# "project_id": "Project UUID",
# "node_id": "Node UUID",
# })
# async def console_ws(request, response):
#
# vpcs_manager = VPCS.instance()
# vm = vpcs_manager.get_node(request.match_info["node_id"], project_id=request.match_info["project_id"])
# return await vm.start_websocket_console(request)
@router.websocket("/{node_id}/console/ws")
async def console_ws(websocket: WebSocket, node: VPCSVM = Depends(dep_node)):
"""
Console WebSocket.
"""
await node.start_websocket_console(websocket)

View File

@ -25,7 +25,7 @@ from typing import Optional
router = APIRouter()
@router.get("/")
@router.get("")
async def get_appliances(update: Optional[bool] = None, symbol_theme: Optional[str] = "Classic"):
"""
Return all appliances known by the controller.

View File

@ -35,7 +35,7 @@ responses = {
}
@router.post("/",
@router.post("",
status_code=status.HTTP_201_CREATED,
response_model=schemas.Compute,
responses={404: {"model": ErrorMessage, "description": "Could not connect to compute"},
@ -64,7 +64,7 @@ def get_compute(compute_id: Union[str, UUID]):
return compute.__json__()
@router.get("/",
@router.get("",
response_model=List[schemas.Compute],
response_model_exclude_unset=True)
async def get_computes():

View File

@ -35,7 +35,7 @@ responses = {
}
@router.get("/",
@router.get("",
response_model=List[Drawing],
response_model_exclude_unset=True)
async def get_drawings(project_id: UUID):
@ -47,7 +47,7 @@ async def get_drawings(project_id: UUID):
return [v.__json__() for v in project.drawings.values()]
@router.post("/",
@router.post("",
status_code=status.HTTP_201_CREATED,
response_model=Drawing,
responses=responses)

View File

@ -48,8 +48,7 @@ async def get_vms(engine: str):
return vms
@router.get("/",
response_model=GNS3VM)
@router.get("", response_model=GNS3VM)
async def get_gns3vm_settings():
"""
Return the GNS3 VM settings.
@ -58,9 +57,7 @@ async def get_gns3vm_settings():
return Controller.instance().gns3vm.__json__()
@router.put("/",
response_model=GNS3VM,
response_model_exclude_unset=True)
@router.put("", response_model=GNS3VM, response_model_exclude_unset=True)
async def update_gns3vm_settings(gns3vm_data: GNS3VM):
"""
Update the GNS3 VM settings.

View File

@ -48,7 +48,7 @@ async def dep_link(project_id: UUID, link_id: UUID):
return link
@router.get("/",
@router.get("",
response_model=List[schemas.Link],
response_model_exclude_unset=True)
async def get_links(project_id: UUID):
@ -60,7 +60,7 @@ async def get_links(project_id: UUID):
return [v.__json__() for v in project.links.values()]
@router.post("/",
@router.post("",
status_code=status.HTTP_201_CREATED,
response_model=schemas.Link,
responses={404: {"model": ErrorMessage, "description": "Could not find project"},
@ -214,4 +214,4 @@ async def reset_link(link: Link = Depends(dep_link)):
# break
# await proxied_response.write(data)
#
# #return StreamingResponse(file_like, media_type="video/mp4"))
# #return StreamingResponse(file_like, media_type="video/mp4"))

View File

@ -19,9 +19,10 @@
API endpoints for nodes.
"""
import aiohttp
import asyncio
from fastapi import APIRouter, Depends, Request, Response, status
from fastapi import APIRouter, Depends, WebSocket, WebSocketDisconnect, Request, Response, status
from fastapi.encoders import jsonable_encoder
from fastapi.routing import APIRoute
from typing import List, Callable
@ -35,6 +36,9 @@ from gns3server.controller.controller_error import ControllerForbiddenError
from gns3server.endpoints.schemas.common import ErrorMessage
from gns3server.endpoints import schemas
import logging
log = logging.getLogger(__name__)
node_locks = {}
@ -97,7 +101,7 @@ async def dep_node(node_id: UUID, project: Project = Depends(dep_project)):
return node
@router.post("/",
@router.post("",
status_code=status.HTTP_201_CREATED,
response_model=schemas.Node,
responses={404: {"model": ErrorMessage, "description": "Could not find project"},
@ -117,7 +121,7 @@ async def create_node(node_data: schemas.Node, project: Project = Depends(dep_pr
return node.__json__()
@router.get("/",
@router.get("",
response_model=List[schemas.Node],
response_model_exclude_unset=True)
async def get_nodes(project: Project = Depends(dep_project)):
@ -367,59 +371,47 @@ async def post_file(file_path: str, request: Request, node: Node = Depends(dep_n
raw=True)
# @Route.get(
# r"/projects/{project_id}/nodes/{node_id}/console/ws",
# parameters={
# "project_id": "Project UUID",
# "node_id": "Node UUID"
# },
# description="Connect to WebSocket console",
# status_codes={
# 200: "File returned",
# 403: "Permission denied",
# 404: "The file doesn't exist"
# })
# async def ws_console(request, response):
#
# project = await Controller.instance().get_loaded_project(request.match_info["project_id"])
# node = project.get_node(request.match_info["node_id"])
# compute = node.compute
# ws = aiohttp.web.WebSocketResponse()
# await ws.prepare(request)
# request.app['websockets'].add(ws)
#
# ws_console_compute_url = "ws://{compute_host}:{compute_port}/v2/compute/projects/{project_id}/{node_type}/nodes/{node_id}/console/ws".format(compute_host=compute.host,
# compute_port=compute.port,
# project_id=project.id,
# node_type=node.node_type,
# node_id=node.id)
#
# async def ws_forward(ws_client):
# async for msg in ws:
# if msg.type == aiohttp.WSMsgType.TEXT:
# await ws_client.send_str(msg.data)
# elif msg.type == aiohttp.WSMsgType.BINARY:
# await ws_client.send_bytes(msg.data)
# elif msg.type == aiohttp.WSMsgType.ERROR:
# break
#
# try:
# async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=None, force_close=True)) as session:
# async with session.ws_connect(ws_console_compute_url) as ws_client:
# asyncio.ensure_future(ws_forward(ws_client))
# async for msg in ws_client:
# if msg.type == aiohttp.WSMsgType.TEXT:
# await ws.send_str(msg.data)
# elif msg.type == aiohttp.WSMsgType.BINARY:
# await ws.send_bytes(msg.data)
# elif msg.type == aiohttp.WSMsgType.ERROR:
# break
# finally:
# if not ws.closed:
# await ws.close()
# request.app['websockets'].discard(ws)
#
# return ws
@router.websocket("/{node_id}/console/ws")
async def ws_console(websocket: WebSocket, node: Node = Depends(dep_node)):
"""
WebSocket console.
"""
compute = node.compute
await websocket.accept()
log.info(f"New client {websocket.client.host}:{websocket.client.port} has connected to controller console WebSocket")
ws_console_compute_url = f"ws://{compute.host}:{compute.port}/v2/compute/projects/" \
f"{node.project.id}/{node.node_type}/nodes/{node.id}/console/ws"
async def ws_receive(ws_console_compute):
"""
Receive WebSocket data from client and forward to compute console WebSocket.
"""
try:
while True:
data = await websocket.receive_text()
if data:
await ws_console_compute.send_str(data)
except WebSocketDisconnect:
await ws_console_compute.close()
log.info(f"Client {websocket.client.host}:{websocket.client.port} has disconnected from controller"
f" console WebSocket")
try:
# receive WebSocket data from compute console WebSocket and forward to client.
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=None, force_close=True)) as session:
async with session.ws_connect(ws_console_compute_url) as ws_console_compute:
asyncio.ensure_future(ws_receive(ws_console_compute))
async for msg in ws_console_compute:
if msg.type == aiohttp.WSMsgType.TEXT:
await websocket.send_text(msg.data)
elif msg.type == aiohttp.WSMsgType.BINARY:
await websocket.send_bytes(msg.data)
elif msg.type == aiohttp.WSMsgType.ERROR:
break
except aiohttp.client_exceptions.ClientResponseError as e:
log.error(f"Client response error received when forwarding to compute console WebSocket: {e}")
@router.post("/console/reset",

View File

@ -19,51 +19,58 @@
API endpoints for controller notifications.
"""
import asyncio
from fastapi import APIRouter, WebSocket
from fastapi.responses import StreamingResponse
from starlette.endpoints import WebSocketEndpoint
from fastapi import APIRouter, Request, Response, WebSocket, WebSocketDisconnect
from websockets.exceptions import WebSocketException
from gns3server.controller import Controller
router = APIRouter()
import logging
log = logging.getLogger(__name__)
# @router.get("/")
# async def notification(request: Request):
# """
# Receive notifications about the controller from HTTP
# """
#
# controller = Controller.instance()
#
# await response.prepare(request)
# response = Response(content, media_type="application/json")
#
# with controller.notification.controller_queue() as queue:
# while True:
# msg = await queue.get_json(5)
# await response.write(("{}\n".format(msg)).encode("utf-8"))
#
#
# await response(scope, receive, send)
router = APIRouter()
@router.websocket("/ws")
async def notification_ws(websocket: WebSocket):
@router.get("")
async def http_notification():
"""
Receive notifications about the controller from a Websocket
Receive controller notifications about the controller from HTTP stream.
"""
controller = Controller.instance()
await websocket.accept()
log.info("New client has connected to controller WebSocket")
try:
with controller.notification.controller_queue() as queue:
async def event_stream():
with Controller.instance().notification.controller_queue() as queue:
while True:
msg = await queue.get_json(5)
yield ("{}\n".format(msg)).encode("utf-8")
return StreamingResponse(event_stream(), media_type="application/json")
@router.websocket_route("/ws")
class ControllerWebSocketNotifications(WebSocketEndpoint):
"""
Receive controller notifications about the controller from WebSocket stream.
"""
async def on_connect(self, websocket: WebSocket) -> None:
await websocket.accept()
log.info(f"New client {websocket.client.host}:{websocket.client.port} has connected to controller WebSocket")
self._notification_task = asyncio.ensure_future(self._stream_notifications(websocket=websocket))
async def on_disconnect(self, websocket: WebSocket, close_code: int) -> None:
self._notification_task.cancel()
log.info(f"Client {websocket.client.host}:{websocket.client.port} has disconnected from controller WebSocket"
f" with close code {close_code}")
async def _stream_notifications(self, websocket: WebSocket) -> None:
with Controller.instance().notifications.queue() as queue:
while True:
notification = await queue.get_json(5)
await websocket.send_text(notification)
except (WebSocketException, WebSocketDisconnect):
log.info("Client has disconnected from controller WebSocket")
await websocket.close()

View File

@ -32,7 +32,7 @@ log = logging.getLogger()
from fastapi import APIRouter, Depends, Request, Body, HTTPException, status, WebSocket, WebSocketDisconnect
from fastapi.encoders import jsonable_encoder
from fastapi.responses import StreamingResponse, FileResponse
from websockets.exceptions import WebSocketException
from websockets.exceptions import ConnectionClosed, WebSocketException
from typing import List
from uuid import UUID
@ -66,7 +66,19 @@ def dep_project(project_id: UUID):
CHUNK_SIZE = 1024 * 8 # 8KB
@router.post("/",
@router.get("",
response_model=List[schemas.Project],
response_model_exclude_unset=True)
def get_projects():
"""
Return all projects.
"""
controller = Controller.instance()
return [p.__json__() for p in controller.projects.values()]
@router.post("",
status_code=status.HTTP_201_CREATED,
response_model=schemas.Project,
response_model_exclude_unset=True,
@ -78,22 +90,9 @@ async def create_project(project_data: schemas.ProjectCreate):
controller = Controller.instance()
project = await controller.add_project(**jsonable_encoder(project_data, exclude_unset=True))
print(project.__json__()["variables"])
return project.__json__()
@router.get("/",
response_model=List[schemas.Project],
response_model_exclude_unset=True)
def get_projects():
"""
Return all projects.
"""
controller = Controller.instance()
return [p.__json__() for p in controller.projects.values()]
@router.get("/{project_id}",
response_model=schemas.Project,
responses=responses)
@ -193,52 +192,57 @@ async def load_project(path: str = Body(..., embed=True)):
return project.__json__()
# @router.get("/projects/{project_id}/notifications",
# summary="Receive notifications about projects",
# responses={404: {"model": ErrorMessage, "description": "Could not find project"}})
# async def notification(project_id: UUID):
#
# controller = Controller.instance()
# project = controller.get_project(str(project_id))
# #response.content_type = "application/json"
# #response.set_status(200)
# #response.enable_chunked_encoding()
# #await response.prepare(request)
# log.info("New client has connected to the notification stream for project ID '{}' (HTTP long-polling method)".format(project.id))
#
# try:
# with controller.notification.project_queue(project.id) as queue:
# while True:
# msg = await queue.get_json(5)
# await response.write(("{}\n".format(msg)).encode("utf-8"))
# finally:
# log.info("Client has disconnected from notification for project ID '{}' (HTTP long-polling method)".format(project.id))
# if project.auto_close:
# # To avoid trouble with client connecting disconnecting we sleep few seconds before checking
# # if someone else is not connected
# await asyncio.sleep(5)
# if not controller.notification.project_has_listeners(project.id):
# log.info("Project '{}' is automatically closing due to no client listening".format(project.id))
# await project.close()
@router.get("/{project_id}/notifications")
async def notification(project_id: UUID):
"""
Receive project notifications about the controller from HTTP stream.
"""
controller = Controller.instance()
project = controller.get_project(str(project_id))
log.info("New client has connected to the notification stream for project ID '{}' (HTTP steam method)".format(project.id))
async def event_stream():
try:
with controller.notification.project_queue(project.id) as queue:
while True:
msg = await queue.get_json(5)
yield ("{}\n".format(msg)).encode("utf-8")
finally:
log.info("Client has disconnected from notification for project ID '{}' (HTTP stream method)".format(project.id))
if project.auto_close:
# To avoid trouble with client connecting disconnecting we sleep few seconds before checking
# if someone else is not connected
await asyncio.sleep(5)
if not controller.notification.project_has_listeners(project.id):
log.info("Project '{}' is automatically closing due to no client listening".format(project.id))
await project.close()
return StreamingResponse(event_stream(), media_type="application/json")
@router.websocket("/{project_id}/notifications/ws")
async def notification_ws(project_id: UUID, websocket: WebSocket):
"""
Receive project notifications about the controller from WebSocket.
"""
controller = Controller.instance()
project = controller.get_project(str(project_id))
await websocket.accept()
#request.app['websockets'].add(ws)
#asyncio.ensure_future(process_websocket(ws))
log.info("New client has connected to the notification stream for project ID '{}' (WebSocket method)".format(project.id))
try:
with controller.notification.project_queue(project.id) as queue:
while True:
notification = await queue.get_json(5)
await websocket.send_text(notification)
except (WebSocketException, WebSocketDisconnect):
except (ConnectionClosed, WebSocketDisconnect):
log.info("Client has disconnected from notification stream for project ID '{}' (WebSocket method)".format(project.id))
except WebSocketException as e:
log.warning("Error while sending to project event to WebSocket client: '{}'".format(e))
finally:
await websocket.close()
if project.auto_close:

View File

@ -19,7 +19,6 @@
API endpoints for snapshots.
"""
import logging
log = logging.getLogger()
@ -48,7 +47,7 @@ def dep_project(project_id: UUID):
return project
@router.post("/",
@router.post("",
status_code=status.HTTP_201_CREATED,
response_model=schemas.Snapshot,
responses=responses)
@ -61,7 +60,7 @@ async def create_snapshot(snapshot_data: schemas.SnapshotCreate, project: Projec
return snapshot.__json__()
@router.get("/",
@router.get("",
response_model=List[schemas.Snapshot],
response_model_exclude_unset=True,
responses=responses)

View File

@ -35,7 +35,7 @@ log = logging.getLogger(__name__)
router = APIRouter()
@router.get("/")
@router.get("")
def get_symbols():
controller = Controller.instance()

View File

@ -94,9 +94,10 @@ class ProjectUpdate(ProjectBase):
class Project(ProjectBase):
project_id: UUID
name: Optional[str] = None
project_id = UUID
status: Optional[ProjectStatus] = None
filename: Optional[str] = None
class ProjectFile(BaseModel):