mirror of
https://github.com/GNS3/gns3-server
synced 2024-11-24 09:18:08 +00:00
parent
28533cfdac
commit
61cf91d83c
@ -748,6 +748,9 @@ class QemuVM(BaseVM):
|
||||
Changes the process priority
|
||||
"""
|
||||
|
||||
if self._process_priority == "normal":
|
||||
return
|
||||
|
||||
if sys.platform.startswith("win"):
|
||||
try:
|
||||
import win32api
|
||||
@ -756,7 +759,7 @@ class QemuVM(BaseVM):
|
||||
except ImportError:
|
||||
log.error("pywin32 must be installed to change the priority class for QEMU VM {}".format(self._name))
|
||||
else:
|
||||
log.info("setting QEMU VM {} priority class to BELOW_NORMAL".format(self._name))
|
||||
log.info("Setting QEMU VM {} priority class to {}".format(self._name, self._process_priority))
|
||||
handle = win32api.OpenProcess(win32con.PROCESS_ALL_ACCESS, 0, self._process.pid)
|
||||
if self._process_priority == "realtime":
|
||||
priority = win32process.REALTIME_PRIORITY_CLASS
|
||||
|
@ -16,6 +16,7 @@
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import pytest
|
||||
import sys
|
||||
import os
|
||||
from tests.utils import asyncio_patch
|
||||
from unittest.mock import patch
|
||||
@ -76,6 +77,7 @@ def test_vpcs_nio_create_udp(server, vm):
|
||||
assert response.json["type"] == "nio_udp"
|
||||
|
||||
|
||||
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
|
||||
def test_vpcs_nio_create_tap(server, vm, ethernet_device):
|
||||
with patch("gns3server.modules.base_manager.BaseManager.has_privileged_access", return_value=True):
|
||||
response = server.post("/projects/{project_id}/vpcs/vms/{vm_id}/adapters/0/ports/0/nio".format(project_id=vm["project_id"], vm_id=vm["vm_id"]), {"type": "nio_tap",
|
||||
|
@ -31,6 +31,7 @@ from unittest.mock import patch, MagicMock
|
||||
from gns3server.modules.qemu.qemu_vm import QemuVM
|
||||
from gns3server.modules.qemu.qemu_error import QemuError
|
||||
from gns3server.modules.qemu import Qemu
|
||||
from gns3server.utils import force_unix_path
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
@ -66,7 +67,9 @@ def fake_qemu_binary():
|
||||
@pytest.fixture(scope="function")
|
||||
def vm(project, manager, fake_qemu_binary, fake_qemu_img_binary):
|
||||
manager.port_manager.console_host = "127.0.0.1"
|
||||
return QemuVM("test", "00010203-0405-0607-0809-0a0b0c0d0e0f", project, manager, qemu_path=fake_qemu_binary)
|
||||
vm = QemuVM("test", "00010203-0405-0607-0809-0a0b0c0d0e0f", project, manager, qemu_path=fake_qemu_binary)
|
||||
vm._process_priority = "normal" # Avoid complexity for Windows tests
|
||||
return vm
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
@ -175,6 +178,7 @@ def test_add_nio_binding_udp(vm, loop):
|
||||
assert nio.lport == 4242
|
||||
|
||||
|
||||
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
|
||||
def test_add_nio_binding_ethernet(vm, loop, ethernet_device):
|
||||
with patch("gns3server.modules.base_manager.BaseManager.has_privileged_access", return_value=True):
|
||||
nio = Qemu.instance().create_nio(vm.qemu_path, {"type": "nio_generic_ethernet", "ethernet_device": ethernet_device})
|
||||
@ -254,6 +258,7 @@ def test_set_qemu_path_windows(vm, tmpdir):
|
||||
assert vm.platform == "x86_64"
|
||||
|
||||
|
||||
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
|
||||
def test_set_qemu_path_kvm_binary(vm, tmpdir, fake_qemu_binary):
|
||||
|
||||
bin_path = os.path.join(os.environ["PATH"], "qemu-kvm")
|
||||
@ -299,12 +304,23 @@ def test_set_process_priority(vm, loop, fake_qemu_img_binary):
|
||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()) as process:
|
||||
vm._process = MagicMock()
|
||||
vm._process.pid = 42
|
||||
vm._process_priority = "low"
|
||||
loop.run_until_complete(asyncio.async(vm._set_process_priority()))
|
||||
assert process.called
|
||||
args, kwargs = process.call_args
|
||||
assert args == ("renice", "-n", "5", "-p", "42")
|
||||
|
||||
|
||||
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
|
||||
def test_set_process_priority_normal(vm, loop, fake_qemu_img_binary):
|
||||
|
||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()) as process:
|
||||
vm._process = MagicMock()
|
||||
vm._process.pid = 42
|
||||
loop.run_until_complete(asyncio.async(vm._set_process_priority()))
|
||||
assert not process.called
|
||||
|
||||
|
||||
def test_json(vm, project):
|
||||
|
||||
json = vm.__json__()
|
||||
@ -388,9 +404,9 @@ def test_hda_disk_image(vm, tmpdir):
|
||||
vm.manager.config.set("Server", "images_path", str(tmpdir))
|
||||
|
||||
vm.hda_disk_image = str(tmpdir / "test")
|
||||
assert vm.hda_disk_image == str(tmpdir / "test")
|
||||
assert vm.hda_disk_image == force_unix_path(str(tmpdir / "test"))
|
||||
vm.hda_disk_image = "test"
|
||||
assert vm.hda_disk_image == str(tmpdir / "QEMU" / "test")
|
||||
assert vm.hda_disk_image == force_unix_path(str(tmpdir / "QEMU" / "test"))
|
||||
|
||||
|
||||
def test_hda_disk_image_ova(vm, tmpdir):
|
||||
@ -398,7 +414,7 @@ def test_hda_disk_image_ova(vm, tmpdir):
|
||||
vm.manager.config.set("Server", "images_path", str(tmpdir))
|
||||
|
||||
vm.hda_disk_image = "test.ovf/test.vmdk"
|
||||
assert vm.hda_disk_image == str(tmpdir / "QEMU" / "test.ovf" / "test.vmdk")
|
||||
assert vm.hda_disk_image == force_unix_path(str(tmpdir / "QEMU" / "test.ovf" / "test.vmdk"))
|
||||
|
||||
|
||||
def test_hdb_disk_image(vm, tmpdir):
|
||||
@ -406,9 +422,9 @@ def test_hdb_disk_image(vm, tmpdir):
|
||||
vm.manager.config.set("Server", "images_path", str(tmpdir))
|
||||
|
||||
vm.hdb_disk_image = str(tmpdir / "test")
|
||||
assert vm.hdb_disk_image == str(tmpdir / "test")
|
||||
assert vm.hdb_disk_image == force_unix_path(str(tmpdir / "test"))
|
||||
vm.hdb_disk_image = "test"
|
||||
assert vm.hdb_disk_image == str(tmpdir / "QEMU" / "test")
|
||||
assert vm.hdb_disk_image == force_unix_path(str(tmpdir / "QEMU" / "test"))
|
||||
|
||||
|
||||
def test_hdc_disk_image(vm, tmpdir):
|
||||
@ -416,9 +432,9 @@ def test_hdc_disk_image(vm, tmpdir):
|
||||
vm.manager.config.set("Server", "images_path", str(tmpdir))
|
||||
|
||||
vm.hdc_disk_image = str(tmpdir / "test")
|
||||
assert vm.hdc_disk_image == str(tmpdir / "test")
|
||||
assert vm.hdc_disk_image == force_unix_path(str(tmpdir / "test"))
|
||||
vm.hdc_disk_image = "test"
|
||||
assert vm.hdc_disk_image == str(tmpdir / "QEMU" / "test")
|
||||
assert vm.hdc_disk_image == force_unix_path(str(tmpdir / "QEMU" / "test"))
|
||||
|
||||
|
||||
def test_hdd_disk_image(vm, tmpdir):
|
||||
@ -426,9 +442,9 @@ def test_hdd_disk_image(vm, tmpdir):
|
||||
vm.manager.config.set("Server", "images_path", str(tmpdir))
|
||||
|
||||
vm.hdd_disk_image = str(tmpdir / "test")
|
||||
assert vm.hdd_disk_image == str(tmpdir / "test")
|
||||
assert vm.hdd_disk_image == force_unix_path(str(tmpdir / "test"))
|
||||
vm.hdd_disk_image = "test"
|
||||
assert vm.hdd_disk_image == str(tmpdir / "QEMU" / "test")
|
||||
assert vm.hdd_disk_image == force_unix_path(str(tmpdir / "QEMU" / "test"))
|
||||
|
||||
|
||||
def test_options(linux_platform, vm):
|
||||
|
@ -211,6 +211,7 @@ def test_add_nio_binding_udp(vm):
|
||||
assert nio.lport == 4242
|
||||
|
||||
|
||||
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
|
||||
def test_add_nio_binding_tap(vm, ethernet_device):
|
||||
with patch("gns3server.modules.base_manager.BaseManager.has_privileged_access", return_value=True):
|
||||
nio = VPCS.instance().create_nio(vm.vpcs_path, {"type": "nio_tap", "tap_device": ethernet_device})
|
||||
|
Loading…
Reference in New Issue
Block a user