mirror of
https://github.com/GNS3/gns3-server
synced 2024-11-28 11:18:11 +00:00
Reactivate project importation
This commit is contained in:
parent
c5f441e1d3
commit
149d086fd8
@ -349,25 +349,17 @@ async def export_project(
|
|||||||
async def import_project(
|
async def import_project(
|
||||||
project_id: UUID,
|
project_id: UUID,
|
||||||
request: Request,
|
request: Request,
|
||||||
path: Optional[Path] = None,
|
|
||||||
name: Optional[str] = None
|
name: Optional[str] = None
|
||||||
) -> schemas.Project:
|
) -> schemas.Project:
|
||||||
"""
|
"""
|
||||||
Import a project from a portable archive.
|
Import a project from a portable archive.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
#TODO: import project remotely
|
|
||||||
raise NotImplementedError()
|
|
||||||
|
|
||||||
controller = Controller.instance()
|
controller = Controller.instance()
|
||||||
# We write the content to a temporary location and after we extract it all.
|
# We write the content to a temporary location and then we extract it all.
|
||||||
# It could be more optimal to stream this but it is not implemented in Python.
|
# It could be more optimal to stream this but it is not implemented in Python.
|
||||||
try:
|
try:
|
||||||
begin = time.time()
|
begin = time.time()
|
||||||
# use the parent directory or projects dir as a temporary working dir
|
|
||||||
if path:
|
|
||||||
working_dir = os.path.abspath(os.path.join(path, os.pardir))
|
|
||||||
else:
|
|
||||||
working_dir = controller.projects_directory()
|
working_dir = controller.projects_directory()
|
||||||
with tempfile.TemporaryDirectory(dir=working_dir) as tmpdir:
|
with tempfile.TemporaryDirectory(dir=working_dir) as tmpdir:
|
||||||
temp_project_path = os.path.join(tmpdir, "project.zip")
|
temp_project_path = os.path.join(tmpdir, "project.zip")
|
||||||
@ -375,7 +367,7 @@ async def import_project(
|
|||||||
async for chunk in request.stream():
|
async for chunk in request.stream():
|
||||||
await f.write(chunk)
|
await f.write(chunk)
|
||||||
with open(temp_project_path, "rb") as f:
|
with open(temp_project_path, "rb") as f:
|
||||||
project = await import_controller_project(controller, str(project_id), f, location=path, name=name)
|
project = await import_controller_project(controller, str(project_id), f, name=name)
|
||||||
|
|
||||||
log.info(f"Project '{project.name}' imported in {time.time() - begin:.4f} seconds")
|
log.info(f"Project '{project.name}' imported in {time.time() - begin:.4f} seconds")
|
||||||
except OSError as e:
|
except OSError as e:
|
||||||
|
@ -432,21 +432,39 @@ async def test_write_and_get_file_with_leading_slashes_in_filename(
|
|||||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
assert response.status_code == status.HTTP_403_FORBIDDEN
|
||||||
|
|
||||||
|
|
||||||
# async def test_import(app: FastAPI, client: AsyncClient, tmpdir, controller: Controller) -> None:
|
async def test_import(app: FastAPI, client: AsyncClient, tmpdir, controller: Controller) -> None:
|
||||||
#
|
|
||||||
# with zipfile.ZipFile(str(tmpdir / "test.zip"), 'w') as myzip:
|
with zipfile_zstd.ZipFile(str(tmpdir / "test.zip"), 'w') as myzip:
|
||||||
# myzip.writestr("project.gns3", b'{"project_id": "c6992992-ac72-47dc-833b-54aa334bcd05", "version": "2.0.0", "name": "test"}')
|
myzip.writestr("project.gns3", b'{"project_id": "c6992992-ac72-47dc-833b-54aa334bcd05", "version": "2.0.0", "name": "test"}')
|
||||||
# myzip.writestr("demo", b"hello")
|
myzip.writestr("demo", b"hello")
|
||||||
#
|
|
||||||
# project_id = str(uuid.uuid4())
|
project_id = str(uuid.uuid4())
|
||||||
# with open(str(tmpdir / "test.zip"), "rb") as f:
|
with open(str(tmpdir / "test.zip"), "rb") as f:
|
||||||
# response = await client.post(app.url_path_for("import_project", project_id=project_id), content=f.read())
|
response = await client.post(app.url_path_for("import_project", project_id=project_id), content=f.read())
|
||||||
# assert response.status_code == status.HTTP_201_CREATED
|
assert response.status_code == status.HTTP_201_CREATED
|
||||||
#
|
|
||||||
# project = controller.get_project(project_id)
|
project = controller.get_project(project_id)
|
||||||
# with open(os.path.join(project.path, "demo")) as f:
|
with open(os.path.join(project.path, "demo")) as f:
|
||||||
# content = f.read()
|
content = f.read()
|
||||||
# assert content == "hello"
|
assert content == "hello"
|
||||||
|
|
||||||
|
|
||||||
|
async def test_import_with_project_name(app: FastAPI, client: AsyncClient, tmpdir, controller: Controller) -> None:
|
||||||
|
|
||||||
|
with zipfile_zstd.ZipFile(str(tmpdir / "test.zip"), 'w') as myzip:
|
||||||
|
myzip.writestr("project.gns3", b'{"project_id": "c6992992-ac72-47dc-833b-54aa334bcd05", "version": "2.0.0", "name": "test"}')
|
||||||
|
myzip.writestr("demo", b"hello")
|
||||||
|
|
||||||
|
project_id = str(uuid.uuid4())
|
||||||
|
with open(str(tmpdir / "test.zip"), "rb") as f:
|
||||||
|
response = await client.post(
|
||||||
|
app.url_path_for("import_project", project_id=project_id),
|
||||||
|
content=f.read(),
|
||||||
|
params={"name": "my-imported-project-name"}
|
||||||
|
)
|
||||||
|
assert response.status_code == status.HTTP_201_CREATED
|
||||||
|
project = controller.get_project(project_id)
|
||||||
|
assert project.name == "my-imported-project-name"
|
||||||
|
|
||||||
|
|
||||||
async def test_duplicate(app: FastAPI, client: AsyncClient, project: Project) -> None:
|
async def test_duplicate(app: FastAPI, client: AsyncClient, project: Project) -> None:
|
||||||
|
Loading…
Reference in New Issue
Block a user