|
|
|
@ -19,9 +19,21 @@ import os
|
|
|
|
|
import stat
|
|
|
|
|
import asyncio
|
|
|
|
|
import sys
|
|
|
|
|
import pytest
|
|
|
|
|
|
|
|
|
|
from gns3server.modules.qemu import Qemu
|
|
|
|
|
from tests.utils import asyncio_patch
|
|
|
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
|
|
|
|
|
def fake_qemu_img_binary(tmpdir):
|
|
|
|
|
|
|
|
|
|
bin_path = str(tmpdir / "qemu-img")
|
|
|
|
|
with open(bin_path, "w+") as f:
|
|
|
|
|
f.write("1")
|
|
|
|
|
os.chmod(bin_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
|
|
|
|
|
return bin_path
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_get_qemu_version(loop):
|
|
|
|
@ -57,6 +69,7 @@ def test_binary_list(loop):
|
|
|
|
|
assert {"path": os.path.join(os.environ["PATH"], "qemu-system-x42"), "version": version} in qemus
|
|
|
|
|
assert {"path": os.path.join(os.environ["PATH"], "hello"), "version": version} not in qemus
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_img_binary_list(loop):
|
|
|
|
|
|
|
|
|
|
files_to_create = ["qemu-img", "qemu-io", "qemu-system-x86", "qemu-system-x42", "qemu-kvm", "hello"]
|
|
|
|
@ -83,3 +96,52 @@ def test_img_binary_list(loop):
|
|
|
|
|
def test_get_legacy_vm_workdir():
|
|
|
|
|
|
|
|
|
|
assert Qemu.get_legacy_vm_workdir(42, "bla") == os.path.join("qemu", "vm-42")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_create_image_abs_path(loop, tmpdir, fake_qemu_img_binary):
|
|
|
|
|
options = {
|
|
|
|
|
"format": "qcow2",
|
|
|
|
|
"preallocation": "metadata",
|
|
|
|
|
"cluster_size": 64,
|
|
|
|
|
"refcount_bits": 12,
|
|
|
|
|
"lazy_refcounts": "off",
|
|
|
|
|
"size": 100
|
|
|
|
|
}
|
|
|
|
|
with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()) as process:
|
|
|
|
|
loop.run_until_complete(asyncio.async(Qemu.instance().create_disk(fake_qemu_img_binary, str(tmpdir / "hda.qcow2"), options)))
|
|
|
|
|
args, kwargs = process.call_args
|
|
|
|
|
assert args == (
|
|
|
|
|
fake_qemu_img_binary,
|
|
|
|
|
"create",
|
|
|
|
|
"-f",
|
|
|
|
|
"qcow2",
|
|
|
|
|
"-o",
|
|
|
|
|
"cluster_size=64",
|
|
|
|
|
"-o",
|
|
|
|
|
"lazy_refcounts=off",
|
|
|
|
|
"-o",
|
|
|
|
|
"preallocation=metadata",
|
|
|
|
|
"-o",
|
|
|
|
|
"refcount_bits=12",
|
|
|
|
|
str(tmpdir / "hda.qcow2"),
|
|
|
|
|
"100M"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_create_image_relative_path(loop, tmpdir, fake_qemu_img_binary):
|
|
|
|
|
options = {
|
|
|
|
|
"format": "raw",
|
|
|
|
|
"size": 100
|
|
|
|
|
}
|
|
|
|
|
with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()) as process:
|
|
|
|
|
with patch("gns3server.modules.qemu.Qemu.get_images_directory", return_value=str(tmpdir)):
|
|
|
|
|
loop.run_until_complete(asyncio.async(Qemu.instance().create_disk(fake_qemu_img_binary, "hda.qcow2", options)))
|
|
|
|
|
args, kwargs = process.call_args
|
|
|
|
|
assert args == (
|
|
|
|
|
fake_qemu_img_binary,
|
|
|
|
|
"create",
|
|
|
|
|
"-f",
|
|
|
|
|
"raw",
|
|
|
|
|
str(tmpdir / "hda.qcow2"),
|
|
|
|
|
"100M"
|
|
|
|
|
)
|
|
|
|
|