mirror of
https://github.com/GNS3/gns3-server
synced 2024-11-28 03:08:14 +00:00
Generic class for watch file change
This commit is contained in:
parent
1c6de3ff39
commit
182a979e71
@ -24,6 +24,8 @@ import os
|
||||
import configparser
|
||||
import asyncio
|
||||
|
||||
from .utils.file_watcher import FileWatcher
|
||||
|
||||
import logging
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
@ -100,24 +102,13 @@ class Config(object):
|
||||
self.read_config()
|
||||
|
||||
def _watch_config_file(self):
|
||||
asyncio.get_event_loop().call_later(1, self._check_config_file_change)
|
||||
for file in self._files:
|
||||
self._watched_files[file] = FileWatcher(file, self._config_file_change)
|
||||
|
||||
def _check_config_file_change(self):
|
||||
"""
|
||||
Check if configuration file has changed on the disk
|
||||
"""
|
||||
changed = False
|
||||
for file in self._watched_files:
|
||||
try:
|
||||
if os.stat(file).st_mtime != self._watched_files[file]:
|
||||
changed = True
|
||||
except OSError:
|
||||
continue
|
||||
if changed:
|
||||
def _config_file_change(self, path):
|
||||
self.read_config()
|
||||
for section in self._override_config:
|
||||
self.set_section_config(section, self._override_config[section])
|
||||
self._watch_config_file()
|
||||
|
||||
def reload(self):
|
||||
"""
|
||||
|
68
gns3server/utils/file_watcher.py
Normal file
68
gns3server/utils/file_watcher.py
Normal file
@ -0,0 +1,68 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# Copyright (C) 2016 GNS3 Technologies Inc.
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
|
||||
|
||||
class FileWatcher:
|
||||
"""
|
||||
Watch for file change and call the callback when something happen
|
||||
"""
|
||||
def __init__(self, path, callback, delay=1):
|
||||
if not isinstance(path, str):
|
||||
path = str(path)
|
||||
self._path = path
|
||||
self._callback = callback
|
||||
self._delay = delay
|
||||
self._closed = False
|
||||
|
||||
try:
|
||||
self._mtime = os.stat(path).st_mtime
|
||||
except OSError:
|
||||
self._mtime = None
|
||||
asyncio.get_event_loop().call_later(self._delay, self._check_config_file_change)
|
||||
|
||||
def __del__(self):
|
||||
self._closed = True
|
||||
|
||||
def close(self):
|
||||
self._closed = True
|
||||
|
||||
def _check_config_file_change(self):
|
||||
if self._closed:
|
||||
return
|
||||
changed = False
|
||||
try:
|
||||
if os.stat(self._path).st_mtime != self._mtime:
|
||||
changed = True
|
||||
self._mtime = os.stat(self._path).st_mtime
|
||||
except OSError:
|
||||
self._mtime = None
|
||||
if changed:
|
||||
self._callback(self._path)
|
||||
asyncio.get_event_loop().call_later(self._delay, self._check_config_file_change)
|
||||
|
||||
@property
|
||||
def callback(self):
|
||||
return self._callback
|
||||
|
||||
@callback.setter
|
||||
def callback(self, val):
|
||||
self._callback = val
|
||||
|
||||
|
46
tests/utils/test_file_watcher.py
Normal file
46
tests/utils/test_file_watcher.py
Normal file
@ -0,0 +1,46 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# Copyright (C) 2016 GNS3 Technologies Inc.
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import asyncio
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
|
||||
from gns3server.utils.file_watcher import FileWatcher
|
||||
|
||||
|
||||
def test_file_watcher(async_run, tmpdir):
|
||||
file = tmpdir / "test"
|
||||
file.write("a")
|
||||
callback = MagicMock()
|
||||
fw = FileWatcher(file, callback, delay=0.5)
|
||||
async_run(asyncio.sleep(1))
|
||||
assert not callback.called
|
||||
file.write("b")
|
||||
async_run(asyncio.sleep(1.5))
|
||||
callback.assert_called_with(str(file))
|
||||
|
||||
|
||||
def test_file_watcher_not_existing(async_run, tmpdir):
|
||||
file = tmpdir / "test"
|
||||
callback = MagicMock()
|
||||
fw = FileWatcher(file, callback, delay=0.5)
|
||||
async_run(asyncio.sleep(1))
|
||||
assert not callback.called
|
||||
file.write("b")
|
||||
async_run(asyncio.sleep(1.5))
|
||||
callback.assert_called_with(str(file))
|
||||
|
Loading…
Reference in New Issue
Block a user