mirror of
https://github.com/GNS3/gns3-server
synced 2024-11-28 11:18:11 +00:00
Checks for compression levels + tests
This commit is contained in:
parent
015e17338c
commit
162af5bb7a
@ -41,7 +41,7 @@ from pathlib import Path
|
||||
from gns3server import schemas
|
||||
from gns3server.controller import Controller
|
||||
from gns3server.controller.project import Project
|
||||
from gns3server.controller.controller_error import ControllerError, ControllerForbiddenError
|
||||
from gns3server.controller.controller_error import ControllerError, ControllerBadRequestError
|
||||
from gns3server.controller.import_project import import_project as import_controller_project
|
||||
from gns3server.controller.export_project import export_project as export_controller_project
|
||||
from gns3server.utils.asyncio import aiozipstream
|
||||
@ -286,6 +286,7 @@ async def export_project(
|
||||
include_images: bool = False,
|
||||
reset_mac_addresses: bool = False,
|
||||
compression: schemas.ProjectCompression = "zstd",
|
||||
compression_level: int = None,
|
||||
) -> StreamingResponse:
|
||||
"""
|
||||
Export a project as a portable archive.
|
||||
@ -294,14 +295,23 @@ async def export_project(
|
||||
compression_query = compression.lower()
|
||||
if compression_query == "zip":
|
||||
compression = zipfile.ZIP_DEFLATED
|
||||
if compression_level is not None and (compression_level < 0 or compression_level > 9):
|
||||
raise ControllerBadRequestError("Compression level must be between 0 and 9 for ZIP compression")
|
||||
elif compression_query == "none":
|
||||
compression = zipfile.ZIP_STORED
|
||||
elif compression_query == "bzip2":
|
||||
compression = zipfile.ZIP_BZIP2
|
||||
if compression_level is not None and (compression_level < 1 or compression_level > 9):
|
||||
raise ControllerBadRequestError("Compression level must be between 1 and 9 for BZIP2 compression")
|
||||
elif compression_query == "lzma":
|
||||
compression = zipfile.ZIP_LZMA
|
||||
elif compression_query == "zstd":
|
||||
compression = zipfile.ZIP_ZSTANDARD
|
||||
if compression_level is not None and (compression_level < 1 or compression_level > 22):
|
||||
raise ControllerBadRequestError("Compression level must be between 1 and 22 for Zstandard compression")
|
||||
|
||||
if compression_level is not None and compression_query in ("none", "lzma"):
|
||||
raise ControllerBadRequestError(f"Compression level is not supported for '{compression_query}' compression method")
|
||||
|
||||
try:
|
||||
begin = time.time()
|
||||
@ -309,8 +319,10 @@ async def export_project(
|
||||
working_dir = os.path.abspath(os.path.join(project.path, os.pardir))
|
||||
|
||||
async def streamer():
|
||||
log.info(f"Exporting project '{project.name}' with '{compression_query}' compression "
|
||||
f"(level {compression_level})")
|
||||
with tempfile.TemporaryDirectory(dir=working_dir) as tmpdir:
|
||||
with aiozipstream.ZipFile(compression=compression) as zstream:
|
||||
with aiozipstream.ZipFile(compression=compression, compresslevel=compression_level) as zstream:
|
||||
await export_controller_project(
|
||||
zstream,
|
||||
project,
|
||||
|
@ -17,7 +17,6 @@
|
||||
|
||||
import uuid
|
||||
import os
|
||||
import zipfile
|
||||
import json
|
||||
import pytest
|
||||
|
||||
@ -26,6 +25,7 @@ from httpx import AsyncClient
|
||||
from unittest.mock import patch, MagicMock
|
||||
from tests.utils import asyncio_patch
|
||||
|
||||
import gns3server.utils.zipfile_zstd as zipfile_zstd
|
||||
from gns3server.controller import Controller
|
||||
from gns3server.controller.project import Project
|
||||
|
||||
@ -261,7 +261,7 @@ async def test_export_with_images(app: FastAPI, client: AsyncClient, tmpdir, pro
|
||||
with open(str(tmpdir / 'project.zip'), 'wb+') as f:
|
||||
f.write(response.content)
|
||||
|
||||
with zipfile.ZipFile(str(tmpdir / 'project.zip')) as myzip:
|
||||
with zipfile_zstd.ZipFile(str(tmpdir / 'project.zip')) as myzip:
|
||||
with myzip.open("a") as myfile:
|
||||
content = myfile.read()
|
||||
assert content == b"hello"
|
||||
@ -304,7 +304,7 @@ async def test_export_without_images(app: FastAPI, client: AsyncClient, tmpdir,
|
||||
with open(str(tmpdir / 'project.zip'), 'wb+') as f:
|
||||
f.write(response.content)
|
||||
|
||||
with zipfile.ZipFile(str(tmpdir / 'project.zip')) as myzip:
|
||||
with zipfile_zstd.ZipFile(str(tmpdir / 'project.zip')) as myzip:
|
||||
with myzip.open("a") as myfile:
|
||||
content = myfile.read()
|
||||
assert content == b"hello"
|
||||
@ -313,6 +313,67 @@ async def test_export_without_images(app: FastAPI, client: AsyncClient, tmpdir,
|
||||
myzip.getinfo("images/IOS/test.image")
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"compression, compression_level, status_code",
|
||||
(
|
||||
("none", None, status.HTTP_200_OK),
|
||||
("none", 4, status.HTTP_400_BAD_REQUEST),
|
||||
("zip", None, status.HTTP_200_OK),
|
||||
("zip", 1, status.HTTP_200_OK),
|
||||
("zip", 12, status.HTTP_400_BAD_REQUEST),
|
||||
("bzip2", None, status.HTTP_200_OK),
|
||||
("bzip2", 1, status.HTTP_200_OK),
|
||||
("bzip2", 13, status.HTTP_400_BAD_REQUEST),
|
||||
("lzma", None, status.HTTP_200_OK),
|
||||
("lzma", 1, status.HTTP_400_BAD_REQUEST),
|
||||
("zstd", None, status.HTTP_200_OK),
|
||||
("zstd", 12, status.HTTP_200_OK),
|
||||
("zstd", 23, status.HTTP_400_BAD_REQUEST),
|
||||
)
|
||||
)
|
||||
async def test_export_compression(
|
||||
app: FastAPI,
|
||||
client: AsyncClient,
|
||||
tmpdir,
|
||||
project: Project,
|
||||
compression: str,
|
||||
compression_level: int,
|
||||
status_code: int
|
||||
) -> None:
|
||||
|
||||
project.dump = MagicMock()
|
||||
os.makedirs(project.path, exist_ok=True)
|
||||
|
||||
topology = {
|
||||
"topology": {
|
||||
"nodes": [
|
||||
{
|
||||
"node_type": "qemu"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
with open(os.path.join(project.path, "test.gns3"), 'w+') as f:
|
||||
json.dump(topology, f)
|
||||
|
||||
params = {"compression": compression}
|
||||
if compression_level:
|
||||
params["compression_level"] = compression_level
|
||||
response = await client.get(app.url_path_for("export_project", project_id=project.id), params=params)
|
||||
assert response.status_code == status_code
|
||||
|
||||
if response.status_code == status.HTTP_200_OK:
|
||||
assert response.headers['CONTENT-TYPE'] == 'application/gns3project'
|
||||
assert response.headers['CONTENT-DISPOSITION'] == 'attachment; filename="{}.gns3project"'.format(project.name)
|
||||
|
||||
with open(str(tmpdir / 'project.zip'), 'wb+') as f:
|
||||
f.write(response.content)
|
||||
|
||||
with zipfile_zstd.ZipFile(str(tmpdir / 'project.zip')) as myzip:
|
||||
with myzip.open("project.gns3") as myfile:
|
||||
myfile.read()
|
||||
|
||||
|
||||
async def test_get_file(app: FastAPI, client: AsyncClient, project: Project) -> None:
|
||||
|
||||
os.makedirs(project.path, exist_ok=True)
|
||||
|
Loading…
Reference in New Issue
Block a user