Remove NIO from VPCS

pull/100/head
Julien Duponchelle 10 years ago
parent 8e307c8cbb
commit 77db08c39e

@ -79,21 +79,40 @@ class VPCSHandler(object):
@Route.post(
r"/vpcs/{vpcs_id}/ports/{port_id}/nio",
parameters={
"vpcs_id": "Id of VPCS instance"
"vpcs_id": "Id of VPCS instance",
"port_id": "Id of the port where the nio should be add"
},
status_codes={
201: "Success of creation of NIO",
200: "Success of creation of NIO",
404: "If VPCS doesn't exist"
},
description="ADD NIO to a VPCS",
input=VPCS_NIO_SCHEMA,
output=VPCS_NIO_SCHEMA)
def create_nio(request, response):
# TODO: response with nio
vpcs_manager = VPCS.instance()
vm = vpcs_manager.get_vm(int(request.match_info['vpcs_id']))
nio = vm.port_add_nio_binding(int(request.match_info['port_id']), request.json)
response.json(nio)
@classmethod
@Route.delete(
r"/vpcs/{vpcs_id}/ports/{port_id}/nio",
parameters={
"vpcs_id": "Id of VPCS instance",
"port_id": "Id of the port where the nio should be remove"
},
status_codes={
200: "Success of deletin of NIO",
404: "If VPCS doesn't exist"
},
description="Remove NIO from a VPCS")
def delete_nio(request, response):
vpcs_manager = VPCS.instance()
vm = vpcs_manager.get_vm(int(request.match_info['vpcs_id']))
nio = vm.port_remove_nio_binding(int(request.match_info['port_id']))
response.json({})

@ -70,6 +70,10 @@ class Route(object):
def put(cls, path, *args, **kw):
return cls._route('PUT', path, *args, **kw)
@classmethod
def delete(cls, path, *args, **kw):
return cls._route('DELETE', path, *args, **kw)
@classmethod
def _route(cls, method, path, *args, **kw):
# This block is executed only the first time

@ -44,6 +44,9 @@ class Query:
def get(self, path, **kwargs):
return self._fetch("GET", path, **kwargs)
def delete(self, path, **kwargs):
return self._fetch("DELETE", path, **kwargs)
def _get_url(self, path):
return "http://{}:{}{}".format(self._host, self._port, path)

@ -53,3 +53,18 @@ def test_vpcs_nio_create_tap(mock, server):
assert response.status == 200
assert response.route == '/vpcs/{vpcs_id}/ports/{port_id}/nio'
assert response.json['type'] == 'nio_tap'
def test_vpcs_delete_nio(server):
vm = server.post('/vpcs', {'name': 'PC TEST 1'})
response = server.post('/vpcs/{}/ports/0/nio'.format(vm.json["vpcs_id"]), {
'type': 'nio_udp',
'lport': 4242,
'rport': 4343,
'rhost': '127.0.0.1'
},
)
response = server.delete('/vpcs/{}/ports/0/nio'.format(vm.json["vpcs_id"]))
assert response.status == 200
assert response.route == '/vpcs/{vpcs_id}/ports/{port_id}/nio'

@ -82,3 +82,10 @@ def test_add_nio_binding_tap_no_privileged_access(port_manager, tmpdir):
with patch("gns3server.modules.vpcs.vpcs_device.has_privileged_access", return_value=False):
with pytest.raises(VPCSError):
vm.port_add_nio_binding(0, {"type": "nio_tap", "tap_device": "test"})
assert vm._ethernet_adapter.ports[0] is not None
def test_port_remove_nio_binding(port_manager, tmpdir):
vm = VPCSDevice("test", 42, port_manager, working_dir=str(tmpdir))
nio = vm.port_add_nio_binding(0, {"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"})
vm.port_remove_nio_binding(0)
assert vm._ethernet_adapter.ports[0] == None

Loading…
Cancel
Save