diff --git a/gns3server/api/routes/compute/qemu_nodes.py b/gns3server/api/routes/compute/qemu_nodes.py
index 66ce046a..d60625d5 100644
--- a/gns3server/api/routes/compute/qemu_nodes.py
+++ b/gns3server/api/routes/compute/qemu_nodes.py
@@ -139,13 +139,6 @@ async def start_qemu_node(node: QemuVM = Depends(dep_node)) -> Response:
Start a Qemu node.
"""
- qemu_manager = Qemu.instance()
- hardware_accel = qemu_manager.config.settings.Qemu.enable_hardware_acceleration
- if hardware_accel and "-machine accel=tcg" not in node.options:
- pm = ProjectManager.instance()
- if pm.check_hardware_virtualization(node) is False:
- pass # FIXME: check this
- # raise ComputeError("Cannot start VM with hardware acceleration (KVM/HAX) enabled because hardware virtualization (VT-x/AMD-V) is already used by another software like VMware or VirtualBox")
await node.start()
return Response(status_code=status.HTTP_204_NO_CONTENT)
diff --git a/gns3server/api/routes/controller/__init__.py b/gns3server/api/routes/controller/__init__.py
index 71e0cc02..7f6cdcdd 100644
--- a/gns3server/api/routes/controller/__init__.py
+++ b/gns3server/api/routes/controller/__init__.py
@@ -28,6 +28,7 @@ from . import projects
from . import snapshots
from . import symbols
from . import templates
+from . import images
from . import users
from . import groups
from . import roles
@@ -61,9 +62,17 @@ router.include_router(
tags=["Permissions"]
)
+router.include_router(
+ images.router,
+ dependencies=[Depends(get_current_active_user)],
+ prefix="/images",
+ tags=["Images"]
+)
+
router.include_router(
templates.router,
dependencies=[Depends(get_current_active_user)],
+ prefix="/templates",
tags=["Templates"]
)
diff --git a/gns3server/api/routes/controller/groups.py b/gns3server/api/routes/controller/groups.py
index b9ac05c4..c29a6fc1 100644
--- a/gns3server/api/routes/controller/groups.py
+++ b/gns3server/api/routes/controller/groups.py
@@ -25,6 +25,7 @@ from typing import List
from gns3server import schemas
from gns3server.controller.controller_error import (
+ ControllerError,
ControllerBadRequestError,
ControllerNotFoundError,
ControllerForbiddenError,
@@ -126,7 +127,7 @@ async def delete_user_group(
success = await users_repo.delete_user_group(user_group_id)
if not success:
- raise ControllerNotFoundError(f"User group '{user_group_id}' could not be deleted")
+ raise ControllerError(f"User group '{user_group_id}' could not be deleted")
return Response(status_code=status.HTTP_204_NO_CONTENT)
diff --git a/gns3server/api/routes/controller/images.py b/gns3server/api/routes/controller/images.py
new file mode 100644
index 00000000..510b13db
--- /dev/null
+++ b/gns3server/api/routes/controller/images.py
@@ -0,0 +1,175 @@
+#
+# Copyright (C) 2021 GNS3 Technologies Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+"""
+API routes for images.
+"""
+
+import os
+import logging
+import urllib.parse
+
+from fastapi import APIRouter, Request, Response, Depends, status
+from sqlalchemy.orm.exc import MultipleResultsFound
+from typing import List
+from gns3server import schemas
+from pydantic import ValidationError
+
+from gns3server.utils.images import InvalidImageError, default_images_directory, write_image
+from gns3server.db.repositories.images import ImagesRepository
+from gns3server.db.repositories.templates import TemplatesRepository
+from gns3server.services.templates import TemplatesService
+from gns3server.db.repositories.rbac import RbacRepository
+from gns3server.controller import Controller
+from gns3server.controller.controller_error import (
+ ControllerError,
+ ControllerNotFoundError,
+ ControllerForbiddenError,
+ ControllerBadRequestError
+)
+
+from .dependencies.authentication import get_current_active_user
+from .dependencies.database import get_repository
+
+log = logging.getLogger(__name__)
+
+router = APIRouter()
+
+
+@router.get("", response_model=List[schemas.Image])
+async def get_images(
+ images_repo: ImagesRepository = Depends(get_repository(ImagesRepository)),
+) -> List[schemas.Image]:
+ """
+ Return all images.
+ """
+
+ return await images_repo.get_images()
+
+
+@router.post("/upload/{image_path:path}", response_model=schemas.Image, status_code=status.HTTP_201_CREATED)
+async def upload_image(
+ image_path: str,
+ request: Request,
+ image_type: schemas.ImageType = schemas.ImageType.qemu,
+ images_repo: ImagesRepository = Depends(get_repository(ImagesRepository)),
+ templates_repo: TemplatesRepository = Depends(get_repository(TemplatesRepository)),
+ current_user: schemas.User = Depends(get_current_active_user),
+ rbac_repo: RbacRepository = Depends(get_repository(RbacRepository))
+) -> schemas.Image:
+ """
+ Upload an image.
+
+ Example: curl -X POST http://host:port/v3/images/upload/my_image_name.qcow2?image_type=qemu \
+ -H 'Authorization: Bearer ' --data-binary @"/path/to/image.qcow2"
+ """
+
+ image_path = urllib.parse.unquote(image_path)
+ image_dir, image_name = os.path.split(image_path)
+ directory = default_images_directory(image_type)
+ full_path = os.path.abspath(os.path.join(directory, image_dir, image_name))
+ if os.path.commonprefix([directory, full_path]) != directory:
+ raise ControllerForbiddenError(f"Cannot write image, '{image_path}' is forbidden")
+
+ if await images_repo.get_image(image_path):
+ raise ControllerBadRequestError(f"Image '{image_path}' already exists")
+
+ try:
+ image = await write_image(image_name, image_type, full_path, request.stream(), images_repo)
+ except (OSError, InvalidImageError) as e:
+ raise ControllerError(f"Could not save {image_type} image '{image_path}': {e}")
+
+ try:
+ # attempt to automatically create a template based on image checksum
+ template = await Controller.instance().appliance_manager.install_appliance_from_image(
+ image.checksum,
+ images_repo,
+ directory
+ )
+
+ if template:
+ template_create = schemas.TemplateCreate(**template)
+ template = await TemplatesService(templates_repo).create_template(template_create)
+ template_id = template.get("template_id")
+ await rbac_repo.add_permission_to_user_with_path(current_user.user_id, f"/templates/{template_id}/*")
+ log.info(f"Template '{template.get('name')}' version {template.get('version')} "
+ f"has been created using image '{image_name}'")
+
+ except (ControllerError, ValidationError, InvalidImageError) as e:
+ log.warning(f"Could not automatically create template using image '{image_path}': {e}")
+
+ return image
+
+
+@router.get("/{image_path:path}", response_model=schemas.Image)
+async def get_image(
+ image_path: str,
+ images_repo: ImagesRepository = Depends(get_repository(ImagesRepository)),
+) -> schemas.Image:
+ """
+ Return an image.
+ """
+
+ image_path = urllib.parse.unquote(image_path)
+ image = await images_repo.get_image(image_path)
+ if not image:
+ raise ControllerNotFoundError(f"Image '{image_path}' not found")
+ return image
+
+
+@router.delete("/{image_path:path}", status_code=status.HTTP_204_NO_CONTENT)
+async def delete_image(
+ image_path: str,
+ images_repo: ImagesRepository = Depends(get_repository(ImagesRepository)),
+) -> None:
+ """
+ Delete an image.
+ """
+
+ image_path = urllib.parse.unquote(image_path)
+
+ try:
+ image = await images_repo.get_image(image_path)
+ except MultipleResultsFound:
+ raise ControllerBadRequestError(f"Image '{image_path}' matches multiple images. "
+ f"Please include the relative path of the image")
+
+ if not image:
+ raise ControllerNotFoundError(f"Image '{image_path}' not found")
+
+ if await images_repo.get_image_templates(image.image_id):
+ raise ControllerError(f"Image '{image_path}' is used by one or more templates")
+
+ try:
+ os.remove(image.path)
+ except OSError:
+ log.warning(f"Could not delete image file {image.path}")
+
+ success = await images_repo.delete_image(image_path)
+ if not success:
+ raise ControllerError(f"Image '{image_path}' could not be deleted")
+
+
+@router.post("/prune", status_code=status.HTTP_204_NO_CONTENT)
+async def prune_images(
+ images_repo: ImagesRepository = Depends(get_repository(ImagesRepository)),
+) -> Response:
+ """
+ Prune images not attached to any template.
+ """
+
+ await images_repo.prune_images()
+ return Response(status_code=status.HTTP_204_NO_CONTENT)
diff --git a/gns3server/api/routes/controller/roles.py b/gns3server/api/routes/controller/roles.py
index 8da951d6..f0ed7f1f 100644
--- a/gns3server/api/routes/controller/roles.py
+++ b/gns3server/api/routes/controller/roles.py
@@ -25,6 +25,7 @@ from typing import List
from gns3server import schemas
from gns3server.controller.controller_error import (
+ ControllerError,
ControllerBadRequestError,
ControllerNotFoundError,
ControllerForbiddenError,
@@ -119,7 +120,7 @@ async def delete_role(
success = await rbac_repo.delete_role(role_id)
if not success:
- raise ControllerNotFoundError(f"Role '{role_id}' could not be deleted")
+ raise ControllerError(f"Role '{role_id}' could not be deleted")
return Response(status_code=status.HTTP_204_NO_CONTENT)
diff --git a/gns3server/api/routes/controller/templates.py b/gns3server/api/routes/controller/templates.py
index 4f1914b5..c5982212 100644
--- a/gns3server/api/routes/controller/templates.py
+++ b/gns3server/api/routes/controller/templates.py
@@ -25,14 +25,15 @@ import logging
log = logging.getLogger(__name__)
-from fastapi import APIRouter, Request, Response, HTTPException, Depends, Response, status
-from typing import List
+from fastapi import APIRouter, Request, HTTPException, Depends, Response, status
+from typing import List, Optional
from uuid import UUID
from gns3server import schemas
from gns3server.db.repositories.templates import TemplatesRepository
from gns3server.services.templates import TemplatesService
from gns3server.db.repositories.rbac import RbacRepository
+from gns3server.db.repositories.images import ImagesRepository
from .dependencies.authentication import get_current_active_user
from .dependencies.database import get_repository
@@ -42,7 +43,7 @@ responses = {404: {"model": schemas.ErrorMessage, "description": "Could not find
router = APIRouter(responses=responses)
-@router.post("/templates", response_model=schemas.Template, status_code=status.HTTP_201_CREATED)
+@router.post("", response_model=schemas.Template, status_code=status.HTTP_201_CREATED)
async def create_template(
template_create: schemas.TemplateCreate,
templates_repo: TemplatesRepository = Depends(get_repository(TemplatesRepository)),
@@ -59,7 +60,7 @@ async def create_template(
return template
-@router.get("/templates/{template_id}", response_model=schemas.Template, response_model_exclude_unset=True)
+@router.get("/{template_id}", response_model=schemas.Template, response_model_exclude_unset=True)
async def get_template(
template_id: UUID,
request: Request,
@@ -81,7 +82,7 @@ async def get_template(
return template
-@router.put("/templates/{template_id}", response_model=schemas.Template, response_model_exclude_unset=True)
+@router.put("/{template_id}", response_model=schemas.Template, response_model_exclude_unset=True)
async def update_template(
template_id: UUID,
template_update: schemas.TemplateUpdate,
@@ -94,13 +95,12 @@ async def update_template(
return await TemplatesService(templates_repo).update_template(template_id, template_update)
-@router.delete(
- "/templates/{template_id}",
- status_code=status.HTTP_204_NO_CONTENT,
-)
+@router.delete("/{template_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_template(
template_id: UUID,
+ prune_images: Optional[bool] = False,
templates_repo: TemplatesRepository = Depends(get_repository(TemplatesRepository)),
+ images_repo: RbacRepository = Depends(get_repository(ImagesRepository)),
rbac_repo: RbacRepository = Depends(get_repository(RbacRepository))
) -> Response:
"""
@@ -109,10 +109,12 @@ async def delete_template(
await TemplatesService(templates_repo).delete_template(template_id)
await rbac_repo.delete_all_permissions_with_path(f"/templates/{template_id}")
+ if prune_images:
+ await images_repo.prune_images()
return Response(status_code=status.HTTP_204_NO_CONTENT)
-@router.get("/templates", response_model=List[schemas.Template], response_model_exclude_unset=True)
+@router.get("", response_model=List[schemas.Template], response_model_exclude_unset=True)
async def get_templates(
templates_repo: TemplatesRepository = Depends(get_repository(TemplatesRepository)),
current_user: schemas.User = Depends(get_current_active_user),
@@ -139,7 +141,7 @@ async def get_templates(
return user_templates
-@router.post("/templates/{template_id}/duplicate", response_model=schemas.Template, status_code=status.HTTP_201_CREATED)
+@router.post("/{template_id}/duplicate", response_model=schemas.Template, status_code=status.HTTP_201_CREATED)
async def duplicate_template(
template_id: UUID, templates_repo: TemplatesRepository = Depends(get_repository(TemplatesRepository)),
current_user: schemas.User = Depends(get_current_active_user),
diff --git a/gns3server/api/routes/controller/users.py b/gns3server/api/routes/controller/users.py
index d90cf79c..76b704f7 100644
--- a/gns3server/api/routes/controller/users.py
+++ b/gns3server/api/routes/controller/users.py
@@ -26,6 +26,7 @@ from typing import List
from gns3server import schemas
from gns3server.controller.controller_error import (
+ ControllerError,
ControllerBadRequestError,
ControllerNotFoundError,
ControllerForbiddenError,
@@ -207,7 +208,7 @@ async def delete_user(
success = await users_repo.delete_user(user_id)
if not success:
- raise ControllerNotFoundError(f"User '{user_id}' could not be deleted")
+ raise ControllerError(f"User '{user_id}' could not be deleted")
return Response(status_code=status.HTTP_204_NO_CONTENT)
diff --git a/gns3server/controller/appliance.py b/gns3server/controller/appliance.py
index 7913f0ab..c422f478 100644
--- a/gns3server/controller/appliance.py
+++ b/gns3server/controller/appliance.py
@@ -56,10 +56,28 @@ class Appliance:
def name(self):
return self._data.get("name")
+ @property
+ def images(self):
+ return self._data.get("images")
+
+ @property
+ def versions(self):
+ return self._data.get("versions")
+
@symbol.setter
def symbol(self, new_symbol):
self._data["symbol"] = new_symbol
+ @property
+ def type(self):
+
+ if "iou" in self._data:
+ return "iou"
+ elif "dynamips" in self._data:
+ return "dynamips"
+ else:
+ return "qemu"
+
def asdict(self):
"""
Appliance data (a hash)
diff --git a/gns3server/controller/appliance_manager.py b/gns3server/controller/appliance_manager.py
index 375c778a..a4f8f095 100644
--- a/gns3server/controller/appliance_manager.py
+++ b/gns3server/controller/appliance_manager.py
@@ -16,10 +16,12 @@
# along with this program. If not, see .
import os
-import shutil
import json
import uuid
import asyncio
+import aiofiles
+
+from aiohttp.client_exceptions import ClientError
from .appliance import Appliance
from ..config import Config
@@ -27,6 +29,9 @@ from ..utils.asyncio import locking
from ..utils.get_resource import get_resource
from ..utils.http_client import HTTPClient
from .controller_error import ControllerError
+from .appliance_to_template import ApplianceToTemplate
+from ..utils.images import InvalidImageError, write_image, md5sum
+from ..utils.asyncio import wait_run_in_executor
import logging
@@ -77,6 +82,90 @@ class ApplianceManager:
os.makedirs(appliances_path, exist_ok=True)
return appliances_path
+ def _find_appliance_from_image_checksum(self, image_checksum):
+ """
+ Find an appliance and version that matches an image checksum.
+ """
+
+ for appliance in self._appliances.values():
+ if appliance.images:
+ for image in appliance.images:
+ if image.get("md5sum") == image_checksum:
+ return appliance, image.get("version")
+
+ async def _download_image(self, image_dir, image_name, image_type, image_url, images_repo):
+ """
+ Download an image.
+ """
+
+ log.info(f"Downloading image '{image_name}' from '{image_url}'")
+ image_path = os.path.join(image_dir, image_name)
+ try:
+ async with HTTPClient.get(image_url) as response:
+ if response.status != 200:
+ raise ControllerError(f"Could not download '{image_name}' due to HTTP error code {response.status}")
+ await write_image(image_name, image_type, image_path, response.content.iter_any(), images_repo)
+ except (OSError, InvalidImageError) as e:
+ raise ControllerError(f"Could not save {image_type} image '{image_path}': {e}")
+ except ClientError as e:
+ raise ControllerError(f"Could not connect to download '{image_name}': {e}")
+ except asyncio.TimeoutError:
+ raise ControllerError(f"Timeout while downloading '{image_name}' from '{image_url}'")
+
+ async def _find_appliance_version_images(self, appliance, version, images_repo, image_dir):
+ """
+ Find all the images belonging to a specific appliance version.
+ """
+
+ version_images = version.get("images")
+ if version_images:
+ for appliance_key, appliance_file in version_images.items():
+ for image in appliance.images:
+ if appliance_file == image.get("filename"):
+ image_checksum = image.get("md5sum")
+ image_in_db = await images_repo.get_image_by_checksum(image_checksum)
+ if image_in_db:
+ version_images[appliance_key] = image_in_db.filename
+ else:
+ # check if the image is on disk
+ image_path = os.path.join(image_dir, appliance_file)
+ if os.path.exists(image_path) and await wait_run_in_executor(md5sum, image_path) == image_checksum:
+ async with aiofiles.open(image_path, "rb") as f:
+ await write_image(appliance_file, appliance.type, image_path, f, images_repo)
+ else:
+ # download the image if there is a direct download URL
+ direct_download_url = image.get("direct_download_url")
+ if direct_download_url:
+ await self._download_image(
+ image_dir,
+ appliance_file,
+ appliance.type,
+ direct_download_url,
+ images_repo)
+ else:
+ raise ControllerError(f"Could not find '{appliance_file}'")
+
+ async def install_appliance_from_image(self, image_checksum, images_repo, image_dir):
+ """
+ Find the image checksum in appliance files
+ """
+
+ from . import Controller
+
+ appliance_info = self._find_appliance_from_image_checksum(image_checksum)
+ if appliance_info:
+ appliance, image_version = appliance_info
+ if appliance.versions:
+ for version in appliance.versions:
+ if version.get("name") == image_version:
+ await self._find_appliance_version_images(appliance, version, images_repo, image_dir)
+ # downloading missing custom symbol for this appliance
+ if appliance.symbol and not appliance.symbol.startswith(":/symbols/"):
+ destination_path = os.path.join(Controller.instance().symbols.symbols_path(), appliance.symbol)
+ if not os.path.exists(destination_path):
+ await self._download_symbol(appliance.symbol, destination_path)
+ return ApplianceToTemplate().new_template(appliance.asdict(), version, "local") # FIXME: "local"
+
def load_appliances(self, symbol_theme="Classic"):
"""
Loads appliance files from disk.
@@ -98,15 +187,17 @@ class ApplianceManager:
if not file.endswith(".gns3a") and not file.endswith(".gns3appliance"):
continue
path = os.path.join(directory, file)
- appliance_id = uuid.uuid3(
- uuid.NAMESPACE_URL, path
- ) # Generate UUID from path to avoid change between reboots
+ # Generate UUID from path to avoid change between reboots
+ appliance_id = uuid.uuid5(
+ uuid.NAMESPACE_X500,
+ path
+ )
try:
with open(path, encoding="utf-8") as f:
appliance = Appliance(appliance_id, json.load(f), builtin=builtin)
json_data = appliance.asdict() # Check if loaded without error
if appliance.status != "broken":
- self._appliances[appliance.id] = appliance
+ self._appliances[appliance.id] = appliance
if not appliance.symbol or appliance.symbol.startswith(":/symbols/"):
# apply a default symbol if the appliance has none or a default symbol
default_symbol = self._get_default_symbol(json_data, symbol_theme)
@@ -157,6 +248,7 @@ class ApplianceManager:
"""
symbol_url = f"https://raw.githubusercontent.com/GNS3/gns3-registry/master/symbols/{symbol}"
+ log.info(f"Downloading symbol '{symbol}'")
async with HTTPClient.get(symbol_url) as response:
if response.status != 200:
log.warning(
diff --git a/gns3server/controller/appliance_to_template.py b/gns3server/controller/appliance_to_template.py
new file mode 100644
index 00000000..1cfc0c87
--- /dev/null
+++ b/gns3server/controller/appliance_to_template.py
@@ -0,0 +1,132 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2021 GNS3 Technologies Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+
+import logging
+log = logging.getLogger(__name__)
+
+
+class ApplianceToTemplate:
+ """
+ Appliance installation.
+ """
+
+ def new_template(self, appliance_config, version, server):
+ """
+ Creates a new template from an appliance.
+ """
+
+ new_template = {
+ "compute_id": server,
+ "name": appliance_config["name"],
+ "version": version.get("name")
+ }
+
+ if "usage" in appliance_config:
+ new_template["usage"] = appliance_config["usage"]
+
+ if appliance_config["category"] == "multilayer_switch":
+ new_template["category"] = "switch"
+ else:
+ new_template["category"] = appliance_config["category"]
+
+ if "symbol" in appliance_config:
+ new_template["symbol"] = appliance_config.get("symbol")
+
+ if new_template.get("symbol") is None:
+ if appliance_config["category"] == "guest":
+ if "docker" in appliance_config:
+ new_template["symbol"] = ":/symbols/docker_guest.svg"
+ else:
+ new_template["symbol"] = ":/symbols/qemu_guest.svg"
+ elif appliance_config["category"] == "router":
+ new_template["symbol"] = ":/symbols/router.svg"
+ elif appliance_config["category"] == "switch":
+ new_template["symbol"] = ":/symbols/ethernet_switch.svg"
+ elif appliance_config["category"] == "multilayer_switch":
+ new_template["symbol"] = ":/symbols/multilayer_switch.svg"
+ elif appliance_config["category"] == "firewall":
+ new_template["symbol"] = ":/symbols/firewall.svg"
+
+ if "qemu" in appliance_config:
+ new_template["template_type"] = "qemu"
+ self._add_qemu_config(new_template, appliance_config, version)
+ elif "iou" in appliance_config:
+ new_template["template_type"] = "iou"
+ self._add_iou_config(new_template, appliance_config, version)
+ elif "dynamips" in appliance_config:
+ new_template["template_type"] = "dynamips"
+ self._add_dynamips_config(new_template, appliance_config, version)
+ elif "docker" in appliance_config:
+ new_template["template_type"] = "docker"
+ self._add_docker_config(new_template, appliance_config)
+
+ return new_template
+
+ def _add_qemu_config(self, new_config, appliance_config, version):
+
+ new_config.update(appliance_config["qemu"])
+
+ # the following properties are not valid for a template
+ new_config.pop("kvm", None)
+ new_config.pop("path", None)
+ new_config.pop("arch", None)
+
+ options = appliance_config["qemu"].get("options", "")
+ if appliance_config["qemu"].get("kvm", "allow") == "disable" and "-machine accel=tcg" not in options:
+ options += " -machine accel=tcg"
+ new_config["options"] = options.strip()
+ new_config.update(version.get("images"))
+
+ if "path" in appliance_config["qemu"]:
+ new_config["qemu_path"] = appliance_config["qemu"]["path"]
+ else:
+ new_config["qemu_path"] = "qemu-system-{}".format(appliance_config["qemu"]["arch"])
+
+ if "first_port_name" in appliance_config:
+ new_config["first_port_name"] = appliance_config["first_port_name"]
+
+ if "port_name_format" in appliance_config:
+ new_config["port_name_format"] = appliance_config["port_name_format"]
+
+ if "port_segment_size" in appliance_config:
+ new_config["port_segment_size"] = appliance_config["port_segment_size"]
+
+ if "custom_adapters" in appliance_config:
+ new_config["custom_adapters"] = appliance_config["custom_adapters"]
+
+ if "linked_clone" in appliance_config:
+ new_config["linked_clone"] = appliance_config["linked_clone"]
+
+ def _add_docker_config(self, new_config, appliance_config):
+
+ new_config.update(appliance_config["docker"])
+
+ if "custom_adapters" in appliance_config:
+ new_config["custom_adapters"] = appliance_config["custom_adapters"]
+
+ def _add_dynamips_config(self, new_config, appliance_config, version):
+
+ new_config.update(appliance_config["dynamips"])
+ new_config["idlepc"] = version.get("idlepc", "")
+ new_config["image"] = version.get("images").get("image")
+
+ def _add_iou_config(self, new_config, appliance_config, version):
+
+ new_config.update(appliance_config["iou"])
+ new_config["path"] = version.get("images").get("image")
diff --git a/gns3server/controller/symbols.py b/gns3server/controller/symbols.py
index d1b2e428..e601c59a 100644
--- a/gns3server/controller/symbols.py
+++ b/gns3server/controller/symbols.py
@@ -121,6 +121,10 @@ class Symbols:
return None
return directory
+ def has_symbol(self, symbol_id):
+
+ return self._symbols_path.get(symbol_id)
+
def get_path(self, symbol_id):
try:
return self._symbols_path[symbol_id]
diff --git a/gns3server/db/models/__init__.py b/gns3server/db/models/__init__.py
index ed5f7ead..d10d0668 100644
--- a/gns3server/db/models/__init__.py
+++ b/gns3server/db/models/__init__.py
@@ -20,6 +20,7 @@ from .users import User, UserGroup
from .roles import Role
from .permissions import Permission
from .computes import Compute
+from .images import Image
from .templates import (
Template,
CloudTemplate,
diff --git a/gns3server/db/models/computes.py b/gns3server/db/models/computes.py
index 5fd1cf56..71043d88 100644
--- a/gns3server/db/models/computes.py
+++ b/gns3server/db/models/computes.py
@@ -15,7 +15,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
-from sqlalchemy import Column, String
+from sqlalchemy import Column, String, Integer
from .base import BaseTable, GUID
@@ -28,6 +28,6 @@ class Compute(BaseTable):
name = Column(String, index=True)
protocol = Column(String)
host = Column(String)
- port = Column(String)
+ port = Column(Integer)
user = Column(String)
password = Column(String)
diff --git a/gns3server/db/models/images.py b/gns3server/db/models/images.py
new file mode 100644
index 00000000..175ab278
--- /dev/null
+++ b/gns3server/db/models/images.py
@@ -0,0 +1,43 @@
+#!/usr/bin/env python
+#
+# Copyright (C) 2021 GNS3 Technologies Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+from sqlalchemy import Table, Column, String, ForeignKey, BigInteger, Integer
+from sqlalchemy.orm import relationship
+
+from .base import Base, BaseTable, GUID
+
+
+image_template_link = Table(
+ "images_templates_link",
+ Base.metadata,
+ Column("image_id", Integer, ForeignKey("images.image_id", ondelete="CASCADE")),
+ Column("template_id", GUID, ForeignKey("templates.template_id", ondelete="CASCADE"))
+)
+
+
+class Image(BaseTable):
+
+ __tablename__ = "images"
+
+ image_id = Column(Integer, primary_key=True, autoincrement=True)
+ filename = Column(String, index=True)
+ path = Column(String, unique=True)
+ image_type = Column(String)
+ image_size = Column(BigInteger)
+ checksum = Column(String, index=True)
+ checksum_algorithm = Column(String)
+ templates = relationship("Template", secondary=image_template_link, back_populates="images")
diff --git a/gns3server/db/models/templates.py b/gns3server/db/models/templates.py
index 470b88cb..3d5f939d 100644
--- a/gns3server/db/models/templates.py
+++ b/gns3server/db/models/templates.py
@@ -17,8 +17,10 @@
from sqlalchemy import Boolean, Column, String, Integer, ForeignKey, PickleType
+from sqlalchemy.orm import relationship
from .base import BaseTable, generate_uuid, GUID
+from .images import image_template_link
class Template(BaseTable):
@@ -27,13 +29,15 @@ class Template(BaseTable):
template_id = Column(GUID, primary_key=True, default=generate_uuid)
name = Column(String, index=True)
+ version = Column(String)
category = Column(String)
default_name_format = Column(String)
symbol = Column(String)
builtin = Column(Boolean, default=False)
- compute_id = Column(String)
usage = Column(String)
template_type = Column(String)
+ compute_id = Column(String)
+ images = relationship("Image", secondary=image_template_link, back_populates="templates")
__mapper_args__ = {
"polymorphic_identity": "templates",
diff --git a/gns3server/db/repositories/computes.py b/gns3server/db/repositories/computes.py
index b76842cb..2ea00bbd 100644
--- a/gns3server/db/repositories/computes.py
+++ b/gns3server/db/repositories/computes.py
@@ -23,15 +23,14 @@ from sqlalchemy.ext.asyncio import AsyncSession
from .base import BaseRepository
import gns3server.db.models as models
-from gns3server.services import auth_service
from gns3server import schemas
class ComputesRepository(BaseRepository):
+
def __init__(self, db_session: AsyncSession) -> None:
super().__init__(db_session)
- self._auth_service = auth_service
async def get_compute(self, compute_id: UUID) -> Optional[models.Compute]:
diff --git a/gns3server/db/repositories/images.py b/gns3server/db/repositories/images.py
new file mode 100644
index 00000000..17bf4c71
--- /dev/null
+++ b/gns3server/db/repositories/images.py
@@ -0,0 +1,138 @@
+#!/usr/bin/env python
+#
+# Copyright (C) 2021 GNS3 Technologies Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+import os
+
+from typing import Optional, List
+from sqlalchemy import select, delete
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from .base import BaseRepository
+
+import gns3server.db.models as models
+
+import logging
+
+log = logging.getLogger(__name__)
+
+
+class ImagesRepository(BaseRepository):
+
+ def __init__(self, db_session: AsyncSession) -> None:
+
+ super().__init__(db_session)
+
+ async def get_image(self, image_path: str) -> Optional[models.Image]:
+ """
+ Get an image by its path.
+ """
+
+ image_dir, image_name = os.path.split(image_path)
+ if image_dir:
+ query = select(models.Image).\
+ where(models.Image.filename == image_name, models.Image.path.endswith(image_path))
+ else:
+ query = select(models.Image).where(models.Image.filename == image_name)
+ result = await self._db_session.execute(query)
+ return result.scalars().one_or_none()
+
+ async def get_image_by_checksum(self, checksum: str) -> Optional[models.Image]:
+ """
+ Get an image by its checksum.
+ """
+
+ query = select(models.Image).where(models.Image.checksum == checksum)
+ result = await self._db_session.execute(query)
+ return result.scalars().first()
+
+ async def get_images(self) -> List[models.Image]:
+ """
+ Get all images.
+ """
+
+ query = select(models.Image)
+ result = await self._db_session.execute(query)
+ return result.scalars().all()
+
+ async def get_image_templates(self, image_id: int) -> Optional[List[models.Template]]:
+ """
+ Get all templates that an image belongs to.
+ """
+
+ query = select(models.Template).\
+ join(models.Template.images).\
+ filter(models.Image.image_id == image_id)
+
+ result = await self._db_session.execute(query)
+ return result.scalars().all()
+
+ async def add_image(self, image_name, image_type, image_size, path, checksum, checksum_algorithm) -> models.Image:
+ """
+ Create a new image.
+ """
+
+ db_image = models.Image(
+ image_id=None,
+ filename=image_name,
+ image_type=image_type,
+ image_size=image_size,
+ path=path,
+ checksum=checksum,
+ checksum_algorithm=checksum_algorithm
+ )
+
+ self._db_session.add(db_image)
+ await self._db_session.commit()
+ await self._db_session.refresh(db_image)
+ return db_image
+
+ async def delete_image(self, image_path: str) -> bool:
+ """
+ Delete an image.
+ """
+
+ image_dir, image_name = os.path.split(image_path)
+ if image_dir:
+ query = delete(models.Image).\
+ where(models.Image.filename == image_name, models.Image.path.endswith(image_path)).\
+ execution_options(synchronize_session=False)
+ else:
+ query = delete(models.Image).where(models.Image.filename == image_name)
+ result = await self._db_session.execute(query)
+ await self._db_session.commit()
+ return result.rowcount > 0
+
+ async def prune_images(self) -> int:
+ """
+ Prune images not attached to any template.
+ """
+
+ query = select(models.Image).\
+ filter(~models.Image.templates.any())
+ result = await self._db_session.execute(query)
+ images = result.scalars().all()
+ images_deleted = 0
+ for image in images:
+ try:
+ log.debug(f"Deleting image '{image.path}'")
+ os.remove(image.path)
+ except OSError:
+ log.warning(f"Could not delete image file {image.path}")
+ if await self.delete_image(image.filename):
+ images_deleted += 1
+ log.info(f"{images_deleted} image(s) have been deleted")
+ return images_deleted
diff --git a/gns3server/db/repositories/templates.py b/gns3server/db/repositories/templates.py
index 4e6f50b1..f24608ca 100644
--- a/gns3server/db/repositories/templates.py
+++ b/gns3server/db/repositories/templates.py
@@ -15,10 +15,13 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
+import os
+
from uuid import UUID
-from typing import List, Union
-from sqlalchemy import select, update, delete
+from typing import List, Union, Optional
+from sqlalchemy import select, delete
from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy.orm import selectinload
from sqlalchemy.orm.session import make_transient
from .base import BaseRepository
@@ -41,19 +44,22 @@ TEMPLATE_TYPE_TO_MODEL = {
class TemplatesRepository(BaseRepository):
+
def __init__(self, db_session: AsyncSession) -> None:
super().__init__(db_session)
async def get_template(self, template_id: UUID) -> Union[None, models.Template]:
- query = select(models.Template).where(models.Template.template_id == template_id)
+ query = select(models.Template).\
+ options(selectinload(models.Template.images)).\
+ where(models.Template.template_id == template_id)
result = await self._db_session.execute(query)
return result.scalars().first()
async def get_templates(self) -> List[models.Template]:
- query = select(models.Template)
+ query = select(models.Template).options(selectinload(models.Template.images))
result = await self._db_session.execute(query)
return result.scalars().all()
@@ -66,20 +72,14 @@ class TemplatesRepository(BaseRepository):
await self._db_session.refresh(db_template)
return db_template
- async def update_template(self, template_id: UUID, template_update: schemas.TemplateUpdate) -> schemas.Template:
+ async def update_template(self, db_template: models.Template, template_settings: dict) -> schemas.Template:
- update_values = template_update.dict(exclude_unset=True)
-
- query = update(models.Template). \
- where(models.Template.template_id == template_id). \
- values(update_values)
-
- await self._db_session.execute(query)
+ # update the fields directly because update() query couldn't work
+ for key, value in template_settings.items():
+ setattr(db_template, key, value)
await self._db_session.commit()
- template_db = await self.get_template(template_id)
- if template_db:
- await self._db_session.refresh(template_db) # force refresh of updated_at value
- return template_db
+ await self._db_session.refresh(db_template) # force refresh of updated_at value
+ return db_template
async def delete_template(self, template_id: UUID) -> bool:
@@ -88,18 +88,77 @@ class TemplatesRepository(BaseRepository):
await self._db_session.commit()
return result.rowcount > 0
- async def duplicate_template(self, template_id: UUID) -> schemas.Template:
+ async def duplicate_template(self, template_id: UUID) -> Optional[schemas.Template]:
- query = select(models.Template).where(models.Template.template_id == template_id)
+ query = select(models.Template).\
+ options(selectinload(models.Template.images)).\
+ where(models.Template.template_id == template_id)
db_template = (await self._db_session.execute(query)).scalars().first()
- if not db_template:
- return db_template
-
- # duplicate db object with new primary key (template_id)
- self._db_session.expunge(db_template)
- make_transient(db_template)
- db_template.template_id = None
- self._db_session.add(db_template)
- await self._db_session.commit()
- await self._db_session.refresh(db_template)
+ if db_template:
+ # duplicate db object with new primary key (template_id)
+ self._db_session.expunge(db_template)
+ make_transient(db_template)
+ db_template.template_id = None
+ self._db_session.add(db_template)
+ await self._db_session.commit()
+ await self._db_session.refresh(db_template)
return db_template
+
+ async def get_image(self, image_path: str) -> Optional[models.Image]:
+ """
+ Get an image by its path.
+ """
+
+ image_dir, image_name = os.path.split(image_path)
+ if image_dir:
+ query = select(models.Image).\
+ where(models.Image.filename == image_name, models.Image.path.endswith(image_path))
+ else:
+ query = select(models.Image).where(models.Image.filename == image_name)
+ result = await self._db_session.execute(query)
+ return result.scalars().one_or_none()
+
+ async def add_image_to_template(
+ self,
+ template_id: UUID,
+ image: models.Image
+ ) -> Union[None, models.Template]:
+ """
+ Add an image to template.
+ """
+
+ query = select(models.Template).\
+ options(selectinload(models.Template.images)).\
+ where(models.Template.template_id == template_id)
+ result = await self._db_session.execute(query)
+ template_in_db = result.scalars().first()
+ if not template_in_db:
+ return None
+
+ template_in_db.images.append(image)
+ await self._db_session.commit()
+ await self._db_session.refresh(template_in_db)
+ return template_in_db
+
+ async def remove_image_from_template(
+ self,
+ template_id: UUID,
+ image: models.Image
+ ) -> Union[None, models.Template]:
+ """
+ Remove an image from a template.
+ """
+
+ query = select(models.Template).\
+ options(selectinload(models.Template.images)).\
+ where(models.Template.template_id == template_id)
+ result = await self._db_session.execute(query)
+ template_in_db = result.scalars().first()
+ if not template_in_db:
+ return None
+
+ if image in template_in_db.images:
+ template_in_db.images.remove(image)
+ await self._db_session.commit()
+ await self._db_session.refresh(template_in_db)
+ return template_in_db
diff --git a/gns3server/schemas/__init__.py b/gns3server/schemas/__init__.py
index 7a8ec42f..477c9058 100644
--- a/gns3server/schemas/__init__.py
+++ b/gns3server/schemas/__init__.py
@@ -23,6 +23,7 @@ from .version import Version
from .controller.links import LinkCreate, LinkUpdate, Link
from .controller.computes import ComputeCreate, ComputeUpdate, AutoIdlePC, Compute
from .controller.templates import TemplateCreate, TemplateUpdate, TemplateUsage, Template
+from .controller.images import Image, ImageType
from .controller.drawings import Drawing
from .controller.gns3vm import GNS3VM
from .controller.nodes import NodeCreate, NodeUpdate, NodeDuplicate, NodeCapture, Node
diff --git a/gns3server/schemas/controller/images.py b/gns3server/schemas/controller/images.py
new file mode 100644
index 00000000..bee6621e
--- /dev/null
+++ b/gns3server/schemas/controller/images.py
@@ -0,0 +1,45 @@
+#
+# Copyright (C) 2021 GNS3 Technologies Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+from pydantic import BaseModel, Field
+from enum import Enum
+
+from .base import DateTimeModelMixin
+
+
+class ImageType(str, Enum):
+
+ qemu = "qemu"
+ ios = "ios"
+ iou = "iou"
+
+
+class ImageBase(BaseModel):
+ """
+ Common image properties.
+ """
+
+ filename: str = Field(..., description="Image name")
+ image_type: ImageType = Field(..., description="Image type")
+ image_size: int = Field(..., description="Image size in bytes")
+ checksum: str = Field(..., description="Checksum value")
+ checksum_algorithm: str = Field(..., description="Checksum algorithm")
+
+
+class Image(DateTimeModelMixin, ImageBase):
+
+ class Config:
+ orm_mode = True
diff --git a/gns3server/schemas/controller/templates/__init__.py b/gns3server/schemas/controller/templates/__init__.py
index 74866b2e..14db9982 100644
--- a/gns3server/schemas/controller/templates/__init__.py
+++ b/gns3server/schemas/controller/templates/__init__.py
@@ -41,6 +41,7 @@ class TemplateBase(BaseModel):
template_id: Optional[UUID] = None
name: Optional[str] = None
+ version: Optional[str] = None
category: Optional[Category] = None
default_name_format: Optional[str] = None
symbol: Optional[str] = None
diff --git a/gns3server/services/templates.py b/gns3server/services/templates.py
index c36bfeb6..10535c71 100644
--- a/gns3server/services/templates.py
+++ b/gns3server/services/templates.py
@@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
+import os
import uuid
import pydantic
@@ -22,6 +23,7 @@ from fastapi.encoders import jsonable_encoder
from typing import List
from gns3server import schemas
+import gns3server.db.models as models
from gns3server.db.repositories.templates import TemplatesRepository
from gns3server.controller import Controller
from gns3server.controller.controller_error import (
@@ -56,7 +58,7 @@ DYNAMIPS_PLATFORM_TO_SHEMA = {
# built-in templates have their compute_id set to None to tell clients to select a compute
BUILTIN_TEMPLATES = [
{
- "template_id": uuid.uuid3(uuid.NAMESPACE_DNS, "cloud"),
+ "template_id": uuid.uuid5(uuid.NAMESPACE_X500, "cloud"),
"template_type": "cloud",
"name": "Cloud",
"default_name_format": "Cloud{0}",
@@ -66,7 +68,7 @@ BUILTIN_TEMPLATES = [
"builtin": True,
},
{
- "template_id": uuid.uuid3(uuid.NAMESPACE_DNS, "nat"),
+ "template_id": uuid.uuid5(uuid.NAMESPACE_X500, "nat"),
"template_type": "nat",
"name": "NAT",
"default_name_format": "NAT{0}",
@@ -76,7 +78,7 @@ BUILTIN_TEMPLATES = [
"builtin": True,
},
{
- "template_id": uuid.uuid3(uuid.NAMESPACE_DNS, "vpcs"),
+ "template_id": uuid.uuid5(uuid.NAMESPACE_X500, "vpcs"),
"template_type": "vpcs",
"name": "VPCS",
"default_name_format": "PC{0}",
@@ -87,7 +89,7 @@ BUILTIN_TEMPLATES = [
"builtin": True,
},
{
- "template_id": uuid.uuid3(uuid.NAMESPACE_DNS, "ethernet_switch"),
+ "template_id": uuid.uuid5(uuid.NAMESPACE_X500, "ethernet_switch"),
"template_type": "ethernet_switch",
"name": "Ethernet switch",
"console_type": "none",
@@ -98,7 +100,7 @@ BUILTIN_TEMPLATES = [
"builtin": True,
},
{
- "template_id": uuid.uuid3(uuid.NAMESPACE_DNS, "ethernet_hub"),
+ "template_id": uuid.uuid5(uuid.NAMESPACE_X500, "ethernet_hub"),
"template_type": "ethernet_hub",
"name": "Ethernet hub",
"default_name_format": "Hub{0}",
@@ -108,7 +110,7 @@ BUILTIN_TEMPLATES = [
"builtin": True,
},
{
- "template_id": uuid.uuid3(uuid.NAMESPACE_DNS, "frame_relay_switch"),
+ "template_id": uuid.uuid5(uuid.NAMESPACE_X500, "frame_relay_switch"),
"template_type": "frame_relay_switch",
"name": "Frame Relay switch",
"default_name_format": "FRSW{0}",
@@ -118,7 +120,7 @@ BUILTIN_TEMPLATES = [
"builtin": True,
},
{
- "template_id": uuid.uuid3(uuid.NAMESPACE_DNS, "atm_switch"),
+ "template_id": uuid.uuid5(uuid.NAMESPACE_X500, "atm_switch"),
"template_type": "atm_switch",
"name": "ATM switch",
"default_name_format": "ATMSW{0}",
@@ -131,6 +133,7 @@ BUILTIN_TEMPLATES = [
class TemplatesService:
+
def __init__(self, templates_repo: TemplatesRepository):
self._templates_repo = templates_repo
@@ -152,6 +155,44 @@ class TemplatesService:
templates.append(jsonable_encoder(builtin_template))
return templates
+ async def _find_image(self, image_path: str):
+
+ image = await self._templates_repo.get_image(image_path)
+ if not image or not os.path.exists(image.path):
+ raise ControllerNotFoundError(f"Image '{image_path}' could not be found")
+ return image
+
+ async def _find_images(self, template_type: str, settings: dict) -> List[models.Image]:
+
+ images_to_add_to_template = []
+ if template_type == "dynamips":
+ if settings["image"]:
+ image = await self._find_image(settings["image"])
+ if image.image_type != "ios":
+ raise ControllerBadRequestError(
+ f"Image '{image.filename}' type is not 'ios' but '{image.image_type}'"
+ )
+ images_to_add_to_template.append(image)
+ elif template_type == "iou":
+ if settings["path"]:
+ image = await self._find_image(settings["path"])
+ if image.image_type != "iou":
+ raise ControllerBadRequestError(
+ f"Image '{image.filename}' type is not 'iou' but '{image.image_type}'"
+ )
+ images_to_add_to_template.append(image)
+ elif template_type == "qemu":
+ for key, value in settings.items():
+ if key.endswith("_image") and value:
+ image = await self._find_image(value)
+ if image.image_type != "qemu":
+ raise ControllerBadRequestError(
+ f"Image '{image.filename}' type is not 'qemu' but '{image.image_type}'"
+ )
+ if image not in images_to_add_to_template:
+ images_to_add_to_template.append(image)
+ return images_to_add_to_template
+
async def create_template(self, template_create: schemas.TemplateCreate) -> dict:
try:
@@ -167,7 +208,11 @@ class TemplatesService:
settings = dynamips_template_settings_with_defaults.dict()
except pydantic.ValidationError as e:
raise ControllerBadRequestError(f"JSON schema error received while creating new template: {e}")
+
+ images_to_add_to_template = await self._find_images(template_create.template_type, settings)
db_template = await self._templates_repo.create_template(template_create.template_type, settings)
+ for image in images_to_add_to_template:
+ await self._templates_repo.add_image_to_template(db_template.template_id, image)
template = db_template.asjson()
self._controller.notification.controller_emit("template.created", template)
return template
@@ -183,13 +228,34 @@ class TemplatesService:
raise ControllerNotFoundError(f"Template '{template_id}' not found")
return template
+ async def _remove_image(self, template_id: UUID, image_path:str) -> None:
+
+ image = await self._templates_repo.get_image(image_path)
+ await self._templates_repo.remove_image_from_template(template_id, image)
+
async def update_template(self, template_id: UUID, template_update: schemas.TemplateUpdate) -> dict:
if self.get_builtin_template(template_id):
raise ControllerForbiddenError(f"Template '{template_id}' cannot be updated because it is built-in")
- db_template = await self._templates_repo.update_template(template_id, template_update)
+ template_settings = jsonable_encoder(template_update, exclude_unset=True)
+
+ db_template = await self._templates_repo.get_template(template_id)
if not db_template:
raise ControllerNotFoundError(f"Template '{template_id}' not found")
+
+ images_to_add_to_template = await self._find_images(db_template.template_type, template_settings)
+ if db_template.template_type == "dynamips" and "image" in template_settings:
+ await self._remove_image(db_template.template_id, db_template.image)
+ elif db_template.template_type == "iou" and "path" in template_settings:
+ await self._remove_image(db_template.template_id, db_template.path)
+ elif db_template.template_type == "qemu":
+ for key in template_update.dict().keys():
+ if key.endswith("_image") and key in template_settings:
+ await self._remove_image(db_template.template_id, db_template.__dict__[key])
+
+ db_template = await self._templates_repo.update_template(db_template, template_settings)
+ for image in images_to_add_to_template:
+ await self._templates_repo.add_image_to_template(db_template.template_id, image)
template = db_template.asjson()
self._controller.notification.controller_emit("template.updated", template)
return template
diff --git a/gns3server/utils/images.py b/gns3server/utils/images.py
index 5a1fdb2d..1c4c2e5b 100644
--- a/gns3server/utils/images.py
+++ b/gns3server/utils/images.py
@@ -16,21 +16,27 @@
import os
import hashlib
+import stat
+import aiofiles
+import shutil
+from typing import AsyncGenerator
from ..config import Config
from . import force_unix_path
+import gns3server.db.models as models
+from gns3server.db.repositories.images import ImagesRepository
import logging
log = logging.getLogger(__name__)
-def list_images(type):
+def list_images(image_type):
"""
- Scan directories for available image for a type
+ Scan directories for available image for a given type.
- :param type: emulator type (dynamips, qemu, iou)
+ :param image_type: image type (dynamips, qemu, iou)
"""
files = set()
images = []
@@ -39,9 +45,9 @@ def list_images(type):
general_images_directory = os.path.expanduser(server_config.images_path)
# Subfolder of the general_images_directory specific to this VM type
- default_directory = default_images_directory(type)
+ default_directory = default_images_directory(image_type)
- for directory in images_directories(type):
+ for directory in images_directories(image_type):
# We limit recursion to path outside the default images directory
# the reason is in the default directory manage file organization and
@@ -58,9 +64,9 @@ def list_images(type):
if filename.endswith(".md5sum") or filename.startswith("."):
continue
elif (
- ((filename.endswith(".image") or filename.endswith(".bin")) and type == "dynamips")
- or ((filename.endswith(".bin") or filename.startswith("i86bi")) and type == "iou")
- or (not filename.endswith(".bin") and not filename.endswith(".image") and type == "qemu")
+ ((filename.endswith(".image") or filename.endswith(".bin")) and image_type == "dynamips")
+ or ((filename.endswith(".bin") or filename.startswith("i86bi")) and image_type == "iou")
+ or (not filename.endswith(".bin") and not filename.endswith(".image") and image_type == "qemu")
):
files.add(filename)
@@ -71,7 +77,7 @@ def list_images(type):
path = os.path.relpath(os.path.join(root, filename), default_directory)
try:
- if type in ["dynamips", "iou"]:
+ if image_type in ["dynamips", "iou"]:
with open(os.path.join(root, filename), "rb") as f:
# read the first 7 bytes of the file.
elf_header_start = f.read(7)
@@ -110,20 +116,21 @@ def _os_walk(directory, recurse=True, **kwargs):
yield directory, [], files
-def default_images_directory(type):
+def default_images_directory(image_type):
"""
- :returns: Return the default directory for a node type
+ :returns: Return the default directory for an image type.
"""
+
server_config = Config.instance().settings.Server
img_dir = os.path.expanduser(server_config.images_path)
- if type == "qemu":
+ if image_type == "qemu":
return os.path.join(img_dir, "QEMU")
- elif type == "iou":
+ elif image_type == "iou":
return os.path.join(img_dir, "IOU")
- elif type == "dynamips":
+ elif image_type == "dynamips" or image_type == "ios":
return os.path.join(img_dir, "IOS")
else:
- raise NotImplementedError("%s node type is not supported", type)
+ raise NotImplementedError(f"%s node type is not supported", image_type)
def images_directories(type):
@@ -206,3 +213,72 @@ def remove_checksum(path):
path = f"{path}.md5sum"
if os.path.exists(path):
os.remove(path)
+
+
+class InvalidImageError(Exception):
+
+ def __init__(self, message: str):
+ super().__init__()
+ self._message = message
+
+ def __str__(self):
+ return self._message
+
+
+def check_valid_image_header(data: bytes, image_type: str, header_magic_len: int) -> None:
+
+ if image_type == "ios":
+ # file must start with the ELF magic number, be 32-bit, big endian and have an ELF version of 1
+ if data[:header_magic_len] != b'\x7fELF\x01\x02\x01':
+ raise InvalidImageError("Invalid IOS file detected")
+ elif image_type == "iou":
+ # file must start with the ELF magic number, be 32-bit or 64-bit, little endian and have an ELF version of 1
+ # (normal IOS images are big endian!)
+ if data[:header_magic_len] != b'\x7fELF\x01\x01\x01' and data[:7] != b'\x7fELF\x02\x01\x01':
+ raise InvalidImageError("Invalid IOU file detected")
+ elif image_type == "qemu":
+ if data[:header_magic_len] != b'QFI\xfb' and data[:header_magic_len] != b'KDMV':
+ raise InvalidImageError("Invalid Qemu file detected (must be qcow2 or VDMK format)")
+
+
+async def write_image(
+ image_name: str,
+ image_type: str,
+ path: str,
+ stream: AsyncGenerator[bytes, None],
+ images_repo: ImagesRepository,
+ check_image_header=True
+) -> models.Image:
+
+ log.info(f"Writing image file to '{path}'")
+ # Store the file under its final name only when the upload is completed
+ tmp_path = path + ".tmp"
+ os.makedirs(os.path.dirname(path), exist_ok=True)
+ checksum = hashlib.md5()
+ header_magic_len = 7
+ if image_type == "qemu":
+ header_magic_len = 4
+ try:
+ async with aiofiles.open(tmp_path, "wb") as f:
+ async for chunk in stream:
+ if check_image_header and len(chunk) >= header_magic_len:
+ check_image_header = False
+ check_valid_image_header(chunk, image_type, header_magic_len)
+ await f.write(chunk)
+ checksum.update(chunk)
+
+ image_size = os.path.getsize(tmp_path)
+ if not image_size or image_size < header_magic_len:
+ raise InvalidImageError("The image content is empty or too small to be valid")
+
+ checksum = checksum.hexdigest()
+ duplicate_image = await images_repo.get_image_by_checksum(checksum)
+ if duplicate_image and os.path.dirname(duplicate_image.path) == os.path.dirname(path):
+ raise InvalidImageError(f"Image {duplicate_image.filename} with "
+ f"same checksum already exists in the same directory")
+ except InvalidImageError:
+ os.remove(tmp_path)
+ raise
+ os.chmod(tmp_path, stat.S_IWRITE | stat.S_IREAD | stat.S_IEXEC)
+ shutil.move(tmp_path, path)
+ return await images_repo.add_image(image_name, image_type, image_size, path, checksum, checksum_algorithm="md5")
diff --git a/tests/api/routes/controller/test_images.py b/tests/api/routes/controller/test_images.py
new file mode 100644
index 00000000..2a4a2230
--- /dev/null
+++ b/tests/api/routes/controller/test_images.py
@@ -0,0 +1,258 @@
+#!/usr/bin/env python
+#
+# Copyright (C) 2021 GNS3 Technologies Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+import os
+import pytest
+import hashlib
+
+from sqlalchemy.ext.asyncio import AsyncSession
+from fastapi import FastAPI, status
+from httpx import AsyncClient
+
+from gns3server.db.repositories.images import ImagesRepository
+
+pytestmark = pytest.mark.asyncio
+
+
+@pytest.fixture
+def iou_32_bit_image(tmpdir) -> str:
+ """
+ Create a fake IOU image on disk
+ """
+
+ path = os.path.join(tmpdir, "iou_32bit.bin")
+ with open(path, "wb+") as f:
+ f.write(b'\x7fELF\x01\x01\x01')
+ return path
+
+
+@pytest.fixture
+def iou_64_bit_image(tmpdir) -> str:
+ """
+ Create a fake IOU image on disk
+ """
+
+ path = os.path.join(tmpdir, "iou_64bit.bin")
+ with open(path, "wb+") as f:
+ f.write(b'\x7fELF\x02\x01\x01')
+ return path
+
+
+@pytest.fixture
+def ios_image(tmpdir) -> str:
+ """
+ Create a fake IOS image on disk
+ """
+
+ path = os.path.join(tmpdir, "ios.bin")
+ with open(path, "wb+") as f:
+ f.write(b'\x7fELF\x01\x02\x01')
+ return path
+
+
+@pytest.fixture
+def qcow2_image(tmpdir) -> str:
+ """
+ Create a fake Qemu qcow2 image on disk
+ """
+
+ path = os.path.join(tmpdir, "image.qcow2")
+ with open(path, "wb+") as f:
+ f.write(b'QFI\xfb')
+ return path
+
+
+@pytest.fixture
+def invalid_image(tmpdir) -> str:
+ """
+ Create a fake invalid image on disk
+ """
+
+ path = os.path.join(tmpdir, "invalid_image.bin")
+ with open(path, "wb+") as f:
+ f.write(b'\x01\x01\x01\x01')
+ return path
+
+
+@pytest.fixture
+def empty_image(tmpdir) -> str:
+ """
+ Create a fake empty image on disk
+ """
+
+ path = os.path.join(tmpdir, "empty_image.bin")
+ with open(path, "wb+") as f:
+ f.write(b'')
+ return path
+
+
+class TestImageRoutes:
+
+ @pytest.mark.parametrize(
+ "image_type, fixture_name, valid_request",
+ (
+ ("iou", "iou_32_bit_image", True),
+ ("iou", "iou_64_bit_image", True),
+ ("iou", "invalid_image", False),
+ ("ios", "ios_image", True),
+ ("ios", "invalid_image", False),
+ ("qemu", "qcow2_image", True),
+ ("qemu", "empty_image", False),
+ ("wrong_type", "qcow2_image", False),
+ ),
+ )
+ async def test_upload_image(
+ self,
+ app: FastAPI,
+ client: AsyncClient,
+ images_dir: str,
+ image_type: str,
+ fixture_name: str,
+ valid_request: bool,
+ request
+ ) -> None:
+
+ image_path = request.getfixturevalue(fixture_name)
+ image_name = os.path.basename(image_path)
+ image_checksum = hashlib.md5()
+ with open(image_path, "rb") as f:
+ image_data = f.read()
+ image_checksum.update(image_data)
+
+ response = await client.post(
+ app.url_path_for("upload_image", image_path=image_name),
+ params={"image_type": image_type},
+ content=image_data)
+
+ if valid_request:
+ assert response.status_code == status.HTTP_201_CREATED
+ assert response.json()["filename"] == image_name
+ assert response.json()["checksum"] == image_checksum.hexdigest()
+ assert os.path.exists(os.path.join(images_dir, image_type.upper(), image_name))
+ else:
+ assert response.status_code != status.HTTP_201_CREATED
+
+ async def test_image_list(self, app: FastAPI, client: AsyncClient) -> None:
+
+ response = await client.get(app.url_path_for("get_images"))
+ assert response.status_code == status.HTTP_200_OK
+ assert len(response.json()) == 4 # 4 valid images uploaded before
+
+ async def test_image_get(self, app: FastAPI, client: AsyncClient, qcow2_image: str) -> None:
+
+ image_name = os.path.basename(qcow2_image)
+ response = await client.get(app.url_path_for("get_image", image_path=image_name))
+ assert response.status_code == status.HTTP_200_OK
+ assert response.json()["filename"] == image_name
+
+ async def test_same_image_cannot_be_uploaded(self, app: FastAPI, client: AsyncClient, qcow2_image: str) -> None:
+
+ image_name = os.path.basename(qcow2_image)
+ with open(qcow2_image, "rb") as f:
+ image_data = f.read()
+ response = await client.post(
+ app.url_path_for("upload_image", image_path=image_name),
+ params={"image_type": "qemu"},
+ content=image_data)
+ assert response.status_code == status.HTTP_400_BAD_REQUEST
+
+ async def test_image_delete(self, app: FastAPI, client: AsyncClient, qcow2_image: str) -> None:
+
+ image_name = os.path.basename(qcow2_image)
+ response = await client.delete(app.url_path_for("delete_image", image_path=image_name))
+ assert response.status_code == status.HTTP_204_NO_CONTENT
+
+ async def test_not_found_image(self, app: FastAPI, client: AsyncClient, qcow2_image: str) -> None:
+
+ image_name = os.path.basename(qcow2_image)
+ response = await client.get(app.url_path_for("get_image", image_path=image_name))
+ assert response.status_code == status.HTTP_404_NOT_FOUND
+
+ async def test_image_deleted_on_disk(self, app: FastAPI, client: AsyncClient, images_dir: str, qcow2_image: str) -> None:
+
+ image_name = os.path.basename(qcow2_image)
+ with open(qcow2_image, "rb") as f:
+ image_data = f.read()
+ response = await client.post(
+ app.url_path_for("upload_image", image_path=image_name),
+ params={"image_type": "qemu"},
+ content=image_data)
+ assert response.status_code == status.HTTP_201_CREATED
+
+ response = await client.delete(app.url_path_for("delete_image", image_path=image_name))
+ assert response.status_code == status.HTTP_204_NO_CONTENT
+ assert not os.path.exists(os.path.join(images_dir, "QEMU", image_name))
+
+ @pytest.mark.parametrize(
+ "subdir, expected_result",
+ (
+ ("subdir", status.HTTP_201_CREATED),
+ ("subdir", status.HTTP_400_BAD_REQUEST),
+ ("subdir2", status.HTTP_201_CREATED),
+ ),
+ )
+ async def test_upload_image_subdir(
+ self,
+ app: FastAPI,
+ client: AsyncClient,
+ images_dir: str,
+ qcow2_image: str,
+ subdir: str,
+ expected_result: int
+ ) -> None:
+
+ image_name = os.path.basename(qcow2_image)
+ with open(qcow2_image, "rb") as f:
+ image_data = f.read()
+ image_path = os.path.join(subdir, image_name)
+ response = await client.post(
+ app.url_path_for("upload_image", image_path=image_path),
+ params={"image_type": "qemu"},
+ content=image_data)
+ assert response.status_code == expected_result
+
+ async def test_image_delete_multiple_match(
+ self,
+ app: FastAPI,
+ client: AsyncClient,
+ qcow2_image: str
+ ) -> None:
+
+ image_name = os.path.basename(qcow2_image)
+ response = await client.delete(app.url_path_for("delete_image", image_path=image_name))
+ assert response.status_code == status.HTTP_400_BAD_REQUEST
+
+ async def test_image_delete_with_subdir(
+ self,
+ app: FastAPI,
+ client: AsyncClient,
+ qcow2_image: str
+ ) -> None:
+
+ image_name = os.path.basename(qcow2_image)
+ image_path = os.path.join("subdir", image_name)
+ response = await client.delete(app.url_path_for("delete_image", image_path=image_path))
+ assert response.status_code == status.HTTP_204_NO_CONTENT
+
+ async def test_prune_images(self, app: FastAPI, client: AsyncClient, db_session: AsyncSession) -> None:
+
+ response = await client.post(app.url_path_for("prune_images"))
+ assert response.status_code == status.HTTP_204_NO_CONTENT
+
+ images_repo = ImagesRepository(db_session)
+ images_in_db = await images_repo.get_images()
+ assert len(images_in_db) == 0
diff --git a/tests/api/routes/controller/test_templates.py b/tests/api/routes/controller/test_templates.py
index 2c008558..d2423230 100644
--- a/tests/api/routes/controller/test_templates.py
+++ b/tests/api/routes/controller/test_templates.py
@@ -15,13 +15,18 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
+import os
import pytest
import uuid
from pathlib import Path
from fastapi import FastAPI, status
from httpx import AsyncClient
+from sqlalchemy.ext.asyncio import AsyncSession
+from tests.utils import asyncio_patch
+from gns3server.db.repositories.images import ImagesRepository
+from gns3server.db.repositories.templates import TemplatesRepository
from gns3server.controller import Controller
from gns3server.services.templates import BUILTIN_TEMPLATES
@@ -91,7 +96,7 @@ class TestTemplateRoutes:
assert response.status_code == status.HTTP_200_OK
assert response.json()["template_id"] == template_id
- params["name"] = "VPCS_TEST_RENAMED"
+ params = {"name": "VPCS_TEST_RENAMED", "console_auto_start": True}
response = await client.put(app.url_path_for("update_template", template_id=template_id), json=params)
assert response.status_code == status.HTTP_200_OK
@@ -111,6 +116,40 @@ class TestTemplateRoutes:
response = await client.delete(app.url_path_for("delete_template", template_id=template_id))
assert response.status_code == status.HTTP_204_NO_CONTENT
+ async def test_template_delete_with_prune_images(
+ self,
+ app: FastAPI,
+ client: AsyncClient,
+ db_session: AsyncSession,
+ tmpdir: str,
+ ) -> None:
+
+ path = os.path.join(tmpdir, "test.qcow2")
+ with open(path, "wb+") as f:
+ f.write(b'\x42\x42\x42\x42')
+ images_repo = ImagesRepository(db_session)
+ await images_repo.add_image("test.qcow2", "qemu", 42, path, "e342eb86c1229b6c154367a5476969b5", "md5")
+
+ template_id = str(uuid.uuid4())
+ params = {"template_id": template_id,
+ "name": "QEMU_TEMPLATE",
+ "compute_id": "local",
+ "hda_disk_image": "test.qcow2",
+ "template_type": "qemu"}
+
+ response = await client.post(app.url_path_for("create_template"), json=params)
+ assert response.status_code == status.HTTP_201_CREATED
+
+ response = await client.delete(
+ app.url_path_for("delete_template", template_id=template_id),
+ params={"prune_images": True}
+ )
+ assert response.status_code == status.HTTP_204_NO_CONTENT
+
+ images_repo = ImagesRepository(db_session)
+ images = await images_repo.get_images()
+ assert len(images) == 0
+
# async def test_create_node_from_template(self, controller_api, controller, project):
#
# id = str(uuid.uuid4())
@@ -210,42 +249,43 @@ class TestDynamipsTemplate:
"image": "c7200-adventerprisek9-mz.124-24.T5.image",
"template_type": "dynamips"}
- response = await client.post(app.url_path_for("create_template"), json=params)
- assert response.status_code == status.HTTP_201_CREATED
- assert response.json()["template_id"] is not None
+ with asyncio_patch("gns3server.services.templates.TemplatesService._find_images", return_value=[]) as mock:
+ response = await client.post(app.url_path_for("create_template"), json=params)
+ assert mock.called
+ assert response.status_code == status.HTTP_201_CREATED
+ assert response.json()["template_id"] is not None
- expected_response = {"template_type": "dynamips",
- "auto_delete_disks": False,
- "builtin": False,
- "category": "router",
- "compute_id": "local",
- "console_auto_start": False,
- "console_type": "telnet",
- "default_name_format": "R{0}",
- "disk0": 0,
- "disk1": 0,
- "exec_area": 64,
- "idlemax": 500,
- "idlepc": "",
- "idlesleep": 30,
- "image": "c7200-adventerprisek9-mz.124-24.T5.image",
- "mac_addr": "",
- "midplane": "vxr",
- "mmap": True,
- "name": "Cisco c7200 template",
- "npe": "npe-400",
- "nvram": 512,
- "platform": "c7200",
- "private_config": "",
- "ram": 512,
- "sparsemem": True,
- "startup_config": "ios_base_startup-config.txt",
- "symbol": ":/symbols/router.svg",
- "system_id": "FTX0945W0MY"}
-
- for item, value in expected_response.items():
- assert response.json().get(item) == value
+ expected_response = {"template_type": "dynamips",
+ "auto_delete_disks": False,
+ "builtin": False,
+ "category": "router",
+ "compute_id": "local",
+ "console_auto_start": False,
+ "console_type": "telnet",
+ "default_name_format": "R{0}",
+ "disk0": 0,
+ "disk1": 0,
+ "exec_area": 64,
+ "idlemax": 500,
+ "idlepc": "",
+ "idlesleep": 30,
+ "image": "c7200-adventerprisek9-mz.124-24.T5.image",
+ "mac_addr": "",
+ "midplane": "vxr",
+ "mmap": True,
+ "name": "Cisco c7200 template",
+ "npe": "npe-400",
+ "nvram": 512,
+ "platform": "c7200",
+ "private_config": "",
+ "ram": 512,
+ "sparsemem": True,
+ "startup_config": "ios_base_startup-config.txt",
+ "symbol": ":/symbols/router.svg",
+ "system_id": "FTX0945W0MY"}
+ for item, value in expected_response.items():
+ assert response.json().get(item) == value
async def test_c3745_dynamips_template_create(self, app: FastAPI, client: AsyncClient) -> None:
@@ -255,40 +295,42 @@ class TestDynamipsTemplate:
"image": "c3745-adventerprisek9-mz.124-25d.image",
"template_type": "dynamips"}
- response = await client.post(app.url_path_for("create_template"), json=params)
- assert response.status_code == status.HTTP_201_CREATED
- assert response.json()["template_id"] is not None
+ with asyncio_patch("gns3server.services.templates.TemplatesService._find_images", return_value=[]) as mock:
+ response = await client.post(app.url_path_for("create_template"), json=params)
+ assert mock.called
+ assert response.status_code == status.HTTP_201_CREATED
+ assert response.json()["template_id"] is not None
- expected_response = {"template_type": "dynamips",
- "auto_delete_disks": False,
- "builtin": False,
- "category": "router",
- "compute_id": "local",
- "console_auto_start": False,
- "console_type": "telnet",
- "default_name_format": "R{0}",
- "disk0": 0,
- "disk1": 0,
- "exec_area": 64,
- "idlemax": 500,
- "idlepc": "",
- "idlesleep": 30,
- "image": "c3745-adventerprisek9-mz.124-25d.image",
- "mac_addr": "",
- "mmap": True,
- "name": "Cisco c3745 template",
- "iomem": 5,
- "nvram": 256,
- "platform": "c3745",
- "private_config": "",
- "ram": 256,
- "sparsemem": True,
- "startup_config": "ios_base_startup-config.txt",
- "symbol": ":/symbols/router.svg",
- "system_id": "FTX0945W0MY"}
+ expected_response = {"template_type": "dynamips",
+ "auto_delete_disks": False,
+ "builtin": False,
+ "category": "router",
+ "compute_id": "local",
+ "console_auto_start": False,
+ "console_type": "telnet",
+ "default_name_format": "R{0}",
+ "disk0": 0,
+ "disk1": 0,
+ "exec_area": 64,
+ "idlemax": 500,
+ "idlepc": "",
+ "idlesleep": 30,
+ "image": "c3745-adventerprisek9-mz.124-25d.image",
+ "mac_addr": "",
+ "mmap": True,
+ "name": "Cisco c3745 template",
+ "iomem": 5,
+ "nvram": 256,
+ "platform": "c3745",
+ "private_config": "",
+ "ram": 256,
+ "sparsemem": True,
+ "startup_config": "ios_base_startup-config.txt",
+ "symbol": ":/symbols/router.svg",
+ "system_id": "FTX0945W0MY"}
- for item, value in expected_response.items():
- assert response.json().get(item) == value
+ for item, value in expected_response.items():
+ assert response.json().get(item) == value
async def test_c3725_dynamips_template_create(self, app: FastAPI, client: AsyncClient) -> None:
@@ -298,40 +340,42 @@ class TestDynamipsTemplate:
"image": "c3725-adventerprisek9-mz.124-25d.image",
"template_type": "dynamips"}
- response = await client.post(app.url_path_for("create_template"), json=params)
- assert response.status_code == status.HTTP_201_CREATED
- assert response.json()["template_id"] is not None
+ with asyncio_patch("gns3server.services.templates.TemplatesService._find_images", return_value=[]) as mock:
+ response = await client.post(app.url_path_for("create_template"), json=params)
+ assert mock.called
+ assert response.status_code == status.HTTP_201_CREATED
+ assert response.json()["template_id"] is not None
- expected_response = {"template_type": "dynamips",
- "auto_delete_disks": False,
- "builtin": False,
- "category": "router",
- "compute_id": "local",
- "console_auto_start": False,
- "console_type": "telnet",
- "default_name_format": "R{0}",
- "disk0": 0,
- "disk1": 0,
- "exec_area": 64,
- "idlemax": 500,
- "idlepc": "",
- "idlesleep": 30,
- "image": "c3725-adventerprisek9-mz.124-25d.image",
- "mac_addr": "",
- "mmap": True,
- "name": "Cisco c3725 template",
- "iomem": 5,
- "nvram": 256,
- "platform": "c3725",
- "private_config": "",
- "ram": 128,
- "sparsemem": True,
- "startup_config": "ios_base_startup-config.txt",
- "symbol": ":/symbols/router.svg",
- "system_id": "FTX0945W0MY"}
+ expected_response = {"template_type": "dynamips",
+ "auto_delete_disks": False,
+ "builtin": False,
+ "category": "router",
+ "compute_id": "local",
+ "console_auto_start": False,
+ "console_type": "telnet",
+ "default_name_format": "R{0}",
+ "disk0": 0,
+ "disk1": 0,
+ "exec_area": 64,
+ "idlemax": 500,
+ "idlepc": "",
+ "idlesleep": 30,
+ "image": "c3725-adventerprisek9-mz.124-25d.image",
+ "mac_addr": "",
+ "mmap": True,
+ "name": "Cisco c3725 template",
+ "iomem": 5,
+ "nvram": 256,
+ "platform": "c3725",
+ "private_config": "",
+ "ram": 128,
+ "sparsemem": True,
+ "startup_config": "ios_base_startup-config.txt",
+ "symbol": ":/symbols/router.svg",
+ "system_id": "FTX0945W0MY"}
- for item, value in expected_response.items():
- assert response.json().get(item) == value
+ for item, value in expected_response.items():
+ assert response.json().get(item) == value
async def test_c3600_dynamips_template_create(self, app: FastAPI, client: AsyncClient) -> None:
@@ -342,41 +386,43 @@ class TestDynamipsTemplate:
"image": "c3660-a3jk9s-mz.124-25d.image",
"template_type": "dynamips"}
- response = await client.post(app.url_path_for("create_template"), json=params)
- assert response.status_code == status.HTTP_201_CREATED
- assert response.json()["template_id"] is not None
+ with asyncio_patch("gns3server.services.templates.TemplatesService._find_images", return_value=[]) as mock:
+ response = await client.post(app.url_path_for("create_template"), json=params)
+ assert mock.called
+ assert response.status_code == status.HTTP_201_CREATED
+ assert response.json()["template_id"] is not None
- expected_response = {"template_type": "dynamips",
- "auto_delete_disks": False,
- "builtin": False,
- "category": "router",
- "compute_id": "local",
- "console_auto_start": False,
- "console_type": "telnet",
- "default_name_format": "R{0}",
- "disk0": 0,
- "disk1": 0,
- "exec_area": 64,
- "idlemax": 500,
- "idlepc": "",
- "idlesleep": 30,
- "image": "c3660-a3jk9s-mz.124-25d.image",
- "mac_addr": "",
- "mmap": True,
- "name": "Cisco c3600 template",
- "iomem": 5,
- "nvram": 128,
- "platform": "c3600",
- "chassis": "3660",
- "private_config": "",
- "ram": 192,
- "sparsemem": True,
- "startup_config": "ios_base_startup-config.txt",
- "symbol": ":/symbols/router.svg",
- "system_id": "FTX0945W0MY"}
+ expected_response = {"template_type": "dynamips",
+ "auto_delete_disks": False,
+ "builtin": False,
+ "category": "router",
+ "compute_id": "local",
+ "console_auto_start": False,
+ "console_type": "telnet",
+ "default_name_format": "R{0}",
+ "disk0": 0,
+ "disk1": 0,
+ "exec_area": 64,
+ "idlemax": 500,
+ "idlepc": "",
+ "idlesleep": 30,
+ "image": "c3660-a3jk9s-mz.124-25d.image",
+ "mac_addr": "",
+ "mmap": True,
+ "name": "Cisco c3600 template",
+ "iomem": 5,
+ "nvram": 128,
+ "platform": "c3600",
+ "chassis": "3660",
+ "private_config": "",
+ "ram": 192,
+ "sparsemem": True,
+ "startup_config": "ios_base_startup-config.txt",
+ "symbol": ":/symbols/router.svg",
+ "system_id": "FTX0945W0MY"}
- for item, value in expected_response.items():
- assert response.json().get(item) == value
+ for item, value in expected_response.items():
+ assert response.json().get(item) == value
async def test_c3600_dynamips_template_create_wrong_chassis(self, app: FastAPI, client: AsyncClient) -> None:
@@ -398,40 +444,42 @@ class TestDynamipsTemplate:
"image": "c2691-adventerprisek9-mz.124-25d.image",
"template_type": "dynamips"}
- response = await client.post(app.url_path_for("create_template"), json=params)
- assert response.status_code == status.HTTP_201_CREATED
- assert response.json()["template_id"] is not None
+ with asyncio_patch("gns3server.services.templates.TemplatesService._find_images", return_value=[]) as mock:
+ response = await client.post(app.url_path_for("create_template"), json=params)
+ assert mock.called
+ assert response.status_code == status.HTTP_201_CREATED
+ assert response.json()["template_id"] is not None
- expected_response = {"template_type": "dynamips",
- "auto_delete_disks": False,
- "builtin": False,
- "category": "router",
- "compute_id": "local",
- "console_auto_start": False,
- "console_type": "telnet",
- "default_name_format": "R{0}",
- "disk0": 0,
- "disk1": 0,
- "exec_area": 64,
- "idlemax": 500,
- "idlepc": "",
- "idlesleep": 30,
- "image": "c2691-adventerprisek9-mz.124-25d.image",
- "mac_addr": "",
- "mmap": True,
- "name": "Cisco c2691 template",
- "iomem": 5,
- "nvram": 256,
- "platform": "c2691",
- "private_config": "",
- "ram": 192,
- "sparsemem": True,
- "startup_config": "ios_base_startup-config.txt",
- "symbol": ":/symbols/router.svg",
- "system_id": "FTX0945W0MY"}
+ expected_response = {"template_type": "dynamips",
+ "auto_delete_disks": False,
+ "builtin": False,
+ "category": "router",
+ "compute_id": "local",
+ "console_auto_start": False,
+ "console_type": "telnet",
+ "default_name_format": "R{0}",
+ "disk0": 0,
+ "disk1": 0,
+ "exec_area": 64,
+ "idlemax": 500,
+ "idlepc": "",
+ "idlesleep": 30,
+ "image": "c2691-adventerprisek9-mz.124-25d.image",
+ "mac_addr": "",
+ "mmap": True,
+ "name": "Cisco c2691 template",
+ "iomem": 5,
+ "nvram": 256,
+ "platform": "c2691",
+ "private_config": "",
+ "ram": 192,
+ "sparsemem": True,
+ "startup_config": "ios_base_startup-config.txt",
+ "symbol": ":/symbols/router.svg",
+ "system_id": "FTX0945W0MY"}
- for item, value in expected_response.items():
- assert response.json().get(item) == value
+ for item, value in expected_response.items():
+ assert response.json().get(item) == value
async def test_c2600_dynamips_template_create(self, app: FastAPI, client: AsyncClient) -> None:
@@ -442,41 +490,43 @@ class TestDynamipsTemplate:
"image": "c2600-adventerprisek9-mz.124-25d.image",
"template_type": "dynamips"}
- response = await client.post(app.url_path_for("create_template"), json=params)
- assert response.status_code == status.HTTP_201_CREATED
- assert response.json()["template_id"] is not None
+ with asyncio_patch("gns3server.services.templates.TemplatesService._find_images", return_value=[]) as mock:
+ response = await client.post(app.url_path_for("create_template"), json=params)
+ assert mock.called
+ assert response.status_code == status.HTTP_201_CREATED
+ assert response.json()["template_id"] is not None
- expected_response = {"template_type": "dynamips",
- "auto_delete_disks": False,
- "builtin": False,
- "category": "router",
- "compute_id": "local",
- "console_auto_start": False,
- "console_type": "telnet",
- "default_name_format": "R{0}",
- "disk0": 0,
- "disk1": 0,
- "exec_area": 64,
- "idlemax": 500,
- "idlepc": "",
- "idlesleep": 30,
- "image": "c2600-adventerprisek9-mz.124-25d.image",
- "mac_addr": "",
- "mmap": True,
- "name": "Cisco c2600 template",
- "iomem": 15,
- "nvram": 128,
- "platform": "c2600",
- "chassis": "2651XM",
- "private_config": "",
- "ram": 160,
- "sparsemem": True,
- "startup_config": "ios_base_startup-config.txt",
- "symbol": ":/symbols/router.svg",
- "system_id": "FTX0945W0MY"}
+ expected_response = {"template_type": "dynamips",
+ "auto_delete_disks": False,
+ "builtin": False,
+ "category": "router",
+ "compute_id": "local",
+ "console_auto_start": False,
+ "console_type": "telnet",
+ "default_name_format": "R{0}",
+ "disk0": 0,
+ "disk1": 0,
+ "exec_area": 64,
+ "idlemax": 500,
+ "idlepc": "",
+ "idlesleep": 30,
+ "image": "c2600-adventerprisek9-mz.124-25d.image",
+ "mac_addr": "",
+ "mmap": True,
+ "name": "Cisco c2600 template",
+ "iomem": 15,
+ "nvram": 128,
+ "platform": "c2600",
+ "chassis": "2651XM",
+ "private_config": "",
+ "ram": 160,
+ "sparsemem": True,
+ "startup_config": "ios_base_startup-config.txt",
+ "symbol": ":/symbols/router.svg",
+ "system_id": "FTX0945W0MY"}
- for item, value in expected_response.items():
- assert response.json().get(item) == value
+ for item, value in expected_response.items():
+ assert response.json().get(item) == value
async def test_c2600_dynamips_template_create_wrong_chassis(self, app: FastAPI, client: AsyncClient) -> None:
@@ -499,41 +549,43 @@ class TestDynamipsTemplate:
"image": "c1700-adventerprisek9-mz.124-25d.image",
"template_type": "dynamips"}
- response = await client.post(app.url_path_for("create_template"), json=params)
- assert response.status_code == status.HTTP_201_CREATED
- assert response.json()["template_id"] is not None
+ with asyncio_patch("gns3server.services.templates.TemplatesService._find_images", return_value=[]) as mock:
+ response = await client.post(app.url_path_for("create_template"), json=params)
+ assert mock.called
+ assert response.status_code == status.HTTP_201_CREATED
+ assert response.json()["template_id"] is not None
- expected_response = {"template_type": "dynamips",
- "auto_delete_disks": False,
- "builtin": False,
- "category": "router",
- "compute_id": "local",
- "console_auto_start": False,
- "console_type": "telnet",
- "default_name_format": "R{0}",
- "disk0": 0,
- "disk1": 0,
- "exec_area": 64,
- "idlemax": 500,
- "idlepc": "",
- "idlesleep": 30,
- "image": "c1700-adventerprisek9-mz.124-25d.image",
- "mac_addr": "",
- "mmap": True,
- "name": "Cisco c1700 template",
- "iomem": 15,
- "nvram": 128,
- "platform": "c1700",
- "chassis": "1760",
- "private_config": "",
- "ram": 160,
- "sparsemem": False,
- "startup_config": "ios_base_startup-config.txt",
- "symbol": ":/symbols/router.svg",
- "system_id": "FTX0945W0MY"}
+ expected_response = {"template_type": "dynamips",
+ "auto_delete_disks": False,
+ "builtin": False,
+ "category": "router",
+ "compute_id": "local",
+ "console_auto_start": False,
+ "console_type": "telnet",
+ "default_name_format": "R{0}",
+ "disk0": 0,
+ "disk1": 0,
+ "exec_area": 64,
+ "idlemax": 500,
+ "idlepc": "",
+ "idlesleep": 30,
+ "image": "c1700-adventerprisek9-mz.124-25d.image",
+ "mac_addr": "",
+ "mmap": True,
+ "name": "Cisco c1700 template",
+ "iomem": 15,
+ "nvram": 128,
+ "platform": "c1700",
+ "chassis": "1760",
+ "private_config": "",
+ "ram": 160,
+ "sparsemem": False,
+ "startup_config": "ios_base_startup-config.txt",
+ "symbol": ":/symbols/router.svg",
+ "system_id": "FTX0945W0MY"}
- for item, value in expected_response.items():
- assert response.json().get(item) == value
+ for item, value in expected_response.items():
+ assert response.json().get(item) == value
async def test_c1700_dynamips_template_create_wrong_chassis(self, app: FastAPI, client: AsyncClient) -> None:
@@ -569,31 +621,33 @@ class TestIOUTemplate:
"path": image_path,
"template_type": "iou"}
- response = await client.post(app.url_path_for("create_template"), json=params)
- assert response.status_code == status.HTTP_201_CREATED
- assert response.json()["template_id"] is not None
+ with asyncio_patch("gns3server.services.templates.TemplatesService._find_images", return_value=[]) as mock:
+ response = await client.post(app.url_path_for("create_template"), json=params)
+ assert mock.called
+ assert response.status_code == status.HTTP_201_CREATED
+ assert response.json()["template_id"] is not None
- expected_response = {"template_type": "iou",
- "builtin": False,
- "category": "router",
- "compute_id": "local",
- "console_auto_start": False,
- "console_type": "telnet",
- "default_name_format": "IOU{0}",
- "ethernet_adapters": 2,
- "name": "IOU template",
- "nvram": 128,
- "path": image_path,
- "private_config": "",
- "ram": 256,
- "serial_adapters": 2,
- "startup_config": "iou_l3_base_startup-config.txt",
- "symbol": ":/symbols/multilayer_switch.svg",
- "use_default_iou_values": True,
- "l1_keepalives": False}
+ expected_response = {"template_type": "iou",
+ "builtin": False,
+ "category": "router",
+ "compute_id": "local",
+ "console_auto_start": False,
+ "console_type": "telnet",
+ "default_name_format": "IOU{0}",
+ "ethernet_adapters": 2,
+ "name": "IOU template",
+ "nvram": 128,
+ "path": image_path,
+ "private_config": "",
+ "ram": 256,
+ "serial_adapters": 2,
+ "startup_config": "iou_l3_base_startup-config.txt",
+ "symbol": ":/symbols/multilayer_switch.svg",
+ "use_default_iou_values": True,
+ "l1_keepalives": False}
- for item, value in expected_response.items():
- assert response.json().get(item) == value
+ for item, value in expected_response.items():
+ assert response.json().get(item) == value
class TestDockerTemplate:
@@ -643,54 +697,57 @@ class TestQemuTemplate:
"ram": 512,
"template_type": "qemu"}
- response = await client.post(app.url_path_for("create_template"), json=params)
- assert response.status_code == status.HTTP_201_CREATED
- assert response.json()["template_id"] is not None
+ with asyncio_patch("gns3server.services.templates.TemplatesService._find_images", return_value=[]) as mock:
+ response = await client.post(app.url_path_for("create_template"), json=params)
+ assert mock.called
+ assert response.status_code == status.HTTP_201_CREATED
+ assert response.json()["template_id"] is not None
- expected_response = {"adapter_type": "e1000",
- "adapters": 1,
- "template_type": "qemu",
- "bios_image": "",
- "boot_priority": "c",
- "builtin": False,
- "category": "guest",
- "cdrom_image": "",
- "compute_id": "local",
- "console_auto_start": False,
- "console_type": "telnet",
- "cpu_throttling": 0,
- "cpus": 1,
- "default_name_format": "{name}-{0}",
- "first_port_name": "",
- "hda_disk_image": "IOSvL2-15.2.4.0.55E.qcow2",
- "hda_disk_interface": "none",
- "hdb_disk_image": "",
- "hdb_disk_interface": "none",
- "hdc_disk_image": "",
- "hdc_disk_interface": "none",
- "hdd_disk_image": "",
- "hdd_disk_interface": "none",
- "initrd": "",
- "kernel_command_line": "",
- "kernel_image": "",
- "legacy_networking": False,
- "linked_clone": True,
- "mac_address": "",
- "name": "Qemu template",
- "on_close": "power_off",
- "options": "",
- "platform": "i386",
- "port_name_format": "Ethernet{0}",
- "port_segment_size": 0,
- "process_priority": "normal",
- "qemu_path": "",
- "ram": 512,
- "symbol": ":/symbols/qemu_guest.svg",
- "usage": "",
- "custom_adapters": []}
+ expected_response = {"adapter_type": "e1000",
+ "adapters": 1,
+ "template_type": "qemu",
+ "bios_image": "",
+ "boot_priority": "c",
+ "builtin": False,
+ "category": "guest",
+ "cdrom_image": "",
+ "compute_id": "local",
+ "console_auto_start": False,
+ "console_type": "telnet",
+ "cpu_throttling": 0,
+ "cpus": 1,
+ "default_name_format": "{name}-{0}",
+ "first_port_name": "",
+ "hda_disk_image": "IOSvL2-15.2.4.0.55E.qcow2",
+ "hda_disk_interface": "none",
+ "hdb_disk_image": "",
+ "hdb_disk_interface": "none",
+ "hdc_disk_image": "",
+ "hdc_disk_interface": "none",
+ "hdd_disk_image": "",
+ "hdd_disk_interface": "none",
+ "initrd": "",
+ "kernel_command_line": "",
+ "kernel_image": "",
+ "legacy_networking": False,
+ "linked_clone": True,
+ "mac_address": "",
+ "name": "Qemu template",
+ "on_close": "power_off",
+ "options": "",
+ "platform": "i386",
+ "port_name_format": "Ethernet{0}",
+ "port_segment_size": 0,
+ "process_priority": "normal",
+ "qemu_path": "",
+ "ram": 512,
+ "symbol": ":/symbols/qemu_guest.svg",
+ "usage": "",
+ "custom_adapters": []}
+
+ for item, value in expected_response.items():
+ assert response.json().get(item) == value
- for item, value in expected_response.items():
- assert response.json().get(item) == value
class TestVMwareTemplate:
@@ -944,3 +1001,235 @@ class TestCloudTemplate:
for item, value in expected_response.items():
assert response.json().get(item) == value
+
+
+class TestImageAssociationWithTemplate:
+
+ @pytest.mark.parametrize(
+ "image_name, image_type, params",
+ (
+ (
+ "c7200-adventerprisek9-mz.124-24.T5.image",
+ "ios",
+ {
+ "template_id": "6d85c8db-640f-4547-8955-bc132f7d7196",
+ "name": "Cisco c7200 template",
+ "platform": "c7200",
+ "compute_id": "local",
+ "image": "",
+ "template_type": "dynamips"
+ }
+ ),
+ (
+ "i86bi_linux-ipbase-ms-12.4.bin",
+ "iou",
+ {
+ "template_id": "0014185e-bdfe-454b-86cd-9009c23900c5",
+ "name": "IOU template",
+ "compute_id": "local",
+ "path": "",
+ "template_type": "iou"
+ }
+ ),
+ (
+ "image.qcow2",
+ "qemu",
+ {
+ "template_id": "97ef56a5-7ae4-4795-ad4c-e7dcdd745cff",
+ "name": "Qemu template",
+ "compute_id": "local",
+ "platform": "i386",
+ "hda_disk_image": "",
+ "hdb_disk_image": "",
+ "hdc_disk_image": "",
+ "hdd_disk_image": "",
+ "cdrom_image": "",
+ "kernel_image": "",
+ "bios_image": "",
+ "ram": 512,
+ "template_type": "qemu"
+ }
+ ),
+ ),
+ )
+ async def test_template_create_with_images(
+ self,
+ app: FastAPI,
+ client: AsyncClient,
+ db_session: AsyncSession,
+ tmpdir: str,
+ image_name: str,
+ image_type: str,
+ params: dict
+ ) -> None:
+
+ path = os.path.join(tmpdir, image_name)
+ with open(path, "wb+") as f:
+ f.write(b'\x42\x42\x42\x42')
+ images_repo = ImagesRepository(db_session)
+ await images_repo.add_image(image_name, image_type, 42, path, "e342eb86c1229b6c154367a5476969b5", "md5")
+ for key, value in params.items():
+ if value == "":
+ params[key] = image_name
+ response = await client.post(app.url_path_for("create_template"), json=params)
+ assert response.status_code == status.HTTP_201_CREATED
+
+ templates_repo = TemplatesRepository(db_session)
+ db_template = await templates_repo.get_template(uuid.UUID(params["template_id"]))
+ assert len(db_template.images) == 1
+ assert db_template.images[0].filename == image_name
+
+ @pytest.mark.parametrize(
+ "image_name, image_type, template_id, params",
+ (
+ (
+ "c7200-adventerprisek9-mz.155-2.XB.image",
+ "ios",
+ "6d85c8db-640f-4547-8955-bc132f7d7196",
+ {
+ "image": "",
+ }
+ ),
+ (
+ "i86bi-linux-l2-adventerprisek9-15.2d.bin",
+ "iou",
+ "0014185e-bdfe-454b-86cd-9009c23900c5",
+ {
+ "path": "",
+ }
+ ),
+ (
+ "new_image.qcow2",
+ "qemu",
+ "97ef56a5-7ae4-4795-ad4c-e7dcdd745cff",
+ {
+ "hda_disk_image": "",
+ "hdb_disk_image": "",
+ "hdc_disk_image": "",
+ "hdd_disk_image": "",
+ "cdrom_image": "",
+ "kernel_image": "",
+ "bios_image": "",
+ }
+ ),
+ ),
+ )
+ async def test_template_update_with_images(
+ self,
+ app: FastAPI,
+ client: AsyncClient,
+ db_session: AsyncSession,
+ tmpdir: str,
+ image_name: str,
+ image_type: str,
+ template_id: str,
+ params: dict
+ ) -> None:
+
+ path = os.path.join(tmpdir, image_name)
+ with open(path, "wb+") as f:
+ f.write(b'\x42\x42\x42\x42')
+ images_repo = ImagesRepository(db_session)
+ await images_repo.add_image(image_name, image_type, 42, path, "e342eb86c1229b6c154367a5476969b5", "md5")
+
+ for key, value in params.items():
+ if value == "":
+ params[key] = image_name
+ response = await client.put(app.url_path_for("update_template", template_id=template_id), json=params)
+ assert response.status_code == status.HTTP_200_OK
+
+ templates_repo = TemplatesRepository(db_session)
+ db_template = await templates_repo.get_template(uuid.UUID(template_id))
+ assert len(db_template.images) == 1
+ assert db_template.images[0].filename == image_name
+
+ @pytest.mark.parametrize(
+ "template_id, params",
+ (
+ (
+ "6d85c8db-640f-4547-8955-bc132f7d7196",
+ {
+ "image": "",
+ }
+ ),
+ (
+ "0014185e-bdfe-454b-86cd-9009c23900c5",
+ {
+ "path": "",
+ }
+ ),
+ (
+ "97ef56a5-7ae4-4795-ad4c-e7dcdd745cff",
+ {
+ "hda_disk_image": "",
+ "hdb_disk_image": "",
+ "hdc_disk_image": "",
+ "hdd_disk_image": "",
+ "cdrom_image": "",
+ "kernel_image": "",
+ "bios_image": "",
+ }
+ ),
+ ),
+ )
+ async def test_remove_images_from_template(
+ self,
+ app: FastAPI,
+ client: AsyncClient,
+ db_session: AsyncSession,
+ template_id: str,
+ params: dict
+ ) -> None:
+
+ for key, value in params.items():
+ if value == "":
+ params[key] = ""
+ response = await client.put(app.url_path_for("update_template", template_id=template_id), json=params)
+ assert response.status_code == status.HTTP_200_OK
+
+ templates_repo = TemplatesRepository(db_session)
+ db_template = await templates_repo.get_template(uuid.UUID(template_id))
+ assert len(db_template.images) == 0
+
+ async def test_template_create_with_image_in_subdir(
+ self,
+ app: FastAPI,
+ client: AsyncClient,
+ db_session: AsyncSession,
+ tmpdir: str,
+ ) -> None:
+
+ params = {"name": "Qemu template",
+ "compute_id": "local",
+ "platform": "i386",
+ "hda_disk_image": "subdir/image.qcow2",
+ "ram": 512,
+ "template_type": "qemu"}
+
+ path = os.path.join(tmpdir, "subdir", "image.qcow2")
+ os.makedirs(os.path.dirname(path))
+ with open(path, "wb+") as f:
+ f.write(b'\x42\x42\x42\x42')
+ images_repo = ImagesRepository(db_session)
+ await images_repo.add_image("image.qcow2", "qemu", 42, path, "e342eb86c1229b6c154367a5476969b5", "md5")
+
+ response = await client.post(app.url_path_for("create_template"), json=params)
+ assert response.status_code == status.HTTP_201_CREATED
+ template_id = response.json()["template_id"]
+
+ templates_repo = TemplatesRepository(db_session)
+ db_template = await templates_repo.get_template(template_id)
+ assert len(db_template.images) == 1
+ assert db_template.images[0].path.endswith("subdir/image.qcow2")
+
+ async def test_template_create_with_non_existing_image(self, app: FastAPI, client: AsyncClient) -> None:
+
+ params = {"name": "Qemu template",
+ "compute_id": "local",
+ "platform": "i386",
+ "hda_disk_image": "unkown_image.qcow2",
+ "ram": 512,
+ "template_type": "qemu"}
+
+ response = await client.post(app.url_path_for("create_template"), json=params)
+ assert response.status_code == status.HTTP_404_NOT_FOUND