1
0
mirror of https://github.com/GNS3/gns3-server synced 2025-01-12 17:10:55 +00:00

API for duplication a Node

Support:
* VPCS
* Dynamips router
* Ethernet switch

Ref #1065
This commit is contained in:
Julien Duponchelle 2017-07-20 17:29:42 +02:00
parent 0449a5b4ee
commit 0854c04687
No known key found for this signature in database
GPG Key ID: CE8B29639E07F5E8
18 changed files with 382 additions and 32 deletions

View File

@ -256,6 +256,37 @@ class BaseManager:
project.add_node(node) project.add_node(node)
return node return node
@asyncio.coroutine
def duplicate_node(self, source_node_id, destination_node_id):
"""
Duplicate a node
:param source_node_id: Source node identifier
:param destination_node_id: Destination node identifier
:returns: New node instance
"""
source_node = self.get_node(source_node_id)
destination_node = self.get_node(destination_node_id)
# Some node don't have working dir like switch
if not hasattr(destination_node, "working_dir"):
return destination_node
destination_dir = destination_node.working_dir
try:
shutil.rmtree(destination_dir)
shutil.copytree(source_node.working_dir, destination_dir)
except OSError as e:
raise aiohttp.web.HTTPConflict(text="Can't duplicate node data: {}".format(e))
# We force a refresh of the name. This force the rewrite
# of some configuration files
node_name = destination_node.name
destination_node.name = node_name + "tmp"
destination_node.name = node_name
return destination_node
@asyncio.coroutine @asyncio.coroutine
def close_node(self, node_id): def close_node(self, node_id):
""" """

View File

@ -34,7 +34,6 @@ from ..ubridge.hypervisor import Hypervisor
from ..ubridge.ubridge_error import UbridgeError from ..ubridge.ubridge_error import UbridgeError
from .nios.nio_udp import NIOUDP from .nios.nio_udp import NIOUDP
from .error import NodeError from .error import NodeError
from ..config import Config
log = logging.getLogger(__name__) log = logging.getLogger(__name__)

View File

@ -511,16 +511,12 @@ class Dynamips(BaseManager):
:param settings: VM settings :param settings: VM settings
""" """
module_workdir = vm.project.module_working_directory(self.module_name.lower())
default_startup_config_path = os.path.join(module_workdir, vm.id, "configs", "i{}_startup-config.cfg".format(vm.dynamips_id))
default_private_config_path = os.path.join(module_workdir, vm.id, "configs", "i{}_private-config.cfg".format(vm.dynamips_id))
startup_config_content = settings.get("startup_config_content") startup_config_content = settings.get("startup_config_content")
if startup_config_content: if startup_config_content:
self._create_config(vm, default_startup_config_path, startup_config_content) self._create_config(vm, vm.startup_config_path, startup_config_content)
private_config_content = settings.get("private_config_content") private_config_content = settings.get("private_config_content")
if private_config_content: if private_config_content:
self._create_config(vm, default_private_config_path, private_config_content) self._create_config(vm, vm.private_config_path, private_config_content)
def _create_config(self, vm, path, content=None): def _create_config(self, vm, path, content=None):
""" """
@ -605,3 +601,40 @@ class Dynamips(BaseManager):
if was_auto_started: if was_auto_started:
yield from vm.stop() yield from vm.stop()
return validated_idlepc return validated_idlepc
@asyncio.coroutine
def duplicate_node(self, source_node_id, destination_node_id):
"""
Duplicate a node
:param node_id: Node identifier
:returns: New node instance
"""
source_node = self.get_node(source_node_id)
destination_node = self.get_node(destination_node_id)
# Non router gears
if not hasattr(source_node, "startup_config_path"):
return (yield from super().duplicate_node(source_node_id, destination_node_id))
try:
with open(source_node.startup_config_path) as f:
startup_config = f.read()
except OSError:
startup_config = None
try:
with open(source_node.private_config_path) as f:
private_config = f.read()
except OSError:
private_config = None
yield from self.set_vm_configs(destination_node, {
"startup_config_content": startup_config,
"private_config_content": private_config
})
# Force refresh of the name in configuration files
new_name = destination_node.name
yield from destination_node.set_name(source_node.name)
yield from destination_node.set_name(new_name)
return destination_node

