diff --git a/gns3server/db/tasks.py b/gns3server/db/tasks.py index 95880a76..16fc74cd 100644 --- a/gns3server/db/tasks.py +++ b/gns3server/db/tasks.py @@ -16,13 +16,11 @@ # along with this program. If not, see . import asyncio -import signal +import time import os from fastapi import FastAPI from pydantic import ValidationError -from watchfiles import awatch, Change - from typing import List from sqlalchemy import event from sqlalchemy.engine import Engine @@ -32,10 +30,12 @@ from alembic import command, config from alembic.script import ScriptDirectory from alembic.runtime.migration import MigrationContext from alembic.util.exc import CommandError +from watchdog.observers import Observer +from watchdog.events import FileSystemEvent, PatternMatchingEventHandler from gns3server.db.repositories.computes import ComputesRepository from gns3server.db.repositories.images import ImagesRepository -from gns3server.utils.images import discover_images, check_valid_image_header, read_image_info, default_images_directory, InvalidImageError +from gns3server.utils.images import discover_images, read_image_info, default_images_directory, InvalidImageError from gns3server import schemas from .models import Base @@ -130,80 +130,6 @@ async def get_computes(app: FastAPI) -> List[dict]: return computes -def image_filter(change: Change, path: str) -> bool: - - if change == Change.added and os.path.isfile(path): - if path.endswith(".tmp") or path.endswith(".md5sum") or path.startswith("."): - return False - if "/lib/" in path or "/lib64/" in path: - # ignore custom IOU libraries - return False - header_magic_len = 7 - with open(path, "rb") as f: - image_header = f.read(header_magic_len) # read the first 7 bytes of the file - if len(image_header) >= header_magic_len: - try: - check_valid_image_header(image_header) - except InvalidImageError as e: - log.debug(f"New image '{path}': {e}") - return False - else: - log.debug(f"New image '{path}': size is too small to be valid") - return False - return True - # FIXME: should we support image deletion? - # elif change == Change.deleted: - # return True - return False - - -async def monitor_images_on_filesystem(app: FastAPI): - - directories_to_monitor = [] - for image_type in ("qemu", "ios", "iou"): - image_dir = default_images_directory(image_type) - if os.path.isdir(image_dir): - log.debug(f"Monitoring for new images in '{image_dir}'") - directories_to_monitor.append(image_dir) - - try: - async for changes in awatch( - *directories_to_monitor, - watch_filter=image_filter, - raise_interrupt=True - ): - async with AsyncSession(app.state._db_engine) as db_session: - images_repository = ImagesRepository(db_session) - for change in changes: - change_type, image_path = change - if change_type == Change.added: - try: - image = await read_image_info(image_path) - except InvalidImageError as e: - log.warning(str(e)) - continue - try: - if await images_repository.get_image(image_path): - continue - await images_repository.add_image(**image) - log.info(f"Discovered image '{image_path}' has been added to the database") - except SQLAlchemyError as e: - log.warning(f"Error while adding image '{image_path}' to the database: {e}") - # if change_type == Change.deleted: - # try: - # if await images_repository.get_image(image_path): - # success = await images_repository.delete_image(image_path) - # if not success: - # log.warning(f"Could not delete image '{image_path}' from the database") - # else: - # log.info(f"Image '{image_path}' has been deleted from the database") - # except SQLAlchemyError as e: - # log.warning(f"Error while deleting image '{image_path}' from the database: {e}") - except KeyboardInterrupt: - # send SIGTERM to the server PID so uvicorn can shutdown the process - os.kill(os.getpid(), signal.SIGTERM) - - async def discover_images_on_filesystem(app: FastAPI): async with AsyncSession(app.state._db_engine) as db_session: @@ -228,3 +154,99 @@ async def discover_images_on_filesystem(app: FastAPI): # monitor if images have been manually added asyncio.create_task(monitor_images_on_filesystem(app)) + +class EventHandler(PatternMatchingEventHandler): + """ + Watchdog event handler. + """ + + def __init__(self, queue: asyncio.Queue, loop: asyncio.BaseEventLoop, **kwargs): + + self._loop = loop + self._queue = queue + + # ignore temporary files, md5sum files, hidden files and directories + super().__init__(ignore_patterns=["*.tmp", "*.md5sum", ".*"], ignore_directories = True, **kwargs) + + def on_closed(self, event: FileSystemEvent) -> None: + # monitor for closed files (e.g. when a file has finished to be copied) + if "/lib/" in event.src_path or "/lib64/" in event.src_path: + return # ignore custom IOU libraries + self._loop.call_soon_threadsafe(self._queue.put_nowait, event) + +class EventIterator(object): + """ + Watchdog Event iterator. + """ + + def __init__(self, queue: asyncio.Queue): + self.queue = queue + + def __aiter__(self): + return self + + async def __anext__(self): + + item = await self.queue.get() + if item is None: + raise StopAsyncIteration + return item + +async def monitor_images_on_filesystem(app: FastAPI): + + def watchdog( + path: str, + queue: asyncio.Queue, + loop: asyncio.BaseEventLoop, + app: FastAPI, recursive: bool = False + ) -> None: + """ + Thread to monitor a directory for new images. + """ + + handler = EventHandler(queue, loop) + observer = Observer() + observer.schedule(handler, str(path), recursive=recursive) + observer.start() + log.info(f"Monitoring for new images in '{path}'") + while True: + time.sleep(1) + # stop when the app is exiting + if app.state.exiting: + observer.stop() + observer.join() + log.info(f"Stopping monitoring for new images in '{path}'") + loop.call_soon_threadsafe(queue.put_nowait, None) + break + + queue = asyncio.Queue() + loop = asyncio.get_event_loop() + server_config = Config.instance().settings.Server + image_dir = os.path.expanduser(server_config.images_path) + asyncio.get_event_loop().run_in_executor(None, watchdog,image_dir, queue, loop, app, True) + + async for filesystem_event in EventIterator(queue): + # read the file system event from the queue + print(filesystem_event) + image_path = filesystem_event.src_path + expected_image_type = None + if "IOU" in image_path: + expected_image_type = "iou" + elif "QEMU" in image_path: + expected_image_type = "qemu" + elif "IOS" in image_path: + expected_image_type = "ios" + async with AsyncSession(app.state._db_engine) as db_session: + images_repository = ImagesRepository(db_session) + try: + image = await read_image_info(image_path, expected_image_type) + except InvalidImageError as e: + log.warning(str(e)) + continue + try: + if await images_repository.get_image(image_path): + continue + await images_repository.add_image(**image) + log.info(f"Discovered image '{image_path}' has been added to the database") + except SQLAlchemyError as e: + log.warning(f"Error while adding image '{image_path}' to the database: {e}") diff --git a/requirements.txt b/requirements.txt index 54a12e6e..3a9657f9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -18,7 +18,7 @@ alembic==1.14.0 bcrypt==4.2.1 joserfc==1.0.1 email-validator==2.2.0 -watchfiles==1.0.3 +watchdog==6.0.0 zstandard==0.23.0 platformdirs>=2.4.0,<3 # platformdirs >=3 conflicts when building Debian packages importlib-resources>=1.3; python_version <= '3.9' diff --git a/tests/conftest.py b/tests/conftest.py index 6849f794..30b3a6b0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -400,10 +400,12 @@ def run_around_tests(monkeypatch, config, port_manager): config.settings.VMware.vmrun_path = tmppath config.settings.Dynamips.dynamips_path = tmppath - # Force turn off KVM because it's not available on CI config.settings.Qemu.enable_hardware_acceleration = False + # avoid monitoring for new images while testing + config.settings.Server.auto_discover_images = False + monkeypatch.setattr("gns3server.utils.path.get_default_project_directory", lambda *args: os.path.join(tmppath, 'projects')) # Force sys.platform to the original value. Because it seems not be restored correctly after each test