mirror of
https://github.com/GNS3/gns3-server
synced 2024-12-25 16:28:11 +00:00
Merge pull request #2047 from GNS3/local-server-param-refactoring
"Local" command line parameter only for one purpose
This commit is contained in:
commit
2cddb2c05a
@ -25,7 +25,7 @@ import psutil
|
||||
from gns3server.config import Config
|
||||
from gns3server.utils.cpu_percent import CpuPercent
|
||||
from gns3server.version import __version__
|
||||
from gns3server.utils.path import get_default_project_directory
|
||||
from gns3server.utils.path import get_default_project_directory, is_safe_path
|
||||
from gns3server.compute.port_manager import PortManager
|
||||
from gns3server.compute.project_manager import ProjectManager
|
||||
from gns3server.utils.interfaces import interfaces
|
||||
@ -81,8 +81,7 @@ def compute_version() -> dict:
|
||||
Retrieve the server version number.
|
||||
"""
|
||||
|
||||
local_server = Config.instance().settings.Server.local
|
||||
return {"version": __version__, "local": local_server}
|
||||
return {"version": __version__}
|
||||
|
||||
|
||||
@router.get("/statistics")
|
||||
@ -155,12 +154,17 @@ async def create_qemu_image(image_data: schemas.QemuImageCreate) -> Response:
|
||||
Create a Qemu image.
|
||||
"""
|
||||
|
||||
if os.path.isabs(image_data.path):
|
||||
if Config.instance().settings.Server.local is False:
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
|
||||
#FIXME: move to controller
|
||||
raise NotImplementedError()
|
||||
|
||||
# Raise error if user try to escape
|
||||
#if not is_safe_path(image_data.path, project.path):
|
||||
# raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
|
||||
|
||||
await Qemu.instance().create_disk(
|
||||
image_data.qemu_img, image_data.path, jsonable_encoder(image_data, exclude_unset=True)
|
||||
image_data.qemu_img,
|
||||
image_data.path,
|
||||
jsonable_encoder(image_data, exclude_unset=True, exclude={"qemu_img", "path"})
|
||||
)
|
||||
|
||||
return Response(status_code=status.HTTP_204_NO_CONTENT)
|
||||
@ -176,9 +180,12 @@ async def update_qemu_image(image_data: schemas.QemuImageUpdate) -> Response:
|
||||
Update a Qemu image.
|
||||
"""
|
||||
|
||||
if os.path.isabs(image_data.path):
|
||||
if Config.instance().settings.Server.local is False:
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
|
||||
#FIXME: move to controller
|
||||
raise NotImplementedError()
|
||||
|
||||
# Raise error if user try to escape
|
||||
#if not is_safe_path(image_data.path, project.path):
|
||||
# raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
|
||||
|
||||
if image_data.extend:
|
||||
await Qemu.instance().resize_disk(image_data.qemu_img, image_data.path, image_data.extend)
|
||||
|
@ -85,7 +85,6 @@ async def upload_image(
|
||||
if os.path.commonprefix([base_images_directory, full_path]) != base_images_directory:
|
||||
raise ControllerForbiddenError(f"Cannot write image, '{image_path}' is forbidden")
|
||||
|
||||
print(image_path)
|
||||
if await images_repo.get_image(image_path):
|
||||
raise ControllerBadRequestError(f"Image '{image_path}' already exists")
|
||||
|
||||
|
@ -342,10 +342,10 @@ async def import_project(
|
||||
Import a project from a portable archive.
|
||||
"""
|
||||
|
||||
controller = Controller.instance()
|
||||
if Config.instance().settings.Server.local is False:
|
||||
raise ControllerForbiddenError("The server is not local")
|
||||
#TODO: import project remotely
|
||||
raise NotImplementedError()
|
||||
|
||||
controller = Controller.instance()
|
||||
# We write the content to a temporary location and after we extract it all.
|
||||
# It could be more optimal to stream this but it is not implemented in Python.
|
||||
try:
|
||||
@ -385,16 +385,9 @@ async def duplicate_project(
|
||||
Duplicate a project.
|
||||
"""
|
||||
|
||||
if project_data.path:
|
||||
if Config.instance().settings.Server.local is False:
|
||||
raise ControllerForbiddenError("The server is not a local server")
|
||||
location = project_data.path
|
||||
else:
|
||||
location = None
|
||||
|
||||
reset_mac_addresses = project_data.reset_mac_addresses
|
||||
new_project = await project.duplicate(
|
||||
name=project_data.name, location=location, reset_mac_addresses=reset_mac_addresses
|
||||
name=project_data.name, reset_mac_addresses=reset_mac_addresses
|
||||
)
|
||||
await rbac_repo.add_permission_to_user_with_path(current_user.user_id, f"/projects/{new_project.id}/*")
|
||||
return new_project.asdict()
|
||||
@ -423,7 +416,7 @@ async def get_file(file_path: str, project: Project = Depends(dep_project)) -> F
|
||||
@router.post("/{project_id}/files/{file_path:path}", status_code=status.HTTP_204_NO_CONTENT)
|
||||
async def write_file(file_path: str, request: Request, project: Project = Depends(dep_project)) -> Response:
|
||||
"""
|
||||
Write a file from a project.
|
||||
Write a file to a project.
|
||||
"""
|
||||
|
||||
file_path = urllib.parse.unquote(file_path)
|
||||
|
@ -85,10 +85,6 @@ class Project:
|
||||
"variables": self._variables
|
||||
}
|
||||
|
||||
def is_local(self):
|
||||
|
||||
return Config.instance().settings.Server.local
|
||||
|
||||
@property
|
||||
def id(self):
|
||||
|
||||
@ -101,12 +97,12 @@ class Project:
|
||||
|
||||
@path.setter
|
||||
def path(self, path):
|
||||
check_path_allowed(path)
|
||||
|
||||
if hasattr(self, "_path"):
|
||||
if path != self._path and self.is_local() is False:
|
||||
if path != self._path:
|
||||
raise ComputeForbiddenError("Changing the project directory path is not allowed")
|
||||
|
||||
check_path_allowed(path)
|
||||
self._path = path
|
||||
|
||||
@property
|
||||
|
@ -267,6 +267,7 @@ class Qemu(BaseManager):
|
||||
command.append(path)
|
||||
command.append(f"{img_size}M")
|
||||
|
||||
print(command)
|
||||
process = await asyncio.create_subprocess_exec(*command)
|
||||
await process.wait()
|
||||
except (OSError, subprocess.SubprocessError) as e:
|
||||
|
@ -1038,7 +1038,7 @@ class Project:
|
||||
while self._loading:
|
||||
await asyncio.sleep(0.5)
|
||||
|
||||
async def duplicate(self, name=None, location=None, reset_mac_addresses=True):
|
||||
async def duplicate(self, name=None, reset_mac_addresses=True):
|
||||
"""
|
||||
Duplicate a project
|
||||
|
||||
@ -1047,7 +1047,6 @@ class Project:
|
||||
It's a little slower but we have only one implementation to maintain.
|
||||
|
||||
:param name: Name of the new project. A new one will be generated in case of conflicts
|
||||
:param location: Parent directory of the new project
|
||||
:param reset_mac_addresses: Reset MAC addresses for the new project
|
||||
"""
|
||||
# If the project was not open we open it temporary
|
||||
@ -1062,11 +1061,8 @@ class Project:
|
||||
|
||||
# use the parent directory of the project we are duplicating as a
|
||||
# temporary directory to avoid no space left issues when '/tmp'
|
||||
# is location on another partition.
|
||||
if location:
|
||||
working_dir = os.path.abspath(os.path.join(location, os.pardir))
|
||||
else:
|
||||
working_dir = os.path.abspath(os.path.join(self.path, os.pardir))
|
||||
# is located on another partition.
|
||||
working_dir = os.path.abspath(os.path.join(self.path, os.pardir))
|
||||
|
||||
with tempfile.TemporaryDirectory(dir=working_dir) as tmpdir:
|
||||
# Do not compress the exported project when duplicating
|
||||
@ -1090,7 +1086,11 @@ class Project:
|
||||
# import the temporary project
|
||||
with open(project_path, "rb") as f:
|
||||
project = await import_project(
|
||||
self._controller, str(uuid.uuid4()), f, location=location, name=name, keep_compute_id=True
|
||||
self._controller,
|
||||
str(uuid.uuid4()),
|
||||
f,
|
||||
name=name,
|
||||
keep_compute_id=True
|
||||
)
|
||||
|
||||
log.info(f"Project '{project.name}' duplicated in {time.time() - begin:.4f} seconds")
|
||||
|
@ -255,9 +255,6 @@ class Server:
|
||||
self._set_config_defaults_from_command_line(args)
|
||||
config = Config.instance().settings
|
||||
|
||||
if config.Server.local:
|
||||
log.warning("Local mode is enabled. Beware, clients will have full control on your filesystem")
|
||||
|
||||
if not config.Server.compute_password.get_secret_value():
|
||||
alphabet = string.ascii_letters + string.digits + string.punctuation
|
||||
generated_password = ''.join(secrets.choice(alphabet) for _ in range(16))
|
||||
|
@ -60,8 +60,7 @@ def check_path_allowed(path: str):
|
||||
if len(os.path.commonprefix([project_directory, path])) == len(project_directory):
|
||||
return
|
||||
|
||||
if Config.instance().settings.Server.local is False:
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="The path is not allowed")
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=f"The path {path} is not allowed")
|
||||
|
||||
|
||||
def get_mountpoint(path: str):
|
||||
|
@ -45,7 +45,7 @@ async def test_version_output(app: FastAPI, compute_client: AsyncClient) -> None
|
||||
|
||||
response = await compute_client.get(app.url_path_for("compute:compute_version"))
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
assert response.json() == {'local': True, 'version': __version__}
|
||||
assert response.json() == {'version': __version__}
|
||||
|
||||
|
||||
async def test_compute_authentication(app: FastAPI, compute_client: AsyncClient) -> None:
|
||||
|
@ -36,35 +36,13 @@ def base_params(tmpdir) -> dict:
|
||||
|
||||
params = {
|
||||
"name": "test",
|
||||
"path": str(tmpdir),
|
||||
"project_id": str(uuid.uuid4())
|
||||
}
|
||||
return params
|
||||
|
||||
|
||||
async def test_create_project_with_path(app: FastAPI, compute_client: AsyncClient, base_params: dict) -> None:
|
||||
|
||||
with patch("gns3server.compute.project.Project.is_local", return_value=True):
|
||||
response = await compute_client.post(app.url_path_for("compute:create_compute_project"), json=base_params)
|
||||
assert response.status_code == status.HTTP_201_CREATED
|
||||
assert response.json()["project_id"] == base_params["project_id"]
|
||||
|
||||
|
||||
async def test_create_project_with_path_and_empty_variables(app: FastAPI,
|
||||
compute_client: AsyncClient,
|
||||
base_params: dict) -> None:
|
||||
|
||||
base_params["variables"] = None
|
||||
with patch("gns3server.compute.project.Project.is_local", return_value=True):
|
||||
|
||||
response = await compute_client.post(app.url_path_for("compute:create_compute_project"), json=base_params)
|
||||
assert response.status_code == status.HTTP_201_CREATED
|
||||
assert response.json()["project_id"] == base_params["project_id"]
|
||||
|
||||
|
||||
async def test_create_project_without_dir(app: FastAPI, compute_client: AsyncClient, base_params: dict) -> None:
|
||||
|
||||
del base_params["path"]
|
||||
response = await compute_client.post(app.url_path_for("compute:create_compute_project"), json=base_params)
|
||||
assert response.status_code == status.HTTP_201_CREATED
|
||||
assert response.json()["project_id"] == base_params["project_id"]
|
||||
@ -158,9 +136,8 @@ async def test_close_project_invalid_uuid(app: FastAPI, compute_client: AsyncCli
|
||||
assert response.status_code == status.HTTP_404_NOT_FOUND
|
||||
|
||||
|
||||
async def test_get_file(app: FastAPI, compute_client: AsyncClient, config, tmpdir) -> None:
|
||||
async def test_get_file(app: FastAPI, compute_client: AsyncClient) -> None:
|
||||
|
||||
config.settings.Server.projects_path = str(tmpdir)
|
||||
project = ProjectManager.instance().create_project(project_id="01010203-0405-0607-0809-0a0b0c0d0e0b")
|
||||
|
||||
with open(os.path.join(project.path, "hello"), "w+") as f:
|
||||
|
@ -417,57 +417,57 @@ async def test_upload_image_permission_denied(app: FastAPI, compute_client: Asyn
|
||||
assert response.status_code == status.HTTP_409_CONFLICT
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_img_relative(app: FastAPI, compute_client: AsyncClient):
|
||||
|
||||
params = {
|
||||
"qemu_img": "/tmp/qemu-img",
|
||||
"path": "hda.qcow2",
|
||||
"format": "qcow2",
|
||||
"preallocation": "metadata",
|
||||
"cluster_size": 64,
|
||||
"refcount_bits": 12,
|
||||
"lazy_refcounts": "off",
|
||||
"size": 100
|
||||
}
|
||||
with asyncio_patch("gns3server.compute.Qemu.create_disk"):
|
||||
response = await compute_client.post(app.url_path_for("compute:create_qemu_image"), json=params)
|
||||
assert response.status_code == status.HTTP_204_NO_CONTENT
|
||||
|
||||
|
||||
async def test_create_img_absolute_non_local(app: FastAPI, compute_client: AsyncClient, config) -> None:
|
||||
|
||||
config.settings.Server.local = False
|
||||
params = {
|
||||
"qemu_img": "/tmp/qemu-img",
|
||||
"path": "/tmp/hda.qcow2",
|
||||
"format": "qcow2",
|
||||
"preallocation": "metadata",
|
||||
"cluster_size": 64,
|
||||
"refcount_bits": 12,
|
||||
"lazy_refcounts": "off",
|
||||
"size": 100
|
||||
}
|
||||
with asyncio_patch("gns3server.compute.Qemu.create_disk"):
|
||||
response = await compute_client.post(app.url_path_for("compute:create_qemu_image"), json=params)
|
||||
assert response.status_code == 403
|
||||
|
||||
|
||||
async def test_create_img_absolute_local(app: FastAPI, compute_client: AsyncClient, config) -> None:
|
||||
|
||||
params = {
|
||||
"qemu_img": "/tmp/qemu-img",
|
||||
"path": "/tmp/hda.qcow2",
|
||||
"format": "qcow2",
|
||||
"preallocation": "metadata",
|
||||
"cluster_size": 64,
|
||||
"refcount_bits": 12,
|
||||
"lazy_refcounts": "off",
|
||||
"size": 100
|
||||
}
|
||||
with asyncio_patch("gns3server.compute.Qemu.create_disk"):
|
||||
response = await compute_client.post(app.url_path_for("compute:create_qemu_image"), json=params)
|
||||
assert response.status_code == status.HTTP_204_NO_CONTENT
|
||||
# @pytest.mark.asyncio
|
||||
# async def test_create_img_relative(app: FastAPI, compute_client: AsyncClient):
|
||||
#
|
||||
# params = {
|
||||
# "qemu_img": "/tmp/qemu-img",
|
||||
# "path": "hda.qcow2",
|
||||
# "format": "qcow2",
|
||||
# "preallocation": "metadata",
|
||||
# "cluster_size": 64,
|
||||
# "refcount_bits": 12,
|
||||
# "lazy_refcounts": "off",
|
||||
# "size": 100
|
||||
# }
|
||||
# with asyncio_patch("gns3server.compute.Qemu.create_disk"):
|
||||
# response = await compute_client.post(app.url_path_for("compute:create_qemu_image"), json=params)
|
||||
# assert response.status_code == status.HTTP_204_NO_CONTENT
|
||||
#
|
||||
#
|
||||
# async def test_create_img_absolute_non_local(app: FastAPI, compute_client: AsyncClient, config) -> None:
|
||||
#
|
||||
# config.settings.Server.local = False
|
||||
# params = {
|
||||
# "qemu_img": "/tmp/qemu-img",
|
||||
# "path": "/tmp/hda.qcow2",
|
||||
# "format": "qcow2",
|
||||
# "preallocation": "metadata",
|
||||
# "cluster_size": 64,
|
||||
# "refcount_bits": 12,
|
||||
# "lazy_refcounts": "off",
|
||||
# "size": 100
|
||||
# }
|
||||
# with asyncio_patch("gns3server.compute.Qemu.create_disk"):
|
||||
# response = await compute_client.post(app.url_path_for("compute:create_qemu_image"), json=params)
|
||||
# assert response.status_code == 403
|
||||
#
|
||||
#
|
||||
# async def test_create_img_absolute_local(app: FastAPI, compute_client: AsyncClient, config) -> None:
|
||||
#
|
||||
# params = {
|
||||
# "qemu_img": "/tmp/qemu-img",
|
||||
# "path": "/tmp/hda.qcow2",
|
||||
# "format": "qcow2",
|
||||
# "preallocation": "metadata",
|
||||
# "cluster_size": 64,
|
||||
# "refcount_bits": 12,
|
||||
# "lazy_refcounts": "off",
|
||||
# "size": 100
|
||||
# }
|
||||
# with asyncio_patch("gns3server.compute.Qemu.create_disk"):
|
||||
# response = await compute_client.post(app.url_path_for("compute:create_qemu_image"), json=params)
|
||||
# assert response.status_code == status.HTTP_204_NO_CONTENT
|
||||
|
||||
|
||||
async def test_capabilities(app: FastAPI, compute_client: AsyncClient) -> None:
|
||||
|
@ -41,9 +41,9 @@ async def project(app: FastAPI, client: AsyncClient, controller: Controller) ->
|
||||
return controller.get_project(u)
|
||||
|
||||
|
||||
async def test_create_project_with_path(app: FastAPI, client: AsyncClient, controller: Controller, tmpdir) -> None:
|
||||
async def test_create_project_with_path(app: FastAPI, client: AsyncClient, controller: Controller, config) -> None:
|
||||
|
||||
params = {"name": "test", "path": str(tmpdir), "project_id": "00010203-0405-0607-0809-0a0b0c0d0e0f"}
|
||||
params = {"name": "test", "path": str(config.settings.Server.projects_path), "project_id": "00010203-0405-0607-0809-0a0b0c0d0e0f"}
|
||||
response = await client.post(app.url_path_for("create_project"), json=params)
|
||||
assert response.status_code == status.HTTP_201_CREATED
|
||||
assert response.json()["name"] == "test"
|
||||
@ -128,9 +128,9 @@ async def test_update_project_with_variables(app: FastAPI, client: AsyncClient,
|
||||
assert response.json()["variables"] == variables
|
||||
|
||||
|
||||
async def test_list_projects(app: FastAPI, client: AsyncClient, controller: Controller, tmpdir) -> None:
|
||||
async def test_list_projects(app: FastAPI, client: AsyncClient, controller: Controller) -> None:
|
||||
|
||||
params = {"name": "test", "path": str(tmpdir), "project_id": "00010203-0405-0607-0809-0a0b0c0d0e0f"}
|
||||
params = {"name": "test", "project_id": "00010203-0405-0607-0809-0a0b0c0d0e0f"}
|
||||
await client.post(app.url_path_for("create_project"), json=params)
|
||||
response = await client.get(app.url_path_for("get_projects"))
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
@ -371,21 +371,21 @@ async def test_write_and_get_file_with_leading_slashes_in_filename(
|
||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
||||
|
||||
|
||||
async def test_import(app: FastAPI, client: AsyncClient, tmpdir, controller: Controller) -> None:
|
||||
|
||||
with zipfile.ZipFile(str(tmpdir / "test.zip"), 'w') as myzip:
|
||||
myzip.writestr("project.gns3", b'{"project_id": "c6992992-ac72-47dc-833b-54aa334bcd05", "version": "2.0.0", "name": "test"}')
|
||||
myzip.writestr("demo", b"hello")
|
||||
|
||||
project_id = str(uuid.uuid4())
|
||||
with open(str(tmpdir / "test.zip"), "rb") as f:
|
||||
response = await client.post(app.url_path_for("import_project", project_id=project_id), content=f.read())
|
||||
assert response.status_code == status.HTTP_201_CREATED
|
||||
|
||||
project = controller.get_project(project_id)
|
||||
with open(os.path.join(project.path, "demo")) as f:
|
||||
content = f.read()
|
||||
assert content == "hello"
|
||||
# async def test_import(app: FastAPI, client: AsyncClient, tmpdir, controller: Controller) -> None:
|
||||
#
|
||||
# with zipfile.ZipFile(str(tmpdir / "test.zip"), 'w') as myzip:
|
||||
# myzip.writestr("project.gns3", b'{"project_id": "c6992992-ac72-47dc-833b-54aa334bcd05", "version": "2.0.0", "name": "test"}')
|
||||
# myzip.writestr("demo", b"hello")
|
||||
#
|
||||
# project_id = str(uuid.uuid4())
|
||||
# with open(str(tmpdir / "test.zip"), "rb") as f:
|
||||
# response = await client.post(app.url_path_for("import_project", project_id=project_id), content=f.read())
|
||||
# assert response.status_code == status.HTTP_201_CREATED
|
||||
#
|
||||
# project = controller.get_project(project_id)
|
||||
# with open(os.path.join(project.path, "demo")) as f:
|
||||
# content = f.read()
|
||||
# assert content == "hello"
|
||||
|
||||
|
||||
async def test_duplicate(app: FastAPI, client: AsyncClient, project: Project) -> None:
|
||||
|
@ -17,7 +17,6 @@
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import os
|
||||
import sys
|
||||
import uuid
|
||||
import pytest
|
||||
from uuid import uuid4
|
||||
@ -28,7 +27,6 @@ from gns3server.compute.project import Project
|
||||
from gns3server.compute.notification_manager import NotificationManager
|
||||
from gns3server.compute.compute_error import ComputeError, ComputeForbiddenError
|
||||
from gns3server.compute.vpcs import VPCS, VPCSVM
|
||||
from gns3server.config import Config
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
@ -76,28 +74,18 @@ async def test_clean_tmp_directory():
|
||||
async def test_path(projects_dir):
|
||||
|
||||
directory = projects_dir
|
||||
with patch("gns3server.compute.project.Project.is_local", return_value=True):
|
||||
with patch("gns3server.utils.path.get_default_project_directory", return_value=directory):
|
||||
p = Project(project_id=str(uuid4()))
|
||||
assert p.path == os.path.join(directory, p.id)
|
||||
assert os.path.exists(os.path.join(directory, p.id))
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_init_path(tmpdir):
|
||||
|
||||
with patch("gns3server.compute.project.Project.is_local", return_value=True):
|
||||
p = Project(path=str(tmpdir), project_id=str(uuid4()))
|
||||
assert p.path == str(tmpdir)
|
||||
with patch("gns3server.utils.path.get_default_project_directory", return_value=directory):
|
||||
p = Project(project_id=str(uuid4()))
|
||||
assert p.path == os.path.join(directory, p.id)
|
||||
assert os.path.exists(os.path.join(directory, p.id))
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_changing_path_not_allowed(tmpdir):
|
||||
|
||||
with patch("gns3server.compute.project.Project.is_local", return_value=False):
|
||||
with pytest.raises(ComputeForbiddenError):
|
||||
p = Project(project_id=str(uuid4()))
|
||||
p.path = str(tmpdir)
|
||||
with pytest.raises(ComputeForbiddenError):
|
||||
p = Project(project_id=str(uuid4()))
|
||||
p.path = str(tmpdir)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@ -135,21 +123,19 @@ async def test_json_with_variables():
|
||||
async def test_node_working_directory(node, projects_dir):
|
||||
|
||||
directory = projects_dir
|
||||
with patch("gns3server.compute.project.Project.is_local", return_value=True):
|
||||
p = Project(project_id=str(uuid4()))
|
||||
assert p.node_working_directory(node) == os.path.join(directory, p.id, 'project-files', node.module_name, node.id)
|
||||
assert os.path.exists(p.node_working_directory(node))
|
||||
p = Project(project_id=str(uuid4()))
|
||||
assert p.node_working_directory(node) == os.path.join(directory, p.id, 'project-files', node.module_name, node.id)
|
||||
assert os.path.exists(p.node_working_directory(node))
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_node_working_path(node, projects_dir):
|
||||
|
||||
directory = projects_dir
|
||||
with patch("gns3server.compute.project.Project.is_local", return_value=True):
|
||||
p = Project(project_id=str(uuid4()))
|
||||
assert p.node_working_path(node) == os.path.join(directory, p.id, 'project-files', node.module_name, node.id)
|
||||
# after this execution directory structure should not be created
|
||||
assert not os.path.exists(p.node_working_path(node))
|
||||
p = Project(project_id=str(uuid4()))
|
||||
assert p.node_working_path(node) == os.path.join(directory, p.id, 'project-files', node.module_name, node.id)
|
||||
# after this execution directory structure should not be created
|
||||
assert not os.path.exists(p.node_working_path(node))
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@ -194,9 +180,8 @@ async def test_project_close(node, compute_project):
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_files(tmpdir, config):
|
||||
async def test_list_files():
|
||||
|
||||
config.settings.Server.projects_path = str(tmpdir)
|
||||
project = Project(project_id=str(uuid4()))
|
||||
path = project.path
|
||||
os.makedirs(os.path.join(path, "vm-1", "dynamips"))
|
||||
|
@ -359,7 +359,7 @@ def ubridge_path(config):
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def run_around_tests(monkeypatch, config, port_manager):#port_manager, controller, config):
|
||||
def run_around_tests(monkeypatch, config, port_manager):
|
||||
"""
|
||||
This setup a temporary project file environment around tests
|
||||
"""
|
||||
|
@ -21,6 +21,7 @@ import uuid
|
||||
import json
|
||||
import zipfile
|
||||
|
||||
from pathlib import Path
|
||||
from tests.utils import asyncio_patch, AsyncioMagicMock
|
||||
|
||||
from gns3server.controller.import_project import import_project, _move_files_to_compute
|
||||
@ -74,12 +75,13 @@ async def test_import_project(tmpdir, controller):
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_import_project_override(tmpdir, controller):
|
||||
async def test_import_project_override(projects_dir, controller):
|
||||
"""
|
||||
In the case of snapshot we will import a project for
|
||||
override the previous keeping the same project id & location
|
||||
"""
|
||||
|
||||
tmpdir = Path(projects_dir)
|
||||
project_id = str(uuid.uuid4())
|
||||
topology = {
|
||||
"project_id": project_id,
|
||||
@ -523,11 +525,12 @@ async def test_move_files_to_compute(tmpdir):
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_import_project_name_and_location(tmpdir, controller):
|
||||
async def test_import_project_name_and_location(projects_dir, controller):
|
||||
"""
|
||||
Import a project with a different location and name
|
||||
"""
|
||||
|
||||
tmpdir = Path(projects_dir)
|
||||
project_id = str(uuid.uuid4())
|
||||
topology = {
|
||||
"project_id": str(uuid.uuid4()),
|
||||
|
@ -128,18 +128,19 @@ def test_path_exist(tmpdir):
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_init_path(tmpdir):
|
||||
async def test_init_path(projects_dir):
|
||||
|
||||
p = Project(path=str(tmpdir), project_id=str(uuid4()), name="Test")
|
||||
assert p.path == str(tmpdir)
|
||||
project_id = str(uuid4())
|
||||
p = Project(project_id=project_id, name="Test")
|
||||
assert p.path == os.path.join(projects_dir, project_id)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_changing_path_with_quote_not_allowed(tmpdir):
|
||||
async def test_changing_path_with_quote_not_allowed(projects_dir):
|
||||
|
||||
with pytest.raises(ControllerForbiddenError):
|
||||
p = Project(project_id=str(uuid4()), name="Test")
|
||||
p.path = str(tmpdir / "project\"53")
|
||||
p.path = os.path.join(projects_dir, "project\"53")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
@ -15,7 +15,7 @@
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
|
||||
import os
|
||||
import json
|
||||
import pytest
|
||||
|
||||
@ -171,7 +171,7 @@ def demo_topology():
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_open(controller, tmpdir):
|
||||
async def test_open(controller, projects_dir):
|
||||
|
||||
simple_topology = {
|
||||
"auto_close": True,
|
||||
@ -192,12 +192,12 @@ async def test_open(controller, tmpdir):
|
||||
"version": "2.0.0"
|
||||
}
|
||||
|
||||
with open(str(tmpdir / "demo.gns3"), "w+") as f:
|
||||
with open(os.path.join(projects_dir, "demo.gns3"), "w+") as f:
|
||||
json.dump(simple_topology, f)
|
||||
|
||||
project = Project(name="demo",
|
||||
project_id="64ba8408-afbf-4b66-9cdd-1fd854427478",
|
||||
path=str(tmpdir),
|
||||
path=str(projects_dir),
|
||||
controller=controller,
|
||||
filename="demo.gns3",
|
||||
status="closed")
|
||||
|
@ -23,17 +23,11 @@ from fastapi import HTTPException
|
||||
from gns3server.utils.path import check_path_allowed, get_default_project_directory
|
||||
|
||||
|
||||
def test_check_path_allowed(config, tmpdir):
|
||||
def test_check_path_allowed():
|
||||
|
||||
config.settings.Server.local = False
|
||||
config.settings.Server.projects_path = str(tmpdir)
|
||||
with pytest.raises(HTTPException):
|
||||
check_path_allowed("/private")
|
||||
|
||||
config.settings.Server.local = True
|
||||
check_path_allowed(str(tmpdir / "hello" / "world"))
|
||||
check_path_allowed("/private")
|
||||
|
||||
|
||||
def test_get_default_project_directory(config):
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user