View File

@ -1474,6 +1474,20 @@ class Router(BaseNode):
return self._slots return self._slots
@property
def startup_config_path(self):
"""
:returns: Path of the startup config
"""
return os.path.join(self._working_directory, "configs", "i{}_startup-config.cfg".format(self._dynamips_id))
@property
def private_config_path(self):
"""
:returns: Path of the private config
"""
return os.path.join(self._working_directory, "configs", "i{}_private-config.cfg".format(self._dynamips_id))
@asyncio.coroutine @asyncio.coroutine
def set_name(self, new_name): def set_name(self, new_name):
""" """
@ -1483,28 +1497,26 @@ class Router(BaseNode):
""" """
# change the hostname in the startup-config # change the hostname in the startup-config
startup_config_path = os.path.join(self._working_directory, "configs", "i{}_startup-config.cfg".format(self._dynamips_id)) if os.path.isfile(self.startup_config_path):
if os.path.isfile(startup_config_path):
try: try:
with open(startup_config_path, "r+", encoding="utf-8", errors="replace") as f: with open(self.startup_config_path, "r+", encoding="utf-8", errors="replace") as f:
old_config = f.read() old_config = f.read()
new_config = re.sub(r"^hostname .+$", "hostname " + new_name, old_config, flags=re.MULTILINE) new_config = re.sub(r"^hostname .+$", "hostname " + new_name, old_config, flags=re.MULTILINE)
f.seek(0) f.seek(0)
f.write(new_config) f.write(new_config)
except OSError as e: except OSError as e:
raise DynamipsError("Could not amend the configuration {}: {}".format(startup_config_path, e)) raise DynamipsError("Could not amend the configuration {}: {}".format(self.startup_config_path, e))
# change the hostname in the private-config # change the hostname in the private-config
private_config_path = os.path.join(self._working_directory, "configs", "i{}_private-config.cfg".format(self._dynamips_id)) if os.path.isfile(self.private_config_path):
if os.path.isfile(private_config_path):
try: try:
with open(private_config_path, "r+", encoding="utf-8", errors="replace") as f: with open(self.private_config_path, "r+", encoding="utf-8", errors="replace") as f:
old_config = f.read() old_config = f.read()
new_config = old_config.replace(self.name, new_name) new_config = old_config.replace(self.name, new_name)
f.seek(0) f.seek(0)
f.write(new_config) f.write(new_config)
except OSError as e: except OSError as e:
raise DynamipsError("Could not amend the configuration {}: {}".format(private_config_path, e)) raise DynamipsError("Could not amend the configuration {}: {}".format(self.private_config_path, e))
yield from self._hypervisor.send('vm rename "{name}" "{new_name}"'.format(name=self._name, new_name=new_name)) yield from self._hypervisor.send('vm rename "{name}" "{new_name}"'.format(name=self._name, new_name=new_name))
log.info('Router "{name}" [{id}]: renamed to "{new_name}"'.format(name=self._name, id=self._id, new_name=new_name)) log.info('Router "{name}" [{id}]: renamed to "{new_name}"'.format(name=self._name, id=self._id, new_name=new_name))
@ -1543,7 +1555,7 @@ class Router(BaseNode):
startup_config_base64, private_config_base64 = yield from self.extract_config() startup_config_base64, private_config_base64 = yield from self.extract_config()
if startup_config_base64: if startup_config_base64:
startup_config = os.path.join("configs", "i{}_startup-config.cfg".format(self._dynamips_id)) startup_config = self.startup_config_path
try: try:
config = base64.b64decode(startup_config_base64).decode("utf-8", errors="replace") config = base64.b64decode(startup_config_base64).decode("utf-8", errors="replace")
config = "!\n" + config.replace("\r", "") config = "!\n" + config.replace("\r", "")
@ -1555,7 +1567,7 @@ class Router(BaseNode):
raise DynamipsError("Could not save the startup configuration {}: {}".format(config_path, e)) raise DynamipsError("Could not save the startup configuration {}: {}".format(config_path, e))
if private_config_base64 and base64.b64decode(private_config_base64) != b'\nkerberos password \nend\n': if private_config_base64 and base64.b64decode(private_config_base64) != b'\nkerberos password \nend\n':
private_config = os.path.join("configs", "i{}_private-config.cfg".format(self._dynamips_id)) private_config = self.private_config_path
try: try:
config = base64.b64decode(private_config_base64).decode("utf-8", errors="replace") config = base64.b64decode(private_config_base64).decode("utf-8", errors="replace")
config_path = os.path.join(self._working_directory, private_config) config_path = os.path.join(self._working_directory, private_config)

