Prevent directory traversal

pull/1899/head
grossmj 3 years ago
parent 2c938b2c19
commit 6294ad9e76

@ -481,7 +481,7 @@ class BaseManager:
if not path:
return ""
orig_path = path
orig_path = os.path.normpath(path)
server_config = self.config.get_section_config("Server")
img_directory = self.get_images_directory()
@ -494,7 +494,14 @@ class BaseManager:
if re.match(r"^[A-Z]:", path) is not None:
raise NodeError("{} is not allowed on this remote server. Please only use a file from '{}'".format(path, img_directory))
if not os.path.isabs(path):
# For local server we allow using absolute path outside image directory
if server_config.getboolean("local", False) is True:
log.debug("Searching for '{}'".format(orig_path))
path = force_unix_path(path)
if os.path.exists(path):
return path
raise ImageMissingError(orig_path)
else:
for directory in valid_directory_prefices:
log.debug("Searching for image '{}' in '{}'".format(orig_path, directory))
path = self._recursive_search_file_in_directory(directory, orig_path)
@ -503,38 +510,28 @@ class BaseManager:
# Not found we try the default directory
log.debug("Searching for image '{}' in default directory".format(orig_path))
# check that the image path is in the default image directory
requested_path = os.path.relpath(orig_path, start=img_directory)
requested_path = os.path.abspath(requested_path)
common_prefix = os.path.commonprefix([requested_path, img_directory])
if common_prefix != img_directory:
raise NodeError("{} is not allowed. Please only use a file from '{}'".format(orig_path, img_directory))
s = os.path.split(orig_path)
path = force_unix_path(os.path.join(img_directory, *s))
if os.path.exists(path):
return path
raise ImageMissingError(orig_path)
# For local server we allow using absolute path outside image directory
if server_config.getboolean("local", False) is True:
log.debug("Searching for '{}'".format(orig_path))
path = force_unix_path(path)
if os.path.exists(path):
return path
raise ImageMissingError(orig_path)
# Check to see if path is an absolute path to a valid directory
path = force_unix_path(path)
for directory in valid_directory_prefices:
log.debug("Searching for image '{}' in '{}'".format(orig_path, directory))
if os.path.commonprefix([directory, path]) == directory:
if os.path.exists(path):
return path
raise ImageMissingError(orig_path)
raise NodeError("{} is not allowed on this remote server. Please only use a file from '{}'".format(path, img_directory))
def _recursive_search_file_in_directory(self, directory, searched_file):
"""
Search for a file in directory and is subdirectories
:returns: Path or None if not found
"""
s = os.path.split(searched_file)
s = os.path.split(searched_file)
for root, dirs, files in os.walk(directory):
for file in files:
# If filename is the same

