1
0
mirror of https://github.com/GNS3/gns3-server synced 2024-12-26 16:58:28 +00:00

Use PATH environnement variable for searching binary

This commit is contained in:
Julien Duponchelle 2015-01-16 20:23:43 +01:00
parent bf6f62e629
commit 8e307c8cbb
3 changed files with 35 additions and 62 deletions

View File

@ -19,74 +19,28 @@
import asyncio import asyncio
from .vm_error import VMError from .vm_error import VMError
from .attic import find_unused_port from .attic import find_unused_port
from ..config import Config
import logging import logging
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class BaseVM: class BaseVM:
_allocated_console_ports = []
def __init__(self, name, identifier, port_manager): def __init__(self, name, identifier, port_manager):
self._loop = asyncio.get_event_loop() self._loop = asyncio.get_event_loop()
self._allocate_console()
self._queue = asyncio.Queue() self._queue = asyncio.Queue()
self._name = name self._name = name
self._id = identifier self._id = identifier
self._created = asyncio.Future() self._created = asyncio.Future()
self._worker = asyncio.async(self._run()) self._worker = asyncio.async(self._run())
self._port_manager = port_manager self._port_manager = port_manager
self._config = Config.instance()
log.info("{type} device {name} [id={id}] has been created".format( log.info("{type} device {name} [id={id}] has been created".format(
type=self.__class__.__name__, type=self.__class__.__name__,
name=self._name, name=self._name,
id=self._id)) id=self._id))
def _allocate_console(self):
if not self._console:
# allocate a console port
try:
self._console = find_unused_port(self._console_start_port_range,
self._console_end_port_range,
self._console_host,
ignore_ports=self._allocated_console_ports)
except Exception as e:
raise VMError(e)
if self._console in self._allocated_console_ports:
raise VMError("Console port {} is already used by another VM".format(self._console))
self._allocated_console_ports.append(self._console)
@property
def console(self):
"""
Returns the TCP console port.
:returns: console port (integer)
"""
return self._console
@console.setter
def console(self, console):
"""
Sets the TCP console port.
:param console: console port (integer)
"""
if console in self._allocated_console_ports:
raise VMError("Console port {} is already used by another VM".format(console))
self._allocated_console_ports.remove(self._console)
self._console = console
self._allocated_console_ports.append(self._console)
log.info("{type} {name} [id={id}]: console port set to {port}".format(type=self.__class__.__name__,
name=self._name,
id=self._id,
port=console))
@property @property
def id(self): def id(self):
""" """

View File

@ -28,6 +28,7 @@ import shutil
import re import re
import asyncio import asyncio
import socket import socket
import shutil
from pkg_resources import parse_version from pkg_resources import parse_version
from .vpcs_error import VPCSError from .vpcs_error import VPCSError
@ -52,17 +53,17 @@ class VPCSDevice(BaseVM):
:param console: TCP console port :param console: TCP console port
""" """
def __init__(self, name, vpcs_id, port_manager, def __init__(self, name, vpcs_id, port_manager,
path = None, working_dir = None, console = None):
working_dir = None, super().__init__(name, vpcs_id, port_manager)
console=None):
#self._path = path #self._path = path
#self._working_dir = working_dir #self._working_dir = working_dir
# TODO: Hardcodded for testing # TODO: Hardcodded for testing
self._path = "/usr/local/bin/vpcs" self._path = self._config.get_section_config("VPCS").get("path", "vpcs")
self._working_dir = "/tmp"
self._working_dir = "/tmp"
self._console = console self._console = console
self._command = [] self._command = []
self._process = None self._process = None
self._vpcs_stdout_file = "" self._vpcs_stdout_file = ""
@ -89,12 +90,15 @@ class VPCSDevice(BaseVM):
raise VPCSError(e) raise VPCSError(e)
self._check_requirements() self._check_requirements()
super().__init__(name, vpcs_id, port_manager)
def _check_requirements(self): def _check_requirements(self):
""" """
Check if VPCS is available with the correct version Check if VPCS is available with the correct version
""" """
if self._path == "vpcs":
self._path = shutil.which("vpcs")
if not self._path: if not self._path:
raise VPCSError("No path to a VPCS executable has been set") raise VPCSError("No path to a VPCS executable has been set")
@ -106,6 +110,16 @@ class VPCSDevice(BaseVM):
self._check_vpcs_version() self._check_vpcs_version()
@property
def console(self):
"""
Returns the console port of this VPCS device.
:returns: console port
"""
return self._console
@property @property
def name(self): def name(self):
""" """

View File