View File

@ -173,7 +173,7 @@ class VPCSVM(BaseNode):
if self.script_file: if self.script_file:
content = self.startup_script content = self.startup_script
content = content.replace(self._name, new_name) content = content.replace(self._name, new_name)
escaped_name = re.escape(new_name) escaped_name = new_name.replace('\\', '')
content = re.sub(r"^set pcname .+$", "set pcname " + escaped_name, content, flags=re.MULTILINE) content = re.sub(r"^set pcname .+$", "set pcname " + escaped_name, content, flags=re.MULTILINE)
self.startup_script = content self.startup_script = content

View File

@ -106,6 +106,16 @@ class Node:
if self._symbol is None: if self._symbol is None:
self.symbol = ":/symbols/computer.svg" self.symbol = ":/symbols/computer.svg"
def is_always_running(self):
"""
:returns: Boolean True if the node is always running
like ethernet switch
"""
return self.node_type not in (
"qemu", "docker", "dynamips",
"vpcs", "vmware", "virtualbox",
"iou")
@property @property
def id(self): def id(self):
return self._id return self._id

View File

@ -19,6 +19,7 @@ import re
import os import os
import json import json
import uuid import uuid
import copy
import shutil import shutil
import asyncio import asyncio
import aiohttp import aiohttp
@ -367,6 +368,8 @@ class Project:
if base_name is None: if base_name is None:
return None return None
base_name = re.sub(r"[ ]", "", base_name) base_name = re.sub(r"[ ]", "", base_name)
base_name = re.sub(r"[0-9]+$", "{0}", base_name)
if '{0}' in base_name or '{id}' in base_name: if '{0}' in base_name or '{id}' in base_name:
# base name is a template, replace {0} or {id} by an unique identifier # base name is a template, replace {0} or {id} by an unique identifier
for number in range(1, 1000000): for number in range(1, 1000000):
@ -834,7 +837,7 @@ class Project:
""" """
for node in self._nodes.values(): for node in self._nodes.values():
# Some node type are always running we ignore them # Some node type are always running we ignore them
if node.status != "stopped" and node.node_type in ("qemu", "docker", "dynamips", "vpcs", "vmware", "virtualbox", "iou"): if node.status != "stopped" and not node.is_always_running():
return True return True
return False return False
@ -882,6 +885,54 @@ class Project:
pool.append(node.suspend) pool.append(node.suspend)
yield from pool.join() yield from pool.join()
@asyncio.coroutine
def duplicate_node(self, node, x, y, z):
"""
Duplicate a node
:param node: Node instance
:param x: X position
:param y: Y position
:param z: Z position
:returns: New node
"""
if node.status != "stopped" and not node.is_always_running():
raise aiohttp.web.HTTPConflict(text="Can't duplicate node data while is running")
data = copy.deepcopy(node.__json__(topology_dump=True))
# Some properties like internal ID should not be duplicate
for unique_property in (
'node_id',
'name',
'compute_id',
'application_id',
'dynamips_id'):
data.pop(unique_property, None)
if 'properties' in data:
data['properties'].pop(unique_property, None)
node_type = data.pop('node_type')
data['x'] = x
data['y'] = y
data['z'] = z
new_node_uuid = str(uuid.uuid4())
new_node = yield from self.add_node(
node.compute,
node.name,
new_node_uuid,
node_type=node_type,
**data)
try:
yield from node.post("/duplicate", timeout=None, data={
"destination_node_id": new_node_uuid
})
except aiohttp.web.HTTPNotFound as e:
yield from self.delete_node(new_node_uuid)
raise aiohttp.web.HTTPConflict(text="You can't duplicate this node type")
except aiohttp.web.HTTPConflict as e:
yield from self.delete_node(new_node_uuid)
raise e
return new_node
def __json__(self): def __json__(self):
return { return {
"name": self._name, "name": self._name,

View File

@ -447,3 +447,23 @@ class DynamipsVMHandler:
dynamips_manager = Dynamips.instance() dynamips_manager = Dynamips.instance()
yield from dynamips_manager.write_image(request.match_info["filename"], request.content) yield from dynamips_manager.write_image(request.match_info["filename"], request.content)
response.set_status(204) response.set_status(204)
@Route.post(
r"/projects/{project_id}/dynamips/nodes/{node_id}/duplicate",
parameters={
"project_id": "Project UUID",
"node_id": "Node UUID"
},
status_codes={
201: "Instance duplicated",
404: "Instance doesn't exist"
},
description="Duplicate a dynamips instance")
def duplicate(request, response):
new_node = yield from Dynamips.instance().duplicate_node(
request.match_info["node_id"],
request.json["destination_node_id"]
)
response.set_status(201)
response.json(new_node)

View File

@ -20,7 +20,6 @@ import os
from gns3server.web.route import Route from gns3server.web.route import Route
from gns3server.schemas.node import NODE_CAPTURE_SCHEMA from gns3server.schemas.node import NODE_CAPTURE_SCHEMA
from gns3server.schemas.nio import NIO_SCHEMA from gns3server.schemas.nio import NIO_SCHEMA
from gns3server.compute.builtin import Builtin
from gns3server.compute.dynamips import Dynamips from gns3server.compute.dynamips import Dynamips
from gns3server.schemas.ethernet_switch import ( from gns3server.schemas.ethernet_switch import (
@ -91,6 +90,26 @@ class EthernetSwitchHandler:
# node = builtin_manager.get_node(request.match_info["node_id"], project_id=request.match_info["project_id"]) # node = builtin_manager.get_node(request.match_info["node_id"], project_id=request.match_info["project_id"])
response.json(node) response.json(node)
@Route.post(
r"/projects/{project_id}/ethernet_switch/nodes/{node_id}/duplicate",
parameters={
"project_id": "Project UUID",
"node_id": "Node UUID"
},
status_codes={
201: "Instance duplicated",
404: "Instance doesn't exist"
},
description="Duplicate an ethernet switch instance")
def duplicate(request, response):
new_node = yield from Dynamips.instance().duplicate_node(
request.match_info["node_id"],
request.json["destination_node_id"]
)
response.set_status(201)
response.json(new_node)
@Route.put( @Route.put(
r"/projects/{project_id}/ethernet_switch/nodes/{node_id}", r"/projects/{project_id}/ethernet_switch/nodes/{node_id}",
parameters={ parameters={

View File

@ -30,7 +30,6 @@ from gns3server.schemas.vpcs import (
class VPCSHandler: class VPCSHandler:
""" """
API entry points for VPCS. API entry points for VPCS.
""" """
@ -119,6 +118,26 @@ class VPCSHandler:
yield from VPCS.instance().delete_node(request.match_info["node_id"]) yield from VPCS.instance().delete_node(request.match_info["node_id"])
response.set_status(204) response.set_status(204)
@Route.post(
r"/projects/{project_id}/vpcs/nodes/{node_id}/duplicate",
parameters={
"project_id": "Project UUID",
"node_id": "Node UUID"
},
status_codes={
201: "Instance duplicated",
404: "Instance doesn't exist"
},
description="Duplicate a VPCS instance")
def duplicate(request, response):
new_node = yield from VPCS.instance().duplicate_node(
request.match_info["node_id"],
request.json["destination_node_id"]
)
response.set_status(201)
response.json(new_node)
@Route.post( @Route.post(
r"/projects/{project_id}/vpcs/nodes/{node_id}/start", r"/projects/{project_id}/vpcs/nodes/{node_id}/start",
parameters={ parameters={

View File

@ -25,7 +25,8 @@ from gns3server.utils import force_unix_path
from gns3server.schemas.node import ( from gns3server.schemas.node import (
NODE_OBJECT_SCHEMA, NODE_OBJECT_SCHEMA,
NODE_UPDATE_SCHEMA, NODE_UPDATE_SCHEMA,
NODE_CREATE_SCHEMA NODE_CREATE_SCHEMA,
NODE_DUPLICATE_SCHEMA
) )
@ -180,6 +181,32 @@ class NodeHandler:
yield from project.start_all() yield from project.start_all()
response.set_status(204) response.set_status(204)
@Route.post(
r"/projects/{project_id}/nodes/{node_id}/duplicate",
parameters={
"project_id": "Project UUID",
"node_id": "Node UUID"
},
status_codes={
201: "Instance duplicated",
400: "Invalid request",
404: "Instance doesn't exist"
},
description="Duplicate a node instance",
input=NODE_DUPLICATE_SCHEMA,
output=NODE_OBJECT_SCHEMA)
def duplicate(request, response):
project = yield from Controller.instance().get_loaded_project(request.match_info["project_id"])
node = project.get_node(request.match_info["node_id"])
new_node = yield from project.duplicate_node(
node,
request.json["x"],
request.json["y"],
request.json.get("z", 0))
response.json(new_node)
response.set_status(201)
@Route.post( @Route.post(
r"/projects/{project_id}/nodes/{node_id}/start", r"/projects/{project_id}/nodes/{node_id}/start",
parameters={ parameters={

View File

@ -237,3 +237,26 @@ NODE_OBJECT_SCHEMA = {
NODE_CREATE_SCHEMA = NODE_OBJECT_SCHEMA NODE_CREATE_SCHEMA = NODE_OBJECT_SCHEMA
NODE_UPDATE_SCHEMA = copy.deepcopy(NODE_OBJECT_SCHEMA) NODE_UPDATE_SCHEMA = copy.deepcopy(NODE_OBJECT_SCHEMA)
del NODE_UPDATE_SCHEMA["required"] del NODE_UPDATE_SCHEMA["required"]
NODE_DUPLICATE_SCHEMA = {
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "Duplicate a node",
"type": "object",
"properties": {
"x": {
"description": "X position of the node",
"type": "integer"
},
"y": {
"description": "Y position of the node",
"type": "integer"
},
"z": {
"description": "Z position of the node",
"type": "integer"
}
},
"additionalProperties": False,
"required": ["x", "y"]
}

View File

@ -26,6 +26,7 @@ import asyncio
from gns3server.compute.dynamips import Dynamips from gns3server.compute.dynamips import Dynamips
from gns3server.compute.dynamips.dynamips_error import DynamipsError from gns3server.compute.dynamips.dynamips_error import DynamipsError
from unittest.mock import patch from unittest.mock import patch
from tests.utils import asyncio_patch, AsyncioMagicMock
@pytest.fixture @pytest.fixture
@ -82,7 +83,7 @@ def test_release_dynamips_id(manager):
manager.release_dynamips_id(project_2, 0) manager.release_dynamips_id(project_2, 0)
def test_project_closed(manager, project, loop): def test_project_closed(manager, project, async_run):
manager._dynamips_ids[project.id] = set([1, 2, 3]) manager._dynamips_ids[project.id] = set([1, 2, 3])
@ -90,7 +91,38 @@ def test_project_closed(manager, project, loop):
os.makedirs(project_dir) os.makedirs(project_dir)
open(os.path.join(project_dir, "test.ghost"), "w+").close() open(os.path.join(project_dir, "test.ghost"), "w+").close()
loop.run_until_complete(asyncio.async(manager.project_closed(project))) async_run(manager.project_closed(project))
assert not os.path.exists(os.path.join(project_dir, "test.ghost")) assert not os.path.exists(os.path.join(project_dir, "test.ghost"))
assert project.id not in manager._dynamips_ids assert project.id not in manager._dynamips_ids
def test_duplicate_node(manager, project, async_run):
"""
Duplicate dynamips do nothing it's manage outside the
filesystem
"""
with asyncio_patch('gns3server.compute.dynamips.nodes.c7200.C7200.create'):
source_node = async_run(manager.create_node(
'R1',
project.id,
str(uuid.uuid4()),
platform="c7200"
))
destination_node = async_run(manager.create_node(
'R2',
project.id,
str(uuid.uuid4()),
platform="c7200"
))
destination_node._hypervisor = AsyncioMagicMock()
with open(os.path.join(source_node.working_dir, 'c3600_i1_nvram'), 'w+') as f:
f.write("1")
with open(source_node.startup_config_path, 'w+') as f:
f.write('hostname R1\necho TEST')
async_run(manager.duplicate_node(source_node.id, destination_node.id))
assert not os.path.exists(os.path.join(destination_node.working_dir, 'c3600_i1_nvram'))
with open(destination_node.startup_config_path) as f:
content = f.read()
assert content == '!\nhostname R2\necho TEST'

View File

@ -19,9 +19,11 @@ import uuid
import os import os
import pytest import pytest
from unittest.mock import patch from unittest.mock import patch
from tests.utils import AsyncioMagicMock, asyncio_patch
from gns3server.compute.vpcs import VPCS from gns3server.compute.vpcs import VPCS
from gns3server.compute.dynamips import Dynamips
from gns3server.compute.qemu import Qemu from gns3server.compute.qemu import Qemu
from gns3server.compute.error import NodeError, ImageMissingError from gns3server.compute.error import NodeError, ImageMissingError
from gns3server.utils import force_unix_path from gns3server.utils import force_unix_path
@ -273,3 +275,25 @@ def test_delete_node(async_run, vpcs, project):
async_run(vpcs.delete_node(node_id)) async_run(vpcs.delete_node(node_id))
mock_emit.assert_called_with("node.deleted", node) mock_emit.assert_called_with("node.deleted", node)
assert node not in project.nodes assert node not in project.nodes
def test_duplicate_vpcs(async_run, vpcs, project):
source_node_id = str(uuid.uuid4())
source_node = async_run(vpcs.create_node("PC-1", project.id, source_node_id, console=2222))
with open(os.path.join(source_node.working_dir, "startup.vpc"), "w+") as f:
f.write("set pcname PC-1\nip dhcp\n")
destination_node_id = str(uuid.uuid4())
destination_node = async_run(vpcs.create_node("PC-2", project.id, destination_node_id, console=2223))
async_run(vpcs.duplicate_node(source_node_id, destination_node_id))
with open(os.path.join(destination_node.working_dir, "startup.vpc")) as f:
assert f.read() == "set pcname PC-2\nip dhcp\n"
def test_duplicate_ethernet_switch(async_run, project):
with asyncio_patch('gns3server.compute.dynamips.nodes.ethernet_switch.EthernetSwitch.create'):
dynamips_manager = Dynamips.instance()
source_node_id = str(uuid.uuid4())
source_node = async_run(dynamips_manager.create_node("SW-1", project.id, source_node_id, node_type='ethernet_switch'))
destination_node_id = str(uuid.uuid4())
destination_node = async_run(dynamips_manager.create_node("SW-2", project.id, destination_node_id, node_type='ethernet_switch'))
async_run(dynamips_manager.duplicate_node(source_node_id, destination_node_id))

View File

@ -255,7 +255,7 @@ def test_update_startup_script_h(vm):
def test_update_startup_script_with_escaping_characters_in_name(vm): def test_update_startup_script_with_escaping_characters_in_name(vm):
vm.startup_script = "set pcname initial-name\n" vm.startup_script = "set pcname initial-name\n"
vm.name = "test\\" vm.name = "test\\"
assert vm.startup_script == "set pcname test\\{}".format(os.linesep) assert vm.startup_script == "set pcname test{}".format(os.linesep)
def test_get_startup_script(vm): def test_get_startup_script(vm):

View File

@ -597,6 +597,10 @@ def test_node_name(project, async_run):
assert node.name == "helloworld-1" assert node.name == "helloworld-1"
node = async_run(project.add_node(compute, "hello world-{0}", None, node_type="vpcs", properties={"startup_config": "test.cfg"})) node = async_run(project.add_node(compute, "hello world-{0}", None, node_type="vpcs", properties={"startup_config": "test.cfg"}))
assert node.name == "helloworld-2" assert node.name == "helloworld-2"
node = async_run(project.add_node(compute, "VPCS-1", None, node_type="vpcs", properties={"startup_config": "test.cfg"}))
assert node.name == "VPCS-1"
node = async_run(project.add_node(compute, "VPCS-1", None, node_type="vpcs", properties={"startup_config": "test.cfg"}))
assert node.name == "VPCS-2"
def test_add_iou_node_and_check_if_gets_application_id(project, async_run): def test_add_iou_node_and_check_if_gets_application_id(project, async_run):
@ -618,3 +622,22 @@ def test_add_iou_node_and_check_if_gets_application_id(project, async_run):
compute, "test", None, node_type="iou", application_id=333, properties={"startup_config": "test.cfg"})) compute, "test", None, node_type="iou", application_id=333, properties={"startup_config": "test.cfg"}))
assert mocked_get_app_id.called assert mocked_get_app_id.called
assert node.properties['application_id'] == 333 assert node.properties['application_id'] == 333
def test_duplicate_node(project, async_run):
compute = MagicMock()
compute.id = "local"
response = MagicMock()
response.json = {"console": 2048}
compute.post = AsyncioMagicMock(return_value=response)
original = async_run(project.add_node(
compute,
"test",
None,
node_type="vpcs",
properties={
"startup_config": "test.cfg"
}))
new_node = async_run(project.duplicate_node(original, 42, 10, 11))
assert new_node.x == 42

View File

@ -16,6 +16,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import pytest import pytest
import uuid
import sys import sys
import os import os
from tests.utils import asyncio_patch from tests.utils import asyncio_patch
@ -66,10 +67,10 @@ def test_vpcs_create_port(http_compute, project, free_console_port):
def test_vpcs_nio_create_udp(http_compute, vm): def test_vpcs_nio_create_udp(http_compute, vm):
with asyncio_patch("gns3server.compute.vpcs.vpcs_vm.VPCSVM.add_ubridge_udp_connection"): with asyncio_patch("gns3server.compute.vpcs.vpcs_vm.VPCSVM.add_ubridge_udp_connection"):
response = http_compute.post("/projects/{project_id}/vpcs/nodes/{node_id}/adapters/0/ports/0/nio".format(project_id=vm["project_id"], node_id=vm["node_id"]), {"type": "nio_udp", response = http_compute.post("/projects/{project_id}/vpcs/nodes/{node_id}/adapters/0/ports/0/nio".format(project_id=vm["project_id"], node_id=vm["node_id"]), {"type": "nio_udp",
"lport": 4242, "lport": 4242,
"rport": 4343, "rport": 4343,
"rhost": "127.0.0.1"}, "rhost": "127.0.0.1"},
example=True) example=True)
assert response.status == 201 assert response.status == 201
assert response.route == "/projects/{project_id}/vpcs/nodes/{node_id}/adapters/{adapter_number:\d+}/ports/{port_number:\d+}/nio" assert response.route == "/projects/{project_id}/vpcs/nodes/{node_id}/adapters/{adapter_number:\d+}/ports/{port_number:\d+}/nio"
assert response.json["type"] == "nio_udp" assert response.json["type"] == "nio_udp"
@ -92,9 +93,9 @@ def test_vpcs_nio_update_udp(http_compute, vm):
def test_vpcs_delete_nio(http_compute, vm): def test_vpcs_delete_nio(http_compute, vm):
with asyncio_patch("gns3server.compute.vpcs.vpcs_vm.VPCSVM._ubridge_send"): with asyncio_patch("gns3server.compute.vpcs.vpcs_vm.VPCSVM._ubridge_send"):
http_compute.post("/projects/{project_id}/vpcs/nodes/{node_id}/adapters/0/ports/0/nio".format(project_id=vm["project_id"], node_id=vm["node_id"]), {"type": "nio_udp", http_compute.post("/projects/{project_id}/vpcs/nodes/{node_id}/adapters/0/ports/0/nio".format(project_id=vm["project_id"], node_id=vm["node_id"]), {"type": "nio_udp",
"lport": 4242, "lport": 4242,
"rport": 4343, "rport": 4343,
"rhost": "127.0.0.1"}) "rhost": "127.0.0.1"})
response = http_compute.delete("/projects/{project_id}/vpcs/nodes/{node_id}/adapters/0/ports/0/nio".format(project_id=vm["project_id"], node_id=vm["node_id"]), example=True) response = http_compute.delete("/projects/{project_id}/vpcs/nodes/{node_id}/adapters/0/ports/0/nio".format(project_id=vm["project_id"], node_id=vm["node_id"]), example=True)
assert response.status == 204, response.body.decode() assert response.status == 204, response.body.decode()
assert response.route == "/projects/{project_id}/vpcs/nodes/{node_id}/adapters/{adapter_number:\d+}/ports/{port_number:\d+}/nio" assert response.route == "/projects/{project_id}/vpcs/nodes/{node_id}/adapters/{adapter_number:\d+}/ports/{port_number:\d+}/nio"
@ -130,6 +131,20 @@ def test_vpcs_delete(http_compute, vm):
assert response.status == 204 assert response.status == 204
def test_vpcs_duplicate(http_compute, vm):
with asyncio_patch("gns3server.compute.vpcs.VPCS.duplicate_node", return_value=True) as mock:
response = http_compute.post(
"/projects/{project_id}/vpcs/nodes/{node_id}/duplicate".format(
project_id=vm["project_id"],
node_id=vm["node_id"]),
body={
"destination_node_id": str(uuid.uuid4())
},
example=True)
assert mock.called
assert response.status == 201
def test_vpcs_update(http_compute, vm, tmpdir, free_console_port): def test_vpcs_update(http_compute, vm, tmpdir, free_console_port):
response = http_compute.put("/projects/{project_id}/vpcs/nodes/{node_id}".format(project_id=vm["project_id"], node_id=vm["node_id"]), {"name": "test", response = http_compute.put("/projects/{project_id}/vpcs/nodes/{node_id}".format(project_id=vm["project_id"], node_id=vm["node_id"]), {"name": "test",
"console": free_console_port, "console": free_console_port,

View File

@ -195,6 +195,18 @@ def test_reload_node(http_controller, tmpdir, project, compute, node):
assert response.json == node.__json__() assert response.json == node.__json__()
def test_duplicate_node(http_controller, tmpdir, project, compute, node):
response = MagicMock()
response.json({"console": 2035})
compute.post = AsyncioMagicMock(return_value=response)
response = http_controller.post("/projects/{}/nodes/{}/duplicate".format(
project.id, node.id),
{"x": 10, "y": 5, "z": 0},
example=True)
assert response.status == 201, response.body.decode()
def test_delete_node(http_controller, tmpdir, project, compute, node): def test_delete_node(http_controller, tmpdir, project, compute, node):
response = MagicMock() response = MagicMock()
compute.post = AsyncioMagicMock() compute.post = AsyncioMagicMock()