Clean files and catch file path escape. Ref #1894

pull/1906/head
grossmj 3 years ago
parent 09ac7fd7fb
commit f3d81fa450

@ -69,13 +69,15 @@ async def download_dynamips_image(filename: str) -> FileResponse:
Download a Dynamips IOS image.
"""
dynamips_manager = Dynamips.instance()
filename = urllib.parse.unquote(filename)
image_path = dynamips_manager.get_abs_image_path(filename)
if filename[0] == ".":
# Raise error if user try to escape
if filename[0] == "." or os.path.sep in filename:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
dynamips_manager = Dynamips.instance()
image_path = dynamips_manager.get_abs_image_path(filename)
if not os.path.exists(image_path):
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
@ -108,13 +110,14 @@ async def download_iou_image(filename: str) -> FileResponse:
Download an IOU image.
"""
iou_manager = IOU.instance()
filename = urllib.parse.unquote(filename)
image_path = iou_manager.get_abs_image_path(filename)
if filename[0] == ".":
# Raise error if user try to escape
if filename[0] == "." or os.path.sep in filename:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
iou_manager = IOU.instance()
image_path = iou_manager.get_abs_image_path(filename)
if not os.path.exists(image_path):
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
@ -138,13 +141,13 @@ async def upload_qemu_image(filename: str, request: Request) -> None:
@router.get("/qemu/images/{filename:path}")
async def download_qemu_image(filename: str) -> FileResponse:
qemu_manager = Qemu.instance()
filename = urllib.parse.unquote(filename)
# Raise error if user try to escape
if filename[0] == ".":
if filename[0] == "." or os.path.sep in filename:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
qemu_manager = Qemu.instance()
image_path = qemu_manager.get_abs_image_path(filename)
if not os.path.exists(image_path):

