|
|
|
@ -29,6 +29,7 @@ from tests.utils import AsyncioMagicMock, AsyncioBytesIO
|
|
|
|
|
|
|
|
|
|
from gns3server.controller.project import Project
|
|
|
|
|
from gns3server.controller.export_project import export_project, _is_exportable
|
|
|
|
|
from gns3server.utils.asyncio import aiozipstream
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
|
|
|
|
@ -51,6 +52,13 @@ def node(controller, project, async_run):
|
|
|
|
|
return node
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def write_file(path, z):
|
|
|
|
|
|
|
|
|
|
with open(path, 'wb') as f:
|
|
|
|
|
async for chunk in z:
|
|
|
|
|
f.write(chunk)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_exportable_files():
|
|
|
|
|
assert _is_exportable("hello/world")
|
|
|
|
|
assert not _is_exportable("project-files/tmp")
|
|
|
|
@ -103,12 +111,10 @@ def test_export(tmpdir, project, async_run):
|
|
|
|
|
with open(os.path.join(path, "project-files", "snapshots", "test"), 'w+') as f:
|
|
|
|
|
f.write("WORLD")
|
|
|
|
|
|
|
|
|
|
with patch("gns3server.compute.Dynamips.get_images_directory", return_value=str(tmpdir / "IOS"),):
|
|
|
|
|
z = async_run(export_project(project, str(tmpdir), include_images=False))
|
|
|
|
|
|
|
|
|
|
with open(str(tmpdir / 'zipfile.zip'), 'wb') as f:
|
|
|
|
|
for data in z:
|
|
|
|
|
f.write(data)
|
|
|
|
|
with aiozipstream.ZipFile() as z:
|
|
|
|
|
with patch("gns3server.compute.Dynamips.get_images_directory", return_value=str(tmpdir / "IOS"),):
|
|
|
|
|
async_run(export_project(z, project, str(tmpdir), include_images=False))
|
|
|
|
|
async_run(write_file(str(tmpdir / 'zipfile.zip'), z))
|
|
|
|
|
|
|
|
|
|
with zipfile.ZipFile(str(tmpdir / 'zipfile.zip')) as myzip:
|
|
|
|
|
with myzip.open("vm-1/dynamips/test") as myfile:
|
|
|
|
@ -128,7 +134,7 @@ def test_export(tmpdir, project, async_run):
|
|
|
|
|
assert topo["computes"] == []
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_export_vm(tmpdir, project, async_run, controller):
|
|
|
|
|
def test_export_vm(tmpdir, project, async_run):
|
|
|
|
|
"""
|
|
|
|
|
If data is on a remote server export it locally before
|
|
|
|
|
sending it in the archive.
|
|
|
|
@ -154,12 +160,10 @@ def test_export_vm(tmpdir, project, async_run, controller):
|
|
|
|
|
with open(os.path.join(path, "test.gns3"), 'w+') as f:
|
|
|
|
|
f.write("{}")
|
|
|
|
|
|
|
|
|
|
z = async_run(export_project(project, str(tmpdir)))
|
|
|
|
|
assert compute.list_files.called
|
|
|
|
|
|
|
|
|
|
with open(str(tmpdir / 'zipfile.zip'), 'wb') as f:
|
|
|
|
|
for data in z:
|
|
|
|
|
f.write(data)
|
|
|
|
|
with aiozipstream.ZipFile() as z:
|
|
|
|
|
async_run(export_project(z, project, str(tmpdir)))
|
|
|
|
|
assert compute.list_files.called
|
|
|
|
|
async_run(write_file(str(tmpdir / 'zipfile.zip'), z))
|
|
|
|
|
|
|
|
|
|
with zipfile.ZipFile(str(tmpdir / 'zipfile.zip')) as myzip:
|
|
|
|
|
with myzip.open("vm-1/dynamips/test") as myfile:
|
|
|
|
@ -169,7 +173,7 @@ def test_export_vm(tmpdir, project, async_run, controller):
|
|
|
|
|
|
|
|
|
|
def test_export_disallow_running(tmpdir, project, node, async_run):
|
|
|
|
|
"""
|
|
|
|
|
Dissallow export when a node is running
|
|
|
|
|
Disallow export when a node is running
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
path = project.path
|
|
|
|
@ -189,12 +193,13 @@ def test_export_disallow_running(tmpdir, project, node, async_run):
|
|
|
|
|
|
|
|
|
|
node._status = "started"
|
|
|
|
|
with pytest.raises(aiohttp.web.HTTPConflict):
|
|
|
|
|
async_run(export_project(project, str(tmpdir)))
|
|
|
|
|
with aiozipstream.ZipFile() as z:
|
|
|
|
|
async_run(export_project(z, project, str(tmpdir)))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_export_disallow_some_type(tmpdir, project, async_run):
|
|
|
|
|
"""
|
|
|
|
|
Dissalow export for some node type
|
|
|
|
|
Disallow export for some node type
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
path = project.path
|
|
|
|
@ -213,8 +218,10 @@ def test_export_disallow_some_type(tmpdir, project, async_run):
|
|
|
|
|
json.dump(topology, f)
|
|
|
|
|
|
|
|
|
|
with pytest.raises(aiohttp.web.HTTPConflict):
|
|
|
|
|
z = async_run(export_project(project, str(tmpdir)))
|
|
|
|
|
z = async_run(export_project(project, str(tmpdir), allow_all_nodes=True))
|
|
|
|
|
with aiozipstream.ZipFile() as z:
|
|
|
|
|
async_run(export_project(z, project, str(tmpdir)))
|
|
|
|
|
with aiozipstream.ZipFile() as z:
|
|
|
|
|
async_run(export_project(z, project, str(tmpdir), allow_all_nodes=True))
|
|
|
|
|
|
|
|
|
|
# VirtualBox is always disallowed
|
|
|
|
|
topology = {
|
|
|
|
@ -232,7 +239,8 @@ def test_export_disallow_some_type(tmpdir, project, async_run):
|
|
|
|
|
with open(os.path.join(path, "test.gns3"), 'w+') as f:
|
|
|
|
|
json.dump(topology, f)
|
|
|
|
|
with pytest.raises(aiohttp.web.HTTPConflict):
|
|
|
|
|
z = async_run(export_project(project, str(tmpdir), allow_all_nodes=True))
|
|
|
|
|
with aiozipstream.ZipFile() as z:
|
|
|
|
|
async_run(export_project(z, project, str(tmpdir), allow_all_nodes=True))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_export_fix_path(tmpdir, project, async_run):
|
|
|
|
@ -264,10 +272,9 @@ def test_export_fix_path(tmpdir, project, async_run):
|
|
|
|
|
with open(os.path.join(path, "test.gns3"), 'w+') as f:
|
|
|
|
|
json.dump(topology, f)
|
|
|
|
|
|
|
|
|
|
z = async_run(export_project(project, str(tmpdir)))
|
|
|
|
|
with open(str(tmpdir / 'zipfile.zip'), 'wb') as f:
|
|
|
|
|
for data in z:
|
|
|
|
|
f.write(data)
|
|
|
|
|
with aiozipstream.ZipFile() as z:
|
|
|
|
|
async_run(export_project(z, project, str(tmpdir)))
|
|
|
|
|
async_run(write_file(str(tmpdir / 'zipfile.zip'), z))
|
|
|
|
|
|
|
|
|
|
with zipfile.ZipFile(str(tmpdir / 'zipfile.zip')) as myzip:
|
|
|
|
|
with myzip.open("project.gns3") as myfile:
|
|
|
|
@ -303,11 +310,10 @@ def test_export_with_images(tmpdir, project, async_run):
|
|
|
|
|
with open(os.path.join(path, "test.gns3"), 'w+') as f:
|
|
|
|
|
json.dump(topology, f)
|
|
|
|
|
|
|
|
|
|
with patch("gns3server.compute.Dynamips.get_images_directory", return_value=str(tmpdir / "IOS"),):
|
|
|
|
|
z = async_run(export_project(project, str(tmpdir), include_images=True))
|
|
|
|
|
with open(str(tmpdir / 'zipfile.zip'), 'wb') as f:
|
|
|
|
|
for data in z:
|
|
|
|
|
f.write(data)
|
|
|
|
|
with aiozipstream.ZipFile() as z:
|
|
|
|
|
with patch("gns3server.compute.Dynamips.get_images_directory", return_value=str(tmpdir / "IOS"),):
|
|
|
|
|
async_run(export_project(z, project, str(tmpdir), include_images=True))
|
|
|
|
|
async_run(write_file(str(tmpdir / 'zipfile.zip'), z))
|
|
|
|
|
|
|
|
|
|
with zipfile.ZipFile(str(tmpdir / 'zipfile.zip')) as myzip:
|
|
|
|
|
myzip.getinfo("images/IOS/test.image")
|
|
|
|
@ -341,11 +347,9 @@ def test_export_keep_compute_id(tmpdir, project, async_run):
|
|
|
|
|
}
|
|
|
|
|
json.dump(data, f)
|
|
|
|
|
|
|
|
|
|
z = async_run(export_project(project, str(tmpdir), keep_compute_id=True))
|
|
|
|
|
|
|
|
|
|
with open(str(tmpdir / 'zipfile.zip'), 'wb') as f:
|
|
|
|
|
for data in z:
|
|
|
|
|
f.write(data)
|
|
|
|
|
with aiozipstream.ZipFile() as z:
|
|
|
|
|
async_run(export_project(z, project, str(tmpdir), keep_compute_id=True))
|
|
|
|
|
async_run(write_file(str(tmpdir / 'zipfile.zip'), z))
|
|
|
|
|
|
|
|
|
|
with zipfile.ZipFile(str(tmpdir / 'zipfile.zip')) as myzip:
|
|
|
|
|
with myzip.open("project.gns3") as myfile:
|
|
|
|
@ -353,7 +357,8 @@ def test_export_keep_compute_id(tmpdir, project, async_run):
|
|
|
|
|
assert topo["nodes"][0]["compute_id"] == "6b7149c8-7d6e-4ca0-ab6b-daa8ab567be0"
|
|
|
|
|
assert len(topo["computes"]) == 1
|
|
|
|
|
|
|
|
|
|
def test_export_images_from_vm(tmpdir, project, async_run, controller):
|
|
|
|
|
|
|
|
|
|
def test_export_images_from_vm(tmpdir, project, async_run):
|
|
|
|
|
"""
|
|
|
|
|
If data is on a remote server export it locally before
|
|
|
|
|
sending it in the archive.
|
|
|
|
@ -405,12 +410,10 @@ def test_export_images_from_vm(tmpdir, project, async_run, controller):
|
|
|
|
|
with open(os.path.join(path, "test.gns3"), 'w+') as f:
|
|
|
|
|
f.write(json.dumps(topology))
|
|
|
|
|
|
|
|
|
|
z = async_run(export_project(project, str(tmpdir), include_images=True))
|
|
|
|
|
assert compute.list_files.called
|
|
|
|
|
|
|
|
|
|
with open(str(tmpdir / 'zipfile.zip'), 'wb') as f:
|
|
|
|
|
for data in z:
|
|
|
|
|
f.write(data)
|
|
|
|
|
with aiozipstream.ZipFile() as z:
|
|
|
|
|
async_run(export_project(z, project, str(tmpdir), include_images=True))
|
|
|
|
|
assert compute.list_files.called
|
|
|
|
|
async_run(write_file(str(tmpdir / 'zipfile.zip'), z))
|
|
|
|
|
|
|
|
|
|
with zipfile.ZipFile(str(tmpdir / 'zipfile.zip')) as myzip:
|
|
|
|
|
with myzip.open("vm-1/dynamips/test") as myfile:
|
|
|
|
@ -450,12 +453,9 @@ def test_export_with_ignoring_snapshots(tmpdir, project, async_run):
|
|
|
|
|
os.makedirs(snapshots_dir)
|
|
|
|
|
Path(os.path.join(snapshots_dir, 'snap.gns3project')).touch()
|
|
|
|
|
|
|
|
|
|
z = async_run(export_project(project, str(tmpdir), keep_compute_id=True))
|
|
|
|
|
|
|
|
|
|
with open(str(tmpdir / 'zipfile.zip'), 'wb') as f:
|
|
|
|
|
for data in z:
|
|
|
|
|
f.write(data)
|
|
|
|
|
with aiozipstream.ZipFile() as z:
|
|
|
|
|
async_run(export_project(z, project, str(tmpdir), keep_compute_id=True))
|
|
|
|
|
async_run(write_file(str(tmpdir / 'zipfile.zip'), z))
|
|
|
|
|
|
|
|
|
|
with zipfile.ZipFile(str(tmpdir / 'zipfile.zip')) as myzip:
|
|
|
|
|
assert not os.path.join('snapshots', 'snap.gns3project') in [f.filename for f in myzip.filelist]
|
|
|
|
|
|
|
|
|
|