@ -484,15 +484,15 @@ class DynamipsVMHandler:
raw=True,
description="Download a Dynamips IOS image")
async def download_image(request, response):
filename = request.match_info["filename"]
dynamips_manager = Dynamips.instance()
image_path = dynamips_manager.get_abs_image_path(filename)
filename = request.match_info["filename"]
# Raise error if user try to escape
if filename[0] == ".":
if filename[0] == "." or os.path.sep in filename:
raise aiohttp.web.HTTPForbidden()
dynamips_manager = Dynamips.instance()
image_path = dynamips_manager.get_abs_image_path(filename)
await response.stream_file(image_path)
@Route.post(

@ -443,15 +443,15 @@ class IOUHandler:
raw=True,
description="Download an IOU image")
async def download_image(request, response):
filename = request.match_info["filename"]
iou_manager = IOU.instance()
image_path = iou_manager.get_abs_image_path(filename)
filename = request.match_info["filename"]
# Raise error if user try to escape
if filename[0] == ".":
if filename[0] == "." or os.path.sep in filename:
raise aiohttp.web.HTTPForbidden()
iou_manager = IOU.instance()
image_path = iou_manager.get_abs_image_path(filename)
await response.stream_file(image_path)
@Route.get(

@ -20,12 +20,12 @@ import asyncio
import json
import os
import psutil
import tempfile
from gns3server.web.route import Route
from gns3server.compute.project_manager import ProjectManager
from gns3server.compute import MODULES
from gns3server.utils.cpu_percent import CpuPercent
from gns3server.utils.path import is_safe_path
from gns3server.schemas.project import (
PROJECT_OBJECT_SCHEMA,
@ -247,14 +247,13 @@ class ProjectHandler:
pm = ProjectManager.instance()
project = pm.get_project(request.match_info["project_id"])
path = request.match_info["path"]
path = os.path.normpath(path)
path = os.path.normpath(request.match_info["path"])
# Raise error if user try to escape
if path[0] == ".":
if not is_safe_path(path, project.path):
raise aiohttp.web.HTTPForbidden()
path = os.path.join(project.path, path)
path = os.path.join(project.path, path)
await response.stream_file(path)
@Route.post(
@ -273,15 +272,14 @@ class ProjectHandler:
pm = ProjectManager.instance()
project = pm.get_project(request.match_info["project_id"])
path = request.match_info["path"]
path = os.path.normpath(path)
path = os.path.normpath(request.match_info["path"])
# Raise error if user try to escape
if path[0] == ".":
if not is_safe_path(path, project.path):
raise aiohttp.web.HTTPForbidden()
path = os.path.join(project.path, path)
response.set_status(200)
path = os.path.join(project.path, path)
response.set_status(201)
try:
os.makedirs(os.path.dirname(path), exist_ok=True)

@ -566,15 +566,15 @@ class QEMUHandler:
raw=True,
description="Download Qemu image")
async def download_image(request, response):
filename = request.match_info["filename"]
qemu_manager = Qemu.instance()
image_path = qemu_manager.get_abs_image_path(filename)
filename = request.match_info["filename"]
# Raise error if user try to escape
if filename[0] == ".":
if filename[0] == "." or os.path.sep in filename:
raise aiohttp.web.HTTPForbidden()
qemu_manager = Qemu.instance()
image_path = qemu_manager.get_abs_image_path(filename)
await response.stream_file(image_path)
@Route.get(

@ -409,20 +409,19 @@ class NodeHandler:
path = request.match_info["path"]
path = force_unix_path(path)
# Raise error if user try to escape
if path[0] == ".":
if path[0] == "." or "/../" in path:
raise aiohttp.web.HTTPForbidden()
node_type = node.node_type
path = "/project-files/{}/{}/{}".format(node_type, node.id, path)
res = await node.compute.http_query("GET", "/projects/{project_id}/files{path}".format(project_id=project.id, path=path), timeout=None, raw=True)
response.set_status(200)
response.content_type = "application/octet-stream"
response.enable_chunked_encoding()
await response.prepare(request)
await response.write(res.body)
response.set_status(res.status)
if res.status == 200:
response.content_type = "application/octet-stream"
response.enable_chunked_encoding()
await response.prepare(request)
await response.write(res.body)
# await response.write_eof() #FIXME: shound't be needed anymore
@Route.post(
@ -446,14 +445,14 @@ class NodeHandler:
path = force_unix_path(path)
# Raise error if user try to escape
if path[0] == ".":
if path[0] == "." or "/../" in path:
raise aiohttp.web.HTTPForbidden()
node_type = node.node_type
path = "/project-files/{}/{}/{}".format(node_type, node.id, path)
data = await request.content.read() #FIXME: are we handling timeout or large files correctly?
await node.compute.http_query("POST", "/projects/{project_id}/files{path}".format(project_id=project.id, path=path), data=data, timeout=None, raw=True)
response.set_status(201)
res = await node.compute.http_query("POST", "/projects/{project_id}/files{path}".format(project_id=project.id, path=path), data=data, timeout=None, raw=True)
response.set_status(res.status)
@Route.get(
r"/projects/{project_id}/nodes/{node_id}/console/ws",

@ -28,6 +28,7 @@ from gns3server.controller import Controller
from gns3server.controller.import_project import import_project
from gns3server.controller.export_project import export_project
from gns3server.utils.asyncio import aiozipstream
from gns3server.utils.path import is_safe_path
from gns3server.config import Config
@ -454,14 +455,12 @@ class ProjectHandler:
controller = Controller.instance()
project = await controller.get_loaded_project(request.match_info["project_id"])
path = request.match_info["path"]
path = os.path.normpath(path).strip('/')
path = os.path.normpath(request.match_info["path"])
# Raise error if user try to escape
if path[0] == ".":
if not is_safe_path(path, project.path):
raise aiohttp.web.HTTPForbidden()
path = os.path.join(project.path, path)
await response.stream_file(path)
@Route.post(
@ -480,15 +479,13 @@ class ProjectHandler:
controller = Controller.instance()
project = await controller.get_loaded_project(request.match_info["project_id"])
path = request.match_info["path"]
path = os.path.normpath(path).strip("/")
path = os.path.normpath(request.match_info["path"])
# Raise error if user try to escape
if path[0] == ".":
if not is_safe_path(path, project.path):
raise aiohttp.web.HTTPForbidden()
path = os.path.join(project.path, path)
response.set_status(200)
response.set_status(201)
try:
async with aiofiles.open(path, 'wb+') as f:

@ -92,7 +92,7 @@ class IndexHandler:
filename = os.path.join('static', 'web-ui', filename)
# Raise error if user try to escape
if filename[0] == ".":
if filename[0] == "." or '/../' in filename:
raise aiohttp.web.HTTPForbidden()
static = get_resource(filename)

@ -37,6 +37,18 @@ def get_default_project_directory():
return path
def is_safe_path(file_path, directory):
"""
Check that file path is safe.
(the file is stored inside directory or one of its sub-directory)
"""
requested_path = os.path.relpath(file_path, start=directory)
requested_path = os.path.abspath(requested_path)
common_prefix = os.path.commonprefix([requested_path, directory])
return common_prefix != directory
def check_path_allowed(path):
"""
If the server is non local raise an error if

Loading…
Cancel
Save