@ -28,33 +28,38 @@ from gns3server.modules.vpcs.vpcs_error import VPCSError
@patch("subprocess.check_output", return_value="Welcome to Virtual PC Simulator, version 0.6".encode("utf-8")) @patch("subprocess.check_output", return_value="Welcome to Virtual PC Simulator, version 0.6".encode("utf-8"))
def test_vm(tmpdir, port_manager): def test_vm(tmpdir, port_manager):
vm = VPCSDevice("test", 42, port_manager, working_dir=str(tmpdir), path="/bin/test") vm = VPCSDevice("test", 42, port_manager, working_dir=str(tmpdir))
assert vm.name == "test" assert vm.name == "test"
assert vm.id == 42 assert vm.id == 42
@patch("subprocess.check_output", return_value="Welcome to Virtual PC Simulator, version 0.1".encode("utf-8")) @patch("subprocess.check_output", return_value="Welcome to Virtual PC Simulator, version 0.1".encode("utf-8"))
def test_vm_invalid_vpcs_version(tmpdir, port_manager): def test_vm_invalid_vpcs_version(tmpdir, port_manager):
with pytest.raises(VPCSError): with pytest.raises(VPCSError):
vm = VPCSDevice("test", 42, port_manager, working_dir=str(tmpdir), path="/bin/test") vm = VPCSDevice("test", 42, port_manager, working_dir=str(tmpdir))
assert vm.name == "test" assert vm.name == "test"
assert vm.id == 42 assert vm.id == 42
@patch("gns3server.config.Config.get_section_config", return_value = {"path": "/bin/test_fake"})
def test_vm_invalid_vpcs_path(tmpdir, port_manager): def test_vm_invalid_vpcs_path(tmpdir, port_manager):
with pytest.raises(VPCSError): with pytest.raises(VPCSError):
vm = VPCSDevice("test", 42, port_manager, working_dir=str(tmpdir), path="/bin/test_fake") vm = VPCSDevice("test", 42, port_manager, working_dir=str(tmpdir))
assert vm.name == "test" assert vm.name == "test"
assert vm.id == 42 assert vm.id == 42
def test_start(tmpdir, loop, port_manager): def test_start(tmpdir, loop, port_manager):
with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()): with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()):
vm = VPCSDevice("test", 42, port_manager, working_dir=str(tmpdir), path="/bin/test") vm = VPCSDevice("test", 42, port_manager, working_dir=str(tmpdir))
nio = vm.port_add_nio_binding(0, {"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"})
loop.run_until_complete(asyncio.async(vm.start())) loop.run_until_complete(asyncio.async(vm.start()))
assert vm.is_running() == True assert vm.is_running() == True
def test_stop(tmpdir, loop, port_manager): def test_stop(tmpdir, loop, port_manager):
process = MagicMock() process = MagicMock()
with asyncio_patch("asyncio.create_subprocess_exec", return_value=process): with asyncio_patch("asyncio.create_subprocess_exec", return_value=process):
vm = VPCSDevice("test", 42, port_manager, working_dir=str(tmpdir), path="/bin/test") vm = VPCSDevice("test", 42, port_manager, working_dir=str(tmpdir))
nio = vm.port_add_nio_binding(0, {"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"})
loop.run_until_complete(asyncio.async(vm.start())) loop.run_until_complete(asyncio.async(vm.start()))
assert vm.is_running() == True assert vm.is_running() == True
loop.run_until_complete(asyncio.async(vm.stop())) loop.run_until_complete(asyncio.async(vm.stop()))
@ -62,18 +67,18 @@ def test_stop(tmpdir, loop, port_manager):
process.terminate.assert_called_with() process.terminate.assert_called_with()
def test_add_nio_binding_udp(port_manager, tmpdir): def test_add_nio_binding_udp(port_manager, tmpdir):
vm = VPCSDevice("test", 42, port_manager, working_dir=str(tmpdir), path="/bin/test") vm = VPCSDevice("test", 42, port_manager, working_dir=str(tmpdir))
nio = vm.port_add_nio_binding(0, {"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"}) nio = vm.port_add_nio_binding(0, {"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"})
assert nio.lport == 4242 assert nio.lport == 4242
def test_add_nio_binding_tap(port_manager, tmpdir): def test_add_nio_binding_tap(port_manager, tmpdir):
vm = VPCSDevice("test", 42, port_manager, working_dir=str(tmpdir), path="/bin/test") vm = VPCSDevice("test", 42, port_manager, working_dir=str(tmpdir))
with patch("gns3server.modules.vpcs.vpcs_device.has_privileged_access", return_value=True): with patch("gns3server.modules.vpcs.vpcs_device.has_privileged_access", return_value=True):
nio = vm.port_add_nio_binding(0, {"type": "nio_tap", "tap_device": "test"}) nio = vm.port_add_nio_binding(0, {"type": "nio_tap", "tap_device": "test"})
assert nio.tap_device == "test" assert nio.tap_device == "test"
def test_add_nio_binding_tap_no_privileged_access(port_manager, tmpdir): def test_add_nio_binding_tap_no_privileged_access(port_manager, tmpdir):
vm = VPCSDevice("test", 42, port_manager, working_dir=str(tmpdir), path="/bin/test") vm = VPCSDevice("test", 42, port_manager, working_dir=str(tmpdir))
with patch("gns3server.modules.vpcs.vpcs_device.has_privileged_access", return_value=False): with patch("gns3server.modules.vpcs.vpcs_device.has_privileged_access", return_value=False):
with pytest.raises(VPCSError): with pytest.raises(VPCSError):
vm.port_add_nio_binding(0, {"type": "nio_tap", "tap_device": "test"}) vm.port_add_nio_binding(0, {"type": "nio_tap", "tap_device": "test"})