From 31a2cb998db48da3c1958871c5c3ae4751580798 Mon Sep 17 00:00:00 2001 From: grossmj Date: Sun, 17 Nov 2024 14:39:22 +1000 Subject: [PATCH 1/3] Fix issue with asyncio.Queue which is not thread safe. --- gns3server/compute/notification_manager.py | 5 +++-- gns3server/controller/notification.py | 9 +++++---- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/gns3server/compute/notification_manager.py b/gns3server/compute/notification_manager.py index 82388b0a..b18e92d2 100644 --- a/gns3server/compute/notification_manager.py +++ b/gns3server/compute/notification_manager.py @@ -15,7 +15,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . - +import asyncio from contextlib import contextmanager from gns3server.utils.notification_queue import NotificationQueue @@ -28,6 +28,7 @@ class NotificationManager: def __init__(self): self._listeners = set() + self._loop = asyncio.get_event_loop() @contextmanager def queue(self): @@ -54,7 +55,7 @@ class NotificationManager: """ for listener in self._listeners: - listener.put_nowait((action, event, kwargs)) + self._loop.call_soon_threadsafe(listener.put_nowait, (action, event, kwargs)) @staticmethod def reset(): diff --git a/gns3server/controller/notification.py b/gns3server/controller/notification.py index 48d5a3d2..4fa25e76 100644 --- a/gns3server/controller/notification.py +++ b/gns3server/controller/notification.py @@ -15,7 +15,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import os +import asyncio from contextlib import contextmanager from gns3server.utils.notification_queue import NotificationQueue @@ -32,6 +32,7 @@ class Notification: self._controller = controller self._project_listeners = {} self._controller_listeners = set() + self._loop = asyncio.get_event_loop() @contextmanager def project_queue(self, project_id): @@ -73,7 +74,7 @@ class Notification: """ for controller_listener in self._controller_listeners: - controller_listener.put_nowait((action, event, {})) + self._loop.call_soon_threadsafe(controller_listener.put_nowait, (action, event, {})) def project_has_listeners(self, project_id): """ @@ -134,7 +135,7 @@ class Notification: except KeyError: return for listener in project_listeners: - listener.put_nowait((action, event, {})) + self._loop.call_soon_threadsafe(listener.put_nowait, (action, event, {})) def _send_event_to_all_projects(self, action, event): """ @@ -146,4 +147,4 @@ class Notification: """ for project_listeners in self._project_listeners.values(): for listener in project_listeners: - listener.put_nowait((action, event, {})) + self._loop.call_soon_threadsafe(listener.put_nowait, (action, event, {})) From b37db57bb938ec23386499660b9b96295d037843 Mon Sep 17 00:00:00 2001 From: grossmj Date: Sun, 17 Nov 2024 15:00:13 +1000 Subject: [PATCH 2/3] Fix tests --- gns3server/compute/notification_manager.py | 3 +-- gns3server/controller/notification.py | 7 +++---- tests/compute/qemu/test_qemu_vm.py | 4 ++-- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/gns3server/compute/notification_manager.py b/gns3server/compute/notification_manager.py index b18e92d2..22955b17 100644 --- a/gns3server/compute/notification_manager.py +++ b/gns3server/compute/notification_manager.py @@ -28,7 +28,6 @@ class NotificationManager: def __init__(self): self._listeners = set() - self._loop = asyncio.get_event_loop() @contextmanager def queue(self): @@ -55,7 +54,7 @@ class NotificationManager: """ for listener in self._listeners: - self._loop.call_soon_threadsafe(listener.put_nowait, (action, event, kwargs)) + asyncio.get_event_loop().call_soon(listener.put_nowait, (action, event, kwargs)) @staticmethod def reset(): diff --git a/gns3server/controller/notification.py b/gns3server/controller/notification.py index 4fa25e76..19672fb0 100644 --- a/gns3server/controller/notification.py +++ b/gns3server/controller/notification.py @@ -32,7 +32,6 @@ class Notification: self._controller = controller self._project_listeners = {} self._controller_listeners = set() - self._loop = asyncio.get_event_loop() @contextmanager def project_queue(self, project_id): @@ -74,7 +73,7 @@ class Notification: """ for controller_listener in self._controller_listeners: - self._loop.call_soon_threadsafe(controller_listener.put_nowait, (action, event, {})) + asyncio.get_event_loop().call_soon_threadsafe(controller_listener.put_nowait, (action, event, {})) def project_has_listeners(self, project_id): """ @@ -135,7 +134,7 @@ class Notification: except KeyError: return for listener in project_listeners: - self._loop.call_soon_threadsafe(listener.put_nowait, (action, event, {})) + asyncio.get_event_loop().call_soon_threadsafe(listener.put_nowait, (action, event, {})) def _send_event_to_all_projects(self, action, event): """ @@ -147,4 +146,4 @@ class Notification: """ for project_listeners in self._project_listeners.values(): for listener in project_listeners: - self._loop.call_soon_threadsafe(listener.put_nowait, (action, event, {})) + asyncio.get_event_loop().call_soon_threadsafe(listener.put_nowait, (action, event, {})) diff --git a/tests/compute/qemu/test_qemu_vm.py b/tests/compute/qemu/test_qemu_vm.py index c07e9051..d714d442 100644 --- a/tests/compute/qemu/test_qemu_vm.py +++ b/tests/compute/qemu/test_qemu_vm.py @@ -202,11 +202,11 @@ async def test_termination_callback_error(vm, tmpdir): await queue.get(1) # Ping - (action, event, kwargs) = queue.get_nowait() + (action, event, kwargs) = await queue.get(1) assert action == "node.updated" assert event == vm - (action, event, kwargs) = queue.get_nowait() + (action, event, kwargs) = await queue.get(1) assert action == "log.error" assert event["message"] == "QEMU process has stopped, return code: 1\nBOOMM" From fa0d7d7529e7ec45760abd37ed03e4198c5c4fc8 Mon Sep 17 00:00:00 2001 From: Jeremy Grossmann Date: Mon, 18 Nov 2024 12:13:41 +1000 Subject: [PATCH 3/3] Use call_soon_threadsafe() in notification manager --- gns3server/compute/notification_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gns3server/compute/notification_manager.py b/gns3server/compute/notification_manager.py index 22955b17..44c9ca39 100644 --- a/gns3server/compute/notification_manager.py +++ b/gns3server/compute/notification_manager.py @@ -54,7 +54,7 @@ class NotificationManager: """ for listener in self._listeners: - asyncio.get_event_loop().call_soon(listener.put_nowait, (action, event, kwargs)) + asyncio.get_event_loop().call_soon_threadsafe(listener.put_nowait, (action, event, kwargs)) @staticmethod def reset():