diff --git a/gns3server/compute/base_manager.py b/gns3server/compute/base_manager.py index 40e2c007..a96fe2e7 100644 --- a/gns3server/compute/base_manager.py +++ b/gns3server/compute/base_manager.py @@ -506,7 +506,7 @@ class BaseManager: """ try: - return list_images(self._NODE_TYPE) + return await list_images(self._NODE_TYPE) except OSError as e: raise ComputeError(f"Can not list images {e}") diff --git a/gns3server/compute/qemu/qemu_vm.py b/gns3server/compute/qemu/qemu_vm.py index 90381d6b..313a5c32 100644 --- a/gns3server/compute/qemu/qemu_vm.py +++ b/gns3server/compute/qemu/qemu_vm.py @@ -1057,10 +1057,10 @@ class QemuVM(BaseNode): # In case user upload image manually we don't have md5 sums. # We need generate hashes at this point, otherwise they will be generated # at asdict but not on separate thread. - await cancellable_wait_run_in_executor(md5sum, self._hda_disk_image) - await cancellable_wait_run_in_executor(md5sum, self._hdb_disk_image) - await cancellable_wait_run_in_executor(md5sum, self._hdc_disk_image) - await cancellable_wait_run_in_executor(md5sum, self._hdd_disk_image) + await cancellable_wait_run_in_executor(md5sum, self._hda_disk_image, self.working_dir) + await cancellable_wait_run_in_executor(md5sum, self._hdb_disk_image, self.working_dir) + await cancellable_wait_run_in_executor(md5sum, self._hdc_disk_image, self.working_dir) + await cancellable_wait_run_in_executor(md5sum, self._hdd_disk_image, self.working_dir) super().create() diff --git a/gns3server/controller/appliance_manager.py b/gns3server/controller/appliance_manager.py index b7874deb..17d8374a 100644 --- a/gns3server/controller/appliance_manager.py +++ b/gns3server/controller/appliance_manager.py @@ -153,8 +153,14 @@ class ApplianceManager: version_images[appliance_key] = image_in_db.filename else: # check if the image is on disk + # FIXME: still necessary? the image should have been discovered and saved in the db already image_path = os.path.join(image_dir, appliance_file) - if os.path.exists(image_path) and await wait_run_in_executor(md5sum, image_path) == image_checksum: + if os.path.exists(image_path) and \ + await wait_run_in_executor( + md5sum, + image_path, + cache_to_md5file=False + ) == image_checksum: async with aiofiles.open(image_path, "rb") as f: await write_image(appliance_file, image_path, f, images_repo) else: diff --git a/gns3server/controller/compute.py b/gns3server/controller/compute.py index 0f19585b..d8caa17c 100644 --- a/gns3server/controller/compute.py +++ b/gns3server/controller/compute.py @@ -630,9 +630,6 @@ class Compute: try: if type in ["qemu", "dynamips", "iou"]: - # for local_image in list_images(type): - # if local_image['filename'] not in [i['filename'] for i in images]: - # images.append(local_image) images = sorted(images, key=itemgetter("filename")) else: images = sorted(images, key=itemgetter("image")) diff --git a/gns3server/core/tasks.py b/gns3server/core/tasks.py index 5519b808..6d363baa 100644 --- a/gns3server/core/tasks.py +++ b/gns3server/core/tasks.py @@ -24,7 +24,8 @@ from gns3server.controller import Controller from gns3server.compute import MODULES from gns3server.compute.port_manager import PortManager from gns3server.utils.http_client import HTTPClient -from gns3server.db.tasks import connect_to_db, disconnect_from_db, get_computes +from gns3server.db.tasks import connect_to_db, get_computes, disconnect_from_db, discover_images_on_filesystem + import logging @@ -59,7 +60,9 @@ def create_startup_handler(app: FastAPI) -> Callable: # computing with server start from gns3server.compute.qemu import Qemu - asyncio.ensure_future(Qemu.instance().list_images()) + # Start the discovering new images on file system 5 seconds after the server has started + # to give it a chance to process API requests + loop.call_later(5, asyncio.create_task, discover_images_on_filesystem(app)) for module in MODULES: log.debug(f"Loading module {module.__name__}") diff --git a/gns3server/db/tasks.py b/gns3server/db/tasks.py index 46f5a2b7..99853993 100644 --- a/gns3server/db/tasks.py +++ b/gns3server/db/tasks.py @@ -15,11 +15,13 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . +import asyncio +import signal import os from fastapi import FastAPI -from fastapi.encoders import jsonable_encoder from pydantic import ValidationError +from watchfiles import awatch, Change from typing import List from sqlalchemy import event @@ -27,6 +29,8 @@ from sqlalchemy.engine import Engine from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from gns3server.db.repositories.computes import ComputesRepository +from gns3server.db.repositories.images import ImagesRepository +from gns3server.utils.images import discover_images, check_valid_image_header, read_image_info, InvalidImageError from gns3server import schemas from .models import Base @@ -82,3 +86,94 @@ async def get_computes(app: FastAPI) -> List[dict]: continue computes.append(compute) return computes + + +def image_filter(change: Change, path: str) -> bool: + + if change == Change.added: + header_magic_len = 7 + with open(path, "rb") as f: + image_header = f.read(header_magic_len) # read the first 7 bytes of the file + if len(image_header) >= header_magic_len: + try: + check_valid_image_header(image_header) + except InvalidImageError as e: + log.debug(f"New image '{path}' added: {e}") + return False + else: + log.debug(f"New image '{path}' added: size is too small to be valid") + return False + return True + # FIXME: should we support image deletion? + # elif change == Change.deleted: + # return True + return False + + +async def monitor_images_on_filesystem(app: FastAPI): + + server_config = Config.instance().settings.Server + images_dir = os.path.expanduser(server_config.images_path) + + try: + async for changes in awatch( + images_dir, + watch_filter=image_filter, + raise_interrupt=True + ): + async with AsyncSession(app.state._db_engine) as db_session: + images_repository = ImagesRepository(db_session) + for change in changes: + change_type, image_path = change + if change_type == Change.added: + try: + image = await read_image_info(image_path) + except InvalidImageError as e: + log.warning(str(e)) + continue + try: + if await images_repository.get_image(image_path): + continue + await images_repository.add_image(**image) + log.info(f"Discovered image '{image_path}' has been added to the database") + except SQLAlchemyError as e: + log.warning(f"Error while adding image '{image_path}' to the database: {e}") + # if change_type == Change.deleted: + # try: + # if await images_repository.get_image(image_path): + # success = await images_repository.delete_image(image_path) + # if not success: + # log.warning(f"Could not delete image '{image_path}' from the database") + # else: + # log.info(f"Image '{image_path}' has been deleted from the database") + # except SQLAlchemyError as e: + # log.warning(f"Error while deleting image '{image_path}' from the database: {e}") + except KeyboardInterrupt: + # send SIGTERM to the server PID so uvicorn can shutdown the process + os.kill(os.getpid(), signal.SIGTERM) + + +async def discover_images_on_filesystem(app: FastAPI): + + async with AsyncSession(app.state._db_engine) as db_session: + images_repository = ImagesRepository(db_session) + db_images = await images_repository.get_images() + existing_image_paths = [] + for db_image in db_images: + try: + image = schemas.Image.from_orm(db_image) + existing_image_paths.append(image.path) + except ValidationError as e: + log.error(f"Could not load image '{db_image.filename}' from database: {e}") + continue + for image_type in ("qemu", "ios", "iou"): + discovered_images = await discover_images(image_type, existing_image_paths) + for image in discovered_images: + log.info(f"Adding discovered image '{image['path']}' to the database") + try: + await images_repository.add_image(**image) + except SQLAlchemyError as e: + log.warning(f"Error while adding image '{image['path']}' to the database: {e}") + + # monitor if images have been manually added + asyncio.create_task(monitor_images_on_filesystem(app)) diff --git a/gns3server/utils/images.py b/gns3server/utils/images.py index d1d9eecd..28f24a1a 100644 --- a/gns3server/utils/images.py +++ b/gns3server/utils/images.py @@ -20,19 +20,20 @@ import stat import aiofiles import shutil -from typing import AsyncGenerator +from typing import List, AsyncGenerator from ..config import Config from . import force_unix_path import gns3server.db.models as models from gns3server.db.repositories.images import ImagesRepository +from gns3server.utils.asyncio import wait_run_in_executor import logging log = logging.getLogger(__name__) -def list_images(image_type): +async def list_images(image_type): """ Scan directories for available image for a given type. @@ -59,7 +60,6 @@ def list_images(image_type): directory = os.path.normpath(directory) for root, _, filenames in _os_walk(directory, recurse=recurse): for filename in filenames: - path = os.path.join(root, filename) if filename not in files: if filename.endswith(".md5sum") or filename.startswith("."): continue @@ -92,7 +92,7 @@ def list_images(image_type): { "filename": filename, "path": force_unix_path(path), - "md5sum": md5sum(os.path.join(root, filename)), + "md5sum": await wait_run_in_executor(md5sum, os.path.join(root, filename)), "filesize": os.stat(os.path.join(root, filename)).st_size, } ) @@ -101,6 +101,59 @@ def list_images(image_type): return images +async def read_image_info(path: str, expected_image_type: str = None) -> dict: + + header_magic_len = 7 + try: + async with aiofiles.open(path, "rb") as f: + image_header = await f.read(header_magic_len) # read the first 7 bytes of the file + if len(image_header) >= header_magic_len: + detected_image_type = check_valid_image_header(image_header) + if expected_image_type and detected_image_type != expected_image_type: + raise InvalidImageError(f"Detected image type for '{path}' is {detected_image_type}, " + f"expected type is {expected_image_type}") + else: + raise InvalidImageError(f"Image '{path}' is too small to be valid") + except OSError as e: + raise InvalidImageError(f"Cannot read image '{path}': {e}") + + image_info = { + "image_name": os.path.basename(path), + "image_type": detected_image_type, + "image_size": os.stat(path).st_size, + "path": path, + "checksum": await wait_run_in_executor(md5sum, path, cache_to_md5file=False), + "checksum_algorithm": "md5", + } + return image_info + + +async def discover_images(image_type: str, skip_image_paths: list = None) -> List[dict]: + """ + Scan directories for available images + """ + + files = set() + images = [] + + for directory in images_directories(image_type): + for root, _, filenames in os.walk(os.path.normpath(directory)): + for filename in filenames: + if filename.endswith(".md5sum") or filename.startswith("."): + continue + path = os.path.join(root, filename) + if not os.path.isfile(path) or skip_image_paths and path in skip_image_paths or path in files: + continue + files.add(path) + + try: + images.append(await read_image_info(path, image_type)) + except InvalidImageError as e: + log.debug(str(e)) + continue + return images + + def _os_walk(directory, recurse=True, **kwargs): """ Work like os.walk but if recurse is False just list current directory @@ -133,18 +186,18 @@ def default_images_directory(image_type): raise NotImplementedError(f"%s node type is not supported", image_type) -def images_directories(type): +def images_directories(image_type): """ Return all directories where we will look for images by priority - :param type: Type of emulator + :param image_type: Type of emulator """ server_config = Config.instance().settings.Server paths = [] img_dir = os.path.expanduser(server_config.images_path) - type_img_directory = default_images_directory(type) + type_img_directory = default_images_directory(image_type) try: os.makedirs(type_img_directory, exist_ok=True) paths.append(type_img_directory) @@ -158,7 +211,7 @@ def images_directories(type): return [force_unix_path(p) for p in paths if os.path.exists(p)] -def md5sum(path, working_dir=None, stopped_event=None): +def md5sum(path, working_dir=None, stopped_event=None, cache_to_md5file=True): """ Return the md5sum of an image and cache it on disk @@ -193,7 +246,7 @@ def md5sum(path, working_dir=None, stopped_event=None): if stopped_event is not None and stopped_event.is_set(): log.error(f"MD5 sum calculation of `{path}` has stopped due to cancellation") return - buf = f.read(128) + buf = f.read(1024) if not buf: break m.update(buf) @@ -202,11 +255,12 @@ def md5sum(path, working_dir=None, stopped_event=None): log.error("Can't create digest of %s: %s", path, str(e)) return None - try: - with open(md5sum_file, "w+") as f: - f.write(digest) - except OSError as e: - log.error("Can't write digest of %s: %s", path, str(e)) + if cache_to_md5file: + try: + with open(md5sum_file, "w+") as f: + f.write(digest) + except OSError as e: + log.error("Can't write digest of %s: %s", path, str(e)) return digest @@ -237,10 +291,11 @@ def check_valid_image_header(data: bytes) -> str: # for IOS images: file must start with the ELF magic number, be 32-bit, big endian and have an ELF version of 1 return "ios" elif data[:7] == b'\x7fELF\x01\x01\x01' or data[:7] == b'\x7fELF\x02\x01\x01': - # for IOU images file must start with the ELF magic number, be 32-bit or 64-bit, little endian and + # for IOU images: file must start with the ELF magic number, be 32-bit or 64-bit, little endian and # have an ELF version of 1 (normal IOS images are big endian!) return "iou" - elif data[:4] != b'QFI\xfb' or data[:4] != b'KDMV': + elif data[:4] == b'QFI\xfb' or data[:4] == b'KDMV': + # for Qemy images: file must be QCOW2 or VMDK return "qemu" else: raise InvalidImageError("Could not detect image type, please make sure it is a valid image") diff --git a/requirements.txt b/requirements.txt index f1496974..3041dd05 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,4 +15,5 @@ aiosqlite===0.17.0 passlib[bcrypt]==1.7.4 python-jose==3.3.0 email-validator==1.1.3 +watchfiles==0.13 setuptools==60.6.0 # don't upgrade because of https://github.com/pypa/setuptools/issues/3084 diff --git a/tests/utils/test_images.py b/tests/utils/test_images.py index 5dd2270b..10c83521 100644 --- a/tests/utils/test_images.py +++ b/tests/utils/test_images.py @@ -18,6 +18,7 @@ import os import sys import threading +import pytest from unittest.mock import patch @@ -110,7 +111,8 @@ def test_remove_checksum(tmpdir): remove_checksum(str(tmpdir / 'not_exists')) -def test_list_images(tmpdir, config): +@pytest.mark.asyncio +async def test_list_images(tmpdir, config): path1 = tmpdir / "images1" / "IOS" / "test1.image" path1.write(b'\x7fELF\x01\x02\x01', ensure=True) @@ -140,7 +142,7 @@ def test_list_images(tmpdir, config): config.settings.Server.images_path = str(tmpdir / "images1") config.settings.Server.additional_images_paths = "/tmp/null24564;" + str(tmpdir / "images2") - assert list_images("dynamips") == [ + assert await list_images("dynamips") == [ { 'filename': 'test1.image', 'filesize': 7, @@ -156,7 +158,7 @@ def test_list_images(tmpdir, config): ] if sys.platform.startswith("linux"): - assert list_images("iou") == [ + assert await list_images("iou") == [ { 'filename': 'test3.bin', 'filesize': 7, @@ -165,7 +167,7 @@ def test_list_images(tmpdir, config): } ] - assert list_images("qemu") == [ + assert await list_images("qemu") == [ { 'filename': 'test4.qcow2', 'filesize': 1,