@ -32,6 +32,7 @@ from uuid import UUID
from gns3server.compute.project_manager import ProjectManager
from gns3server.compute.project import Project
from gns3server.utils.path import is_safe_path
from gns3server import schemas
@ -200,7 +201,7 @@ async def get_compute_project_file(file_path: str, project: Project = Depends(de
path = os.path.normpath(file_path)
# Raise error if user try to escape
if path[0] == ".":
if not is_safe_path(path, project.path):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
path = os.path.join(project.path, path)
@ -216,7 +217,7 @@ async def write_compute_project_file(file_path: str, request: Request, project:
path = os.path.normpath(file_path)
# Raise error if user try to escape
if path[0] == ".":
if not is_safe_path(path, project.path):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
path = os.path.join(project.path, path)

@ -305,11 +305,11 @@ async def get_file(file_path: str, node: Node = Depends(dep_node)) -> Response:
path = f"/project-files/{node_type}/{node.id}/{path}"
res = await node.compute.http_query("GET", f"/projects/{node.project.id}/files{path}", timeout=None, raw=True)
return Response(res.body, media_type="application/octet-stream")
return Response(res.body, media_type="application/octet-stream", status_code=res.status)
@router.post("/{node_id}/files/{file_path:path}", status_code=status.HTTP_201_CREATED)
async def post_file(file_path: str, request: Request, node: Node = Depends(dep_node)) -> dict:
async def post_file(file_path: str, request: Request, node: Node = Depends(dep_node)):
"""
Write a file in the node directory.
"""
@ -324,8 +324,8 @@ async def post_file(file_path: str, request: Request, node: Node = Depends(dep_n
path = f"/project-files/{node_type}/{node.id}/{path}"
data = await request.body() # FIXME: are we handling timeout or large files correctly?
await node.compute.http_query("POST", f"/projects/{node.project.id}/files{path}", data=data, timeout=None, raw=True)
# FIXME: response with correct status code (from compute)
@router.websocket("/{node_id}/console/ws")

@ -44,6 +44,7 @@ from gns3server.controller.controller_error import ControllerError, ControllerFo
from gns3server.controller.import_project import import_project as import_controller_project
from gns3server.controller.export_project import export_project as export_controller_project
from gns3server.utils.asyncio import aiozipstream
from gns3server.utils.path import is_safe_path
from gns3server.config import Config
responses = {404: {"model": schemas.ErrorMessage, "description": "Could not find project"}}
@ -371,7 +372,7 @@ async def get_file(file_path: str, project: Project = Depends(dep_project)) -> F
path = os.path.normpath(file_path).strip("/")
# Raise error if user try to escape
if path[0] == ".":
if not is_safe_path(path, project.path):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
path = os.path.join(project.path, path)
@ -390,7 +391,7 @@ async def write_file(file_path: str, request: Request, project: Project = Depend
path = os.path.normpath(file_path).strip("/")
# Raise error if user try to escape
if path[0] == ".":
if not is_safe_path(path, project.path):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
path = os.path.join(project.path, path)

@ -1,468 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import aiohttp.web
from gns3server.web.route import Route
from gns3server.schemas.nio import NIO_SCHEMA
from gns3server.compute.iou import IOU
from gns3server.schemas.node import (
NODE_CAPTURE_SCHEMA,
NODE_LIST_IMAGES_SCHEMA,
)
from gns3server.schemas.iou import (
IOU_CREATE_SCHEMA,
IOU_START_SCHEMA,
IOU_OBJECT_SCHEMA
)
class IOUHandler:
"""
API entry points for IOU.
"""
@Route.post(
r"/projects/{project_id}/iou/nodes",
parameters={
"project_id": "Project UUID"
},
status_codes={
201: "Instance created",
400: "Invalid request",
409: "Conflict"
},
description="Create a new IOU instance",
input=IOU_CREATE_SCHEMA,
output=IOU_OBJECT_SCHEMA)
async def create(request, response):
iou = IOU.instance()
vm = await iou.create_node(request.json.pop("name"),
request.match_info["project_id"],
request.json.get("node_id"),
application_id=request.json.get("application_id"),
path=request.json.get("path"),
console=request.json.get("console"),
console_type=request.json.get("console_type", "telnet"))
for name, value in request.json.items():
if hasattr(vm, name) and getattr(vm, name) != value:
if name == "application_id":
continue # we must ignore this to avoid overwriting the application_id allocated by the controller
if name == "startup_config_content" and (vm.startup_config_content and len(vm.startup_config_content) > 0):
continue
if name == "private_config_content" and (vm.private_config_content and len(vm.private_config_content) > 0):
continue
if request.json.get("use_default_iou_values") and (name == "ram" or name == "nvram"):
continue
setattr(vm, name, value)
response.set_status(201)
response.json(vm)
@Route.get(
r"/projects/{project_id}/iou/nodes/{node_id}",
parameters={
"project_id": "Project UUID",
"node_id": "Node UUID"
},
status_codes={
200: "Success",
400: "Invalid request",
404: "Instance doesn't exist"
},
description="Get an IOU instance",
output=IOU_OBJECT_SCHEMA)
def show(request, response):
iou_manager = IOU.instance()
vm = iou_manager.get_node(request.match_info["node_id"], project_id=request.match_info["project_id"])
response.json(vm)
@Route.put(
r"/projects/{project_id}/iou/nodes/{node_id}",
parameters={
"project_id": "Project UUID",
"node_id": "Node UUID"
},
status_codes={
200: "Instance updated",
400: "Invalid request",
404: "Instance doesn't exist",
409: "Conflict"
},
description="Update an IOU instance",
input=IOU_OBJECT_SCHEMA,
output=IOU_OBJECT_SCHEMA)
async def update(request, response):
iou_manager = IOU.instance()
vm = iou_manager.get_node(request.match_info["node_id"], project_id=request.match_info["project_id"])
for name, value in request.json.items():
if hasattr(vm, name) and getattr(vm, name) != value:
if name == "application_id":
continue # we must ignore this to avoid overwriting the application_id allocated by the IOU manager
setattr(vm, name, value)
if vm.use_default_iou_values:
# update the default IOU values in case the image or use_default_iou_values have changed
# this is important to have the correct NVRAM amount in order to correctly push the configs to the NVRAM
await vm.update_default_iou_values()
vm.updated()
response.json(vm)
@Route.delete(
r"/projects/{project_id}/iou/nodes/{node_id}",
parameters={
"project_id": "Project UUID",
"node_id": "Node UUID"
},
status_codes={
204: "Instance deleted",
400: "Invalid request",
404: "Instance doesn't exist"
},
description="Delete an IOU instance")
async def delete(request, response):
await IOU.instance().delete_node(request.match_info["node_id"])
response.set_status(204)
@Route.post(
r"/projects/{project_id}/iou/nodes/{node_id}/duplicate",
parameters={
"project_id": "Project UUID",
"node_id": "Node UUID"
},
status_codes={
201: "Instance duplicated",
404: "Instance doesn't exist"
},
description="Duplicate a IOU instance")
async def duplicate(request, response):
new_node = await IOU.instance().duplicate_node(
request.match_info["node_id"],
request.json["destination_node_id"]
)
response.set_status(201)
response.json(new_node)
@Route.post(
r"/projects/{project_id}/iou/nodes/{node_id}/start",
parameters={
"project_id": "Project UUID",
"node_id": "Node UUID"
},
status_codes={
200: "Instance started",
400: "Invalid request",
404: "Instance doesn't exist"
},
input=IOU_START_SCHEMA,
output=IOU_OBJECT_SCHEMA,
description="Start an IOU instance")
async def start(request, response):
iou_manager = IOU.instance()
vm = iou_manager.get_node(request.match_info["node_id"], project_id=request.match_info["project_id"])
for name, value in request.json.items():
if hasattr(vm, name) and getattr(vm, name) != value:
setattr(vm, name, value)
await vm.start()
response.json(vm)
@Route.post(
r"/projects/{project_id}/iou/nodes/{node_id}/stop",
parameters={
"project_id": "Project UUID",
"node_id": "Node UUID"
},
status_codes={
204: "Instance stopped",
400: "Invalid request",
404: "Instance doesn't exist"
},
description="Stop an IOU instance")
async def stop(request, response):
iou_manager = IOU.instance()
vm = iou_manager.get_node(request.match_info["node_id"], project_id=request.match_info["project_id"])
await vm.stop()
response.set_status(204)
@Route.post(
r"/projects/{project_id}/iou/nodes/{node_id}/suspend",
parameters={
"project_id": "Project UUID",
"node_id": "Node UUID"
},
status_codes={
204: "Instance suspended",
400: "Invalid request",
404: "Instance doesn't exist"
},
description="Suspend an IOU instance (does nothing)")
def suspend(request, response):
iou_manager = IOU.instance()
iou_manager.get_node(request.match_info["node_id"], project_id=request.match_info["project_id"])
response.set_status(204)
@Route.post(
r"/projects/{project_id}/iou/nodes/{node_id}/reload",
parameters={
"project_id": "Project UUID",
"node_id": "Node UUID",
},
status_codes={
204: "Instance reloaded",
400: "Invalid request",
404: "Instance doesn't exist"
},
description="Reload an IOU instance")
async def reload(request, response):
iou_manager = IOU.instance()
vm = iou_manager.get_node(request.match_info["node_id"], project_id=request.match_info["project_id"])
await vm.reload()
response.set_status(204)
@Route.post(
r"/projects/{project_id}/iou/nodes/{node_id}/adapters/{adapter_number:\d+}/ports/{port_number:\d+}/nio",
parameters={
"project_id": "Project UUID",
"node_id": "Node UUID",
"adapter_number": "Network adapter where the nio is located",
"port_number": "Port where the nio should be added"
},
status_codes={
201: "NIO created",
400: "Invalid request",
404: "Instance doesn't exist"
},
description="Add a NIO to a IOU instance",
input=NIO_SCHEMA,
output=NIO_SCHEMA)
async def create_nio(request, response):
iou_manager = IOU.instance()
vm = iou_manager.get_node(request.match_info["node_id"], project_id=request.match_info["project_id"])
nio_type = request.json["type"]
if nio_type not in ("nio_udp", "nio_tap", "nio_ethernet", "nio_generic_ethernet"):
raise aiohttp.web.HTTPConflict(text="NIO of type {} is not supported".format(nio_type))
nio = iou_manager.create_nio(request.json)
await vm.adapter_add_nio_binding(int(request.match_info["adapter_number"]), int(request.match_info["port_number"]), nio)
response.set_status(201)
response.json(nio)
@Route.put(
r"/projects/{project_id}/iou/nodes/{node_id}/adapters/{adapter_number:\d+}/ports/{port_number:\d+}/nio",
parameters={
"project_id": "Project UUID",
"node_id": "Node UUID",
"adapter_number": "Network adapter where the nio is located",
"port_number": "Port where the nio should be added"
},
status_codes={
201: "NIO updated",
400: "Invalid request",
404: "Instance doesn't exist"
},
description="Update a NIO on an IOU instance",
input=NIO_SCHEMA,
output=NIO_SCHEMA)
async def update_nio(request, response):
iou_manager = IOU.instance()
vm = iou_manager.get_node(request.match_info["node_id"], project_id=request.match_info["project_id"])
adapter_number = int(request.match_info["adapter_number"])
port_number = int(request.match_info["port_number"])
nio = vm.get_nio(adapter_number, port_number)
if "filters" in request.json:
nio.filters = request.json["filters"]
await vm.adapter_update_nio_binding(adapter_number, port_number, nio)
response.set_status(201)
response.json(request.json)
@Route.delete(
r"/projects/{project_id}/iou/nodes/{node_id}/adapters/{adapter_number:\d+}/ports/{port_number:\d+}/nio",
parameters={
"project_id": "Project UUID",
"node_id": "Node UUID",
"adapter_number": "Network adapter where the nio is located",
"port_number": "Port from where the nio should be removed"
},
status_codes={
204: "NIO deleted",
400: "Invalid request",
404: "Instance doesn't exist"
},
description="Remove a NIO from a IOU instance")
async def delete_nio(request, response):
iou_manager = IOU.instance()
vm = iou_manager.get_node(request.match_info["node_id"], project_id=request.match_info["project_id"])
await vm.adapter_remove_nio_binding(int(request.match_info["adapter_number"]), int(request.match_info["port_number"]))
response.set_status(204)
@Route.post(
r"/projects/{project_id}/iou/nodes/{node_id}/adapters/{adapter_number:\d+}/ports/{port_number:\d+}/start_capture",
parameters={
"project_id": "Project UUID",
"node_id": "Node UUID",
"adapter_number": "Adapter to start a packet capture",
"port_number": "Port on the adapter"
},
status_codes={
200: "Capture started",
400: "Invalid request",
404: "Instance doesn't exist",
409: "VM not started"
},
description="Start a packet capture on an IOU VM instance",
input=NODE_CAPTURE_SCHEMA)
async def start_capture(request, response):
iou_manager = IOU.instance()
vm = iou_manager.get_node(request.match_info["node_id"], project_id=request.match_info["project_id"])
adapter_number = int(request.match_info["adapter_number"])
port_number = int(request.match_info["port_number"])
pcap_file_path = os.path.join(vm.project.capture_working_directory(), request.json["capture_file_name"])
await vm.start_capture(adapter_number, port_number, pcap_file_path, request.json["data_link_type"])
response.json({"pcap_file_path": str(pcap_file_path)})
@Route.post(
r"/projects/{project_id}/iou/nodes/{node_id}/adapters/{adapter_number:\d+}/ports/{port_number:\d+}/stop_capture",
parameters={
"project_id": "Project UUID",
"node_id": "Node UUID",
"adapter_number": "Adapter to stop a packet capture",
"port_number": "Port on the adapter (always 0)"
},
status_codes={
204: "Capture stopped",
400: "Invalid request",
404: "Instance doesn't exist",
409: "VM not started"
},
description="Stop a packet capture on an IOU VM instance")
async def stop_capture(request, response):
iou_manager = IOU.instance()
vm = iou_manager.get_node(request.match_info["node_id"], project_id=request.match_info["project_id"])
adapter_number = int(request.match_info["adapter_number"])
port_number = int(request.match_info["port_number"])
await vm.stop_capture(adapter_number, port_number)
response.set_status(204)
@Route.get(
r"/projects/{project_id}/iou/nodes/{node_id}/adapters/{adapter_number:\d+}/ports/{port_number:\d+}/pcap",
description="Stream the pcap capture file",
parameters={
"project_id": "Project UUID",
"node_id": "Node UUID",
"adapter_number": "Adapter to steam a packet capture",
"port_number": "Port on the adapter (always 0)"
},
status_codes={
200: "File returned",
403: "Permission denied",
404: "The file doesn't exist"
})
async def stream_pcap_file(request, response):
iou_manager = IOU.instance()
vm = iou_manager.get_node(request.match_info["node_id"], project_id=request.match_info["project_id"])
adapter_number = int(request.match_info["adapter_number"])
port_number = int(request.match_info["port_number"])
nio = vm.get_nio(adapter_number, port_number)
await iou_manager.stream_pcap_file(nio, vm.project.id, request, response)
@Route.get(
r"/iou/images",
status_codes={
200: "List of IOU images",
},
description="Retrieve the list of IOU images",
output=NODE_LIST_IMAGES_SCHEMA)
async def list_iou_images(request, response):
iou_manager = IOU.instance()
images = await iou_manager.list_images()
response.set_status(200)
response.json(images)
@Route.post(
r"/iou/images/{filename:.+}",
parameters={
"filename": "Image filename"
},
status_codes={
204: "Image uploaded",
},
raw=True,
description="Upload an IOU image")
async def upload_image(request, response):
iou_manager = IOU.instance()
await iou_manager.write_image(request.match_info["filename"], request.content)
response.set_status(204)
@Route.get(
r"/iou/images/{filename:.+}",
parameters={
"filename": "Image filename"
},
status_codes={
200: "Image returned",
},
raw=True,
description="Download an IOU image")
async def download_image(request, response):
filename = request.match_info["filename"]
iou_manager = IOU.instance()
image_path = iou_manager.get_abs_image_path(filename)
# Raise error if user try to escape
if filename[0] == ".":
raise aiohttp.web.HTTPForbidden()
await response.stream_file(image_path)
@Route.get(
r"/projects/{project_id}/iou/nodes/{node_id}/console/ws",
description="WebSocket for console",
parameters={
"project_id": "Project UUID",
"node_id": "Node UUID",
})
async def console_ws(request, response):
iou_manager = IOU.instance()
vm = iou_manager.get_node(request.match_info["node_id"], project_id=request.match_info["project_id"])
return await vm.start_websocket_console(request)

@ -1,116 +0,0 @@
#
# Copyright (C) 2016 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import aiohttp
from gns3server.web.route import Route
from gns3server.controller import Controller
from gns3server.compute.port_manager import PortManager
from gns3server.compute.project_manager import ProjectManager
from gns3server.version import __version__
from gns3server.utils.get_resource import get_resource
class IndexHandler:
@Route.get(
r"/",
description="Home page of the GNS3 server"
)
async def index(request, response):
raise aiohttp.web.HTTPFound(location="/static/web-ui/bundled")
@Route.get(
r"/debug",
description="Old index page"
)
def upload(request, response):
response.template("index.html")
@Route.get(
r"/upload",
description="Placeholder page for the old /upload"
)
def upload(request, response):
response.template("upload.html")
@Route.get(
r"/compute",
description="Resources used by the GNS3 computes"
)
def compute(request, response):
response.template("compute.html",
port_manager=PortManager.instance(),
project_manager=ProjectManager.instance())
@Route.get(
r"/controller",
description="Resources used by the GNS3 controller server"
)
def controller(request, response):
response.template("controller.html",
controller=Controller.instance())
@Route.get(
r"/projects/{project_id}",
description="List of the GNS3 projects"
)
def project(request, response):
controller = Controller.instance()
response.template("project.html",
project=controller.get_project(request.match_info["project_id"]))
@Route.get(
r"/static/web-ui/{filename:.+}",
parameters={
"filename": "Static filename"
},
status_codes={
200: "Static file returned",
404: "Static cannot be found",
},
raw=True,
description="Get static resource")
async def webui(request, response):
filename = request.match_info["filename"]
filename = os.path.normpath(filename).strip("/")
filename = os.path.join('static', 'web-ui', filename)
# Raise error if user try to escape
if filename[0] == "." or '/../' in filename:
raise aiohttp.web.HTTPForbidden()
static = get_resource(filename)
if static is None or not os.path.exists(static):
static = get_resource(os.path.join('static', 'web-ui', 'index.html'))
# guesstype prefers to have text/html type than application/javascript
# which results with warnings in Firefox 66 on Windows
# Ref. gns3-server#1559
_, ext = os.path.splitext(static)
mimetype = ext == '.js' and 'application/javascript' or None
await response.stream_file(static, status=200, set_content_type=mimetype)
@Route.get(
r"/v1/version",
description="Old 1.0 API"
)
def get_v1(request, response):
response.json({"version": __version__})

@ -186,6 +186,13 @@ async def test_upload_image(app: FastAPI, client: AsyncClient, images_dir: str)
assert checksum == "033bd94b1168d7e4f0d644c3c95e35bf"
async def test_download_image_escape(app: FastAPI, client: AsyncClient, tmpdir) -> None:
file_path = "foo/%2e%2e/%2e%2e/%2e%2e/%2e%2e/%2e%2e/%2e%2e/etc/passwd"
response = await client.get(app.url_path_for("download_dynamips_image", filename=file_path))
assert response.status_code == status.HTTP_403_FORBIDDEN
@pytest.mark.skipif(not sys.platform.startswith("win") and os.getuid() == 0, reason="Root can delete any image")
async def test_upload_image_permission_denied(app: FastAPI, client: AsyncClient, images_dir: str) -> None:

@ -401,10 +401,10 @@ async def test_images(app: FastAPI, client: AsyncClient, fake_iou_bin: str) -> N
assert response.json() == [{"filename": "iou.bin", "path": "iou.bin", "filesize": 7, "md5sum": "e573e8f5c93c6c00783f20c7a170aa6c"}]
async def test_image_vm(app: FastAPI, client: AsyncClient, tmpdir) -> None:
async def test_upload_image(app: FastAPI, client: AsyncClient, tmpdir) -> None:
with patch("gns3server.compute.IOU.get_images_directory", return_value=str(tmpdir)):
response = await client.post(app.url_path_for("download_iou_image", filename="test2"), content=b"TEST")
response = await client.post(app.url_path_for("upload_iou_image", filename="test2"), content=b"TEST")
assert response.status_code == status.HTTP_204_NO_CONTENT
with open(str(tmpdir / "test2")) as f:
@ -415,6 +415,13 @@ async def test_image_vm(app: FastAPI, client: AsyncClient, tmpdir) -> None:
assert checksum == "033bd94b1168d7e4f0d644c3c95e35bf"
async def test_download_image_escape(app: FastAPI, client: AsyncClient, tmpdir) -> None:
file_path = "foo/%2e%2e/%2e%2e/%2e%2e/%2e%2e/%2e%2e/%2e%2e/etc/passwd"
response = await client.get(app.url_path_for("download_iou_image", filename=file_path))
assert response.status_code == status.HTTP_403_FORBIDDEN
async def test_iou_duplicate(app: FastAPI, client: AsyncClient, vm: dict, base_params: dict) -> None:
# create destination node first

@ -396,6 +396,13 @@ async def test_upload_image_forbiden_location(app: FastAPI, client: AsyncClient,
assert response.status_code == status.HTTP_403_FORBIDDEN
async def test_download_image_escape(app: FastAPI, client: AsyncClient, tmpdir) -> None:
file_path = "foo/%2e%2e/%2e%2e/%2e%2e/%2e%2e/%2e%2e/%2e%2e/etc/passwd"
response = await client.get(app.url_path_for("download_qemu_image", filename=file_path))
assert response.status_code == status.HTTP_403_FORBIDDEN
@pytest.mark.skipif(not sys.platform.startswith("win") and os.getuid() == 0, reason="Root can delete any image")
async def test_upload_image_permission_denied(app: FastAPI, client: AsyncClient, images_dir: str) -> None:

@ -240,6 +240,7 @@ async def test_get_file(app: FastAPI, client: AsyncClient, project: Project, com
response = MagicMock()
response.body = b"world"
response.status = status.HTTP_200_OK
compute.http_query = AsyncioMagicMock(return_value=response)
response = await client.get(app.url_path_for("get_file", project_id=project.id, node_id=node.id, file_path="hello"))

Loading…
Cancel
Save