Drop Windows support

pull/2025/head
grossmj 2 years ago
parent 4fb0cd9321
commit 74c675d5b0

@ -230,16 +230,6 @@ async def start_capture(
"""
pcap_file_path = os.path.join(node.project.capture_working_directory(), node_capture_data.capture_file_name)
if sys.platform.startswith("win"):
# FIXME: Dynamips (Cygwin actually) doesn't like non ascii paths on Windows
try:
pcap_file_path.encode("ascii")
except UnicodeEncodeError:
raise DynamipsError(
f"The capture file path '{pcap_file_path}' must only contain ASCII (English) characters"
)
await node.start_capture(adapter_number, port_number, pcap_file_path, node_capture_data.data_link_type)
return {"pcap_file_path": pcap_file_path}

@ -32,10 +32,7 @@ if (
or os.environ.get("PYTEST_BUILD_DOCUMENTATION") == "1"
):
# IOU & Docker only runs on Linux but test suite works on UNIX platform
if not sys.platform.startswith("win"):
from .docker import Docker
MODULES.append(Docker)
from .iou import IOU
MODULES.append(IOU)
from .docker import Docker
from .iou import IOU
MODULES.append(Docker)
MODULES.append(IOU)

@ -301,10 +301,6 @@ class BaseManager:
:returns: True or False
"""
if sys.platform.startswith("win"):
# do not check anything on Windows
return True
if sys.platform.startswith("darwin"):
if os.stat(executable).st_uid == 0:
return True
@ -425,11 +421,10 @@ class BaseManager:
valid_directory_prefices.append(extra_dir)
# Windows path should not be send to a unix server
if not sys.platform.startswith("win"):
if re.match(r"^[A-Z]:", path) is not None:
raise NodeError(
f"'{path}' is not allowed on this remote server. Please only use a file from '{img_directory}'"
)
if re.match(r"^[A-Z]:", path) is not None:
raise NodeError(
f"'{path}' is not allowed on this remote server. Please only use a file from '{img_directory}'"
)
if not os.path.isabs(orig_path):

@ -892,34 +892,6 @@ class BaseNode:
await self._ubridge_send(
'bridge add_nio_linux_raw {name} "{interface}"'.format(name=bridge_name, interface=ethernet_interface)
)
elif sys.platform.startswith("win"):
# on Windows we use Winpcap/Npcap
windows_interfaces = interfaces()
npf_id = None
source_mac = None
for interface in windows_interfaces:
# Winpcap/Npcap uses a NPF ID to identify an interface on Windows
if "netcard" in interface and ethernet_interface in interface["netcard"]:
npf_id = interface["id"]
source_mac = interface["mac_address"]
elif ethernet_interface in interface["name"]:
npf_id = interface["id"]
source_mac = interface["mac_address"]
if npf_id:
await self._ubridge_send(
'bridge add_nio_ethernet {name} "{interface}"'.format(name=bridge_name, interface=npf_id)
)
else:
raise NodeError(f"Could not find NPF id for interface {ethernet_interface}")
if block_host_traffic:
if source_mac:
await self._ubridge_send(
'bridge set_pcap_filter {name} "not ether src {mac}"'.format(name=bridge_name, mac=source_mac)
)
log.info(f"PCAP filter applied on '{ethernet_interface}' for source MAC {source_mac}")
else:
log.warning(f"Could not block host network traffic on {ethernet_interface} (no MAC address found)")
else:
# on other platforms we just rely on the pcap library
await self._ubridge_send(

@ -310,30 +310,26 @@ class Cloud(BaseNode):
"uBridge requires root access or the capability to interact with Ethernet and TAP adapters"
)
if sys.platform.startswith("win"):
await self._add_ubridge_ethernet_connection(bridge_name, port_info["interface"])
if port_info["type"] == "ethernet":
network_interfaces = [interface["name"] for interface in self._interfaces()]
if not port_info["interface"] in network_interfaces:
raise NodeError(
f"Interface '{port_info['interface']}' could not be found on this system, please update '{self.name}'"
)
else:
if port_info["type"] == "ethernet":
network_interfaces = [interface["name"] for interface in self._interfaces()]
if not port_info["interface"] in network_interfaces:
raise NodeError(
f"Interface '{port_info['interface']}' could not be found on this system, please update '{self.name}'"
)
if sys.platform.startswith("linux"):
await self._add_linux_ethernet(port_info, bridge_name)
elif sys.platform.startswith("darwin"):
await self._add_osx_ethernet(port_info, bridge_name)
else:
await self._add_windows_ethernet(port_info, bridge_name)
elif port_info["type"] == "tap":
await self._ubridge_send(
'bridge add_nio_tap {name} "{interface}"'.format(
name=bridge_name, interface=port_info["interface"]
)
if sys.platform.startswith("linux"):
await self._add_linux_ethernet(port_info, bridge_name)
elif sys.platform.startswith("darwin"):
await self._add_osx_ethernet(port_info, bridge_name)
else:
await self._add_windows_ethernet(port_info, bridge_name)
elif port_info["type"] == "tap":
await self._ubridge_send(
'bridge add_nio_tap {name} "{interface}"'.format(
name=bridge_name, interface=port_info["interface"]
)
)
elif port_info["type"] == "udp":
await self._ubridge_send(

@ -348,23 +348,10 @@ class Dynamips(BaseManager):
nio.suspend = nio_settings.get("suspend", False)
elif nio_settings["type"] == "nio_generic_ethernet":
ethernet_device = nio_settings["ethernet_device"]
if sys.platform.startswith("win"):
# replace the interface name by the GUID on Windows
windows_interfaces = interfaces()
npf_interface = None
for interface in windows_interfaces:
if interface["name"] == ethernet_device:
npf_interface = interface["id"]
if not npf_interface:
raise DynamipsError(f"Could not find interface {ethernet_device} on this host")
else:
ethernet_device = npf_interface
if not is_interface_up(ethernet_device):
raise DynamipsError(f"Ethernet interface {ethernet_device} is down")
nio = NIOGenericEthernet(node.hypervisor, ethernet_device)
elif nio_settings["type"] == "nio_linux_ethernet":
if sys.platform.startswith("win"):
raise DynamipsError("This NIO type is not supported on Windows")
ethernet_device = nio_settings["ethernet_device"]
nio = NIOLinuxEthernet(node.hypervisor, ethernet_device)
elif nio_settings["type"] == "nio_tap":
@ -564,7 +551,6 @@ class Dynamips(BaseManager):
await vm.set_idlepc("0x0")
was_auto_started = False
old_priority = None
try:
status = await vm.get_status()
if status != "running":
@ -576,8 +562,6 @@ class Dynamips(BaseManager):
if not idlepcs:
raise DynamipsError("No Idle-PC values found")
if sys.platform.startswith("win"):
old_priority = vm.set_process_priority_windows(vm.hypervisor.process.pid)
for idlepc in idlepcs:
match = re.search(r"^0x[0-9a-f]{8}$", idlepc.split()[0])
if not match:
@ -606,8 +590,6 @@ class Dynamips(BaseManager):
except DynamipsError:
raise
finally:
if old_priority is not None:
vm.set_process_priority_windows(vm.hypervisor.process.pid, old_priority)
if was_auto_started:
await vm.stop()
return validated_idlepc

@ -118,11 +118,6 @@ class Hypervisor(DynamipsHypervisor):
self._command = self._build_command()
env = os.environ.copy()
if sys.platform.startswith("win"):
# add the Npcap directory to $PATH to force Dynamips to use npcap DLL instead of Winpcap (if installed)
system_root = os.path.join(os.path.expandvars("%SystemRoot%"), "System32", "Npcap")
if os.path.isdir(system_root):
env["PATH"] = system_root + ";" + env["PATH"]
try:
log.info(f"Starting Dynamips: {self._command}")
self._stdout_file = os.path.join(self.working_dir, f"dynamips_i{self._id}_stdout.txt")

@ -103,10 +103,7 @@ class Router(BaseNode):
self._idlesleep = 30
self._ghost_file = ""
self._ghost_status = 0
if sys.platform.startswith("win"):
self._exec_area = 16 # 16 MB by default on Windows (Cygwin)
else:
self._exec_area = 64 # 64 MB on other systems
self._exec_area = 64
self._disk0 = 0 # Megabytes
self._disk1 = 0 # Megabytes
self._auto_delete_disks = False
@ -711,29 +708,6 @@ class Router(BaseNode):
log.info(f'Router "{self._name}" [{self._id}]: idle-PC set to {idlepc}')
self._idlepc = idlepc
def set_process_priority_windows(self, pid, priority=None):
"""
Sets process priority on Windows
:param pid: process PID
"""
import win32api
import win32process
import win32con
import pywintypes
old_priority = None
try:
handle = win32api.OpenProcess(win32con.PROCESS_ALL_ACCESS, True, pid)
old_priority = win32process.GetPriorityClass(handle)
if priority is None:
priority = win32process.BELOW_NORMAL_PRIORITY_CLASS
win32process.SetPriorityClass(handle, priority)
except pywintypes.error as e:
log.error(f"Cannot set priority for Dynamips process (PID={pid}) ")
return old_priority
async def get_idle_pc_prop(self):
"""
Gets the idle PC proposals.
@ -751,13 +725,8 @@ class Router(BaseNode):
await asyncio.sleep(20) # leave time to the router to boot
log.info(f'Router "{self._name}" [{self._id}] has started calculating Idle-PC values')
old_priority = None
if sys.platform.startswith("win"):
old_priority = self.set_process_priority_windows(self._hypervisor.process.pid)
begin = time.time()
idlepcs = await self._hypervisor.send(f'vm get_idle_pc_prop "{self._name}" 0')
if old_priority is not None:
self.set_process_priority_windows(self._hypervisor.process.pid, old_priority)
log.info(
'Router "{name}" [{id}] has finished calculating Idle-PC values after {time:.4f} seconds'.format(
name=self._name, id=self._id, time=time.time() - begin

@ -114,29 +114,15 @@ class Qemu(BaseManager):
else:
log.warning("The PATH environment variable doesn't exist")
# look for Qemu binaries in the current working directory and $PATH
if sys.platform.startswith("win"):
# add specific Windows paths
if hasattr(sys, "frozen"):
# add any qemu dir in the same location as gns3server.exe to the list of paths
if sys.platform.startswith("darwin") and hasattr(sys, "frozen"):
# add specific locations on Mac OS X regardless of what's in $PATH
paths.update(["/usr/bin", "/usr/local/bin", "/opt/local/bin"])
try:
exec_dir = os.path.dirname(os.path.abspath(sys.executable))
for f in os.listdir(exec_dir):
if f.lower().startswith("qemu"):
paths.add(os.path.join(exec_dir, f))
if "PROGRAMFILES(X86)" in os.environ and os.path.exists(os.environ["PROGRAMFILES(X86)"]):
paths.add(os.path.join(os.environ["PROGRAMFILES(X86)"], "qemu"))
if "PROGRAMFILES" in os.environ and os.path.exists(os.environ["PROGRAMFILES"]):
paths.add(os.path.join(os.environ["PROGRAMFILES"], "qemu"))
elif sys.platform.startswith("darwin"):
if hasattr(sys, "frozen"):
# add specific locations on Mac OS X regardless of what's in $PATH
paths.update(["/usr/bin", "/usr/local/bin", "/opt/local/bin"])
try:
exec_dir = os.path.dirname(os.path.abspath(sys.executable))
paths.add(os.path.abspath(os.path.join(exec_dir, "qemu/bin")))
# If the user run the server by hand from outside
except FileNotFoundError:
paths.add("/Applications/GNS3.app/Contents/MacOS/qemu/bin")
paths.add(os.path.abspath(os.path.join(exec_dir, "qemu/bin")))
# If the user run the server by hand from outside
except FileNotFoundError:
paths.add("/Applications/GNS3.app/Contents/MacOS/qemu/bin")
return paths
@staticmethod
@ -205,31 +191,16 @@ class Qemu(BaseManager):
:param qemu_path: path to Qemu executable.
"""
if sys.platform.startswith("win"):
# Qemu on Windows doesn't return anything with parameter -version
# look for a version number in version.txt file in the same directory instead
version_file = os.path.join(os.path.dirname(qemu_path), "version.txt")
if os.path.isfile(version_file):
try:
with open(version_file, "rb") as file:
version = file.read().decode("utf-8").strip()
match = re.search(r"[0-9\.]+", version)
if match:
return version
except (UnicodeDecodeError, OSError) as e:
log.warning(f"could not read {version_file}: {e}")
return ""
else:
try:
output = await subprocess_check_output(qemu_path, "-version", "-nographic")
match = re.search(r"version\s+([0-9a-z\-\.]+)", output)
if match:
version = match.group(1)
return version
else:
raise QemuError(f"Could not determine the Qemu version for {qemu_path}")
except (OSError, subprocess.SubprocessError) as e:
raise QemuError(f"Error while looking for the Qemu version: {e}")
try:
output = await subprocess_check_output(qemu_path, "-version", "-nographic")
match = re.search(r"version\s+([0-9a-z\-\.]+)", output)
if match:
version = match.group(1)
return version
else:
raise QemuError(f"Could not determine the Qemu version for {qemu_path}")
except (OSError, subprocess.SubprocessError) as e:
raise QemuError(f"Error while looking for the Qemu version: {e}")
@staticmethod
async def _get_qemu_img_version(qemu_img_path):
@ -250,38 +221,6 @@ class Qemu(BaseManager):
except (OSError, subprocess.SubprocessError) as e:
raise QemuError(f"Error while looking for the Qemu-img version: {e}")
@staticmethod
def get_haxm_windows_version():
"""
Gets the HAXM version number (Windows).
:returns: HAXM version number. Returns None if HAXM is not installed.
"""
assert sys.platform.startswith("win")
import winreg
hkey = winreg.OpenKey(
winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows\CurrentVersion\Installer\UserData\S-1-5-18\Products"
)
version = None
for index in range(winreg.QueryInfoKey(hkey)[0]):
product_id = winreg.EnumKey(hkey, index)
try:
product_key = winreg.OpenKey(hkey, fr"{product_id}\InstallProperties")
try:
if winreg.QueryValueEx(product_key, "DisplayName")[0].endswith(
"Hardware Accelerated Execution Manager"
):
version = winreg.QueryValueEx(product_key, "DisplayVersion")[0]
break
finally:
winreg.CloseKey(product_key)
except OSError:
continue
winreg.CloseKey(hkey)
return version
@staticmethod
def get_legacy_vm_workdir(legacy_vm_id, name):
"""

@ -32,8 +32,9 @@ import gns3server
import subprocess
import time
import json
import shlex
from gns3server.utils import parse_version, shlex_quote
from gns3server.utils import parse_version
from gns3server.utils.asyncio import subprocess_check_output, cancellable_wait_run_in_executor
from .qemu_error import QemuError
from .utils.qcow2 import Qcow2, Qcow2Error
@ -220,8 +221,6 @@ class QemuVM(BaseNode):
"""
if qemu_path and os.pathsep not in qemu_path:
if sys.platform.startswith("win") and ".exe" not in qemu_path.lower():
qemu_path += "w.exe"
new_qemu_path = shutil.which(qemu_path, path=os.pathsep.join(self._manager.paths_list()))
if new_qemu_path is None:
raise QemuError(f"QEMU binary path {qemu_path} is not found in the path")
@ -271,10 +270,7 @@ class QemuVM(BaseNode):
def platform(self, platform):
self._platform = platform
if sys.platform.startswith("win"):
self.qemu_path = f"qemu-system-{platform}w.exe"
else:
self.qemu_path = f"qemu-system-{platform}"
self.qemu_path = f"qemu-system-{platform}"
def _disk_setter(self, variable, value):
"""
@ -901,8 +897,8 @@ class QemuVM(BaseNode):
options = options.replace("-enable-kvm", "-machine accel=kvm")
if "-enable-hax" in options:
if not sys.platform.startswith("win"):
# HAXM is only available on Windows
if not sys.platform.startswith("darwin"):
# HAXM is only available on macOS
options = options.replace("-enable-hax", "")
else:
options = options.replace("-enable-hax", "-machine accel=hax")
@ -1002,52 +998,25 @@ class QemuVM(BaseNode):
if self._process_priority == "normal":
return
if sys.platform.startswith("win"):
try:
import win32api
import win32con
import win32process
except ImportError:
log.error(f"pywin32 must be installed to change the priority class for QEMU VM {self._name}")
else:
log.info(f"Setting QEMU VM {self._name} priority class to {self._process_priority}")
handle = win32api.OpenProcess(win32con.PROCESS_ALL_ACCESS, 0, self._process.pid)
if self._process_priority == "realtime":
priority = win32process.REALTIME_PRIORITY_CLASS
elif self._process_priority == "very high":
priority = win32process.HIGH_PRIORITY_CLASS
elif self._process_priority == "high":
priority = win32process.ABOVE_NORMAL_PRIORITY_CLASS
elif self._process_priority == "low":
priority = win32process.BELOW_NORMAL_PRIORITY_CLASS
elif self._process_priority == "very low":
priority = win32process.IDLE_PRIORITY_CLASS
else:
priority = win32process.NORMAL_PRIORITY_CLASS
try:
win32process.SetPriorityClass(handle, priority)
except win32process.error as e:
log.error(f'Could not change process priority for QEMU VM "{self._name}": {e}')
if self._process_priority == "realtime":
priority = -20
elif self._process_priority == "very high":
priority = -15
elif self._process_priority == "high":
priority = -5
elif self._process_priority == "low":
priority = 5
elif self._process_priority == "very low":
priority = 19
else:
if self._process_priority == "realtime":
priority = -20
elif self._process_priority == "very high":
priority = -15
elif self._process_priority == "high":
priority = -5
elif self._process_priority == "low":
priority = 5
elif self._process_priority == "very low":
priority = 19
else:
priority = 0
try:
process = await asyncio.create_subprocess_exec(
"renice", "-n", str(priority), "-p", str(self._process.pid)
)
await process.wait()
except (OSError, subprocess.SubprocessError) as e:
log.error(f'Could not change process priority for QEMU VM "{self._name}": {e}')
priority = 0
try:
process = await asyncio.create_subprocess_exec(
"renice", "-n", str(priority), "-p", str(self._process.pid)
)
await process.wait()
except (OSError, subprocess.SubprocessError) as e:
log.error(f'Could not change process priority for QEMU VM "{self._name}": {e}')
def _stop_cpulimit(self):
"""
@ -1070,14 +1039,8 @@ class QemuVM(BaseNode):
return
try:
if sys.platform.startswith("win") and hasattr(sys, "frozen"):
cpulimit_exec = os.path.join(
os.path.dirname(os.path.abspath(sys.executable)), "cpulimit", "cpulimit.exe"
)
else:
cpulimit_exec = "cpulimit"
subprocess.Popen(
[cpulimit_exec, "--lazy", f"--pid={self._process.pid}", f"--limit={self._cpu_throttling}"],
["cpulimit", "--lazy", f"--pid={self._process.pid}", f"--limit={self._cpu_throttling}"],
cwd=self.working_dir,
)
log.info(f"CPU throttled to {self._cpu_throttling}%")
@ -1133,7 +1096,7 @@ class QemuVM(BaseNode):
self.check_available_ram(self.ram)
command = await self._build_command()
command_string = " ".join(shlex_quote(s) for s in command)
command_string = " ".join(shlex.quote(s) for s in command)
try:
log.info(f"Starting QEMU with: {command_string}")
self._stdout_file = os.path.join(self.working_dir, "qemu.log")
@ -1193,8 +1156,7 @@ class QemuVM(BaseNode):
if self.started:
log.info("QEMU process has stopped, return code: %d", returncode)
await self.stop()
# A return code of 1 seem fine on Windows
if returncode != 0 and (not sys.platform.startswith("win") or returncode != 1):
if returncode != 0:
self.project.emit(
"log.error",
{"message": f"QEMU process has stopped, return code: {returncode}\n{self.read_stdout()}"},
@ -1822,7 +1784,7 @@ class QemuVM(BaseNode):
self._qemu_img_stdout_file = os.path.join(self.working_dir, "qemu-img.log")
log.info(f"logging to {self._qemu_img_stdout_file}")
command_string = " ".join(shlex_quote(s) for s in command)
command_string = " ".join(shlex.quote(s) for s in command)
log.info(f"Executing qemu-img with: {command_string}")
with open(self._qemu_img_stdout_file, "w", encoding="utf-8") as fd:
process = await asyncio.create_subprocess_exec(
@ -2272,15 +2234,7 @@ class QemuVM(BaseNode):
require_hardware_accel = self.manager.config.settings.Qemu.require_hardware_acceleration
if enable_hardware_accel and "-machine accel=tcg" not in options:
# Turn OFF hardware acceleration for non x86 architectures
if sys.platform.startswith("win"):
supported_binaries = [
"qemu-system-x86_64.exe",
"qemu-system-x86_64w.exe",
"qemu-system-i386.exe",
"qemu-system-i386w.exe",
]
else:
supported_binaries = ["qemu-system-x86_64", "qemu-system-i386", "qemu-kvm"]
supported_binaries = ["qemu-system-x86_64", "qemu-system-i386", "qemu-kvm"]
if os.path.basename(qemu_path) not in supported_binaries:
if require_hardware_accel:
raise QemuError(
@ -2298,29 +2252,6 @@ class QemuVM(BaseNode):
)
else:
return False
elif sys.platform.startswith("win"):
if require_hardware_accel:
# HAXM is only available starting with Qemu version 2.9.0
version = await self.manager.get_qemu_version(self.qemu_path)
if version and parse_version(version) < parse_version("2.9.0"):
raise QemuError(
f"HAXM acceleration can only be enable for Qemu version 2.9.0 and above (current version: {version})"
)
# check if HAXM is installed
version = self.manager.get_haxm_windows_version()
if version is None:
raise QemuError("HAXM acceleration support is not installed on this host")
log.info(f"HAXM support version {version} detected")
# check if the HAXM service is running
from gns3server.utils.windows_service import check_windows_service_is_running
if not check_windows_service_is_running("intelhaxm"):
raise QemuError("Intel HAXM service is not running on this host")
else:
return False
elif sys.platform.startswith("darwin"):
process = await asyncio.create_subprocess_shell("kextstat | grep com.intel.kext.intelhaxm")
await process.wait()
@ -2440,7 +2371,7 @@ class QemuVM(BaseNode):
# https://github.com/GNS3/gns3-server/issues/685
if version and parse_version(version) >= parse_version("2.4.0") and self.platform == "x86_64":
command.extend(["-machine", "smm=off"])
elif sys.platform.startswith("win") or sys.platform.startswith("darwin"):
elif sys.platform.startswith("darwin"):
command.extend(["-enable-hax"])
command.extend(["-boot", f"order={self._boot_priority}"])
command.extend(self._bios_option())

@ -27,7 +27,6 @@ import re
from gns3server.utils import parse_version
from gns3server.utils.asyncio import wait_for_process_termination
from gns3server.utils.asyncio import monitor_process
from gns3server.utils.asyncio import subprocess_check_output
from .ubridge_hypervisor import UBridgeHypervisor
from .ubridge_error import UbridgeError
@ -139,7 +138,7 @@ class Hypervisor(UBridgeHypervisor):
match = re.search(r"ubridge version ([0-9a-z\.]+)", output)
if match:
self._version = match.group(1)
if sys.platform.startswith("win") or sys.platform.startswith("darwin"):
if sys.platform.startswith("darwin"):
minimum_required_version = "0.9.12"
else:
# uBridge version 0.9.14 is required for packet filters
@ -158,11 +157,6 @@ class Hypervisor(UBridgeHypervisor):
"""
env = os.environ.copy()
if sys.platform.startswith("win"):
# add the Npcap directory to $PATH to force uBridge to use npcap DLL instead of Winpcap (if installed)
system_root = os.path.join(os.path.expandvars("%SystemRoot%"), "System32", "Npcap")
if os.path.isdir(system_root):
env["PATH"] = system_root + ";" + env["PATH"]
await self._check_ubridge_version(env)
try:
command = self._build_command()

@ -62,16 +62,7 @@ class VirtualBox(BaseManager):
vboxmanage_path = shutil.which(vboxmanage_path)
else:
log.info("A path to VBoxManage has not been configured, trying to find it...")
if sys.platform.startswith("win"):
if "VBOX_INSTALL_PATH" in os.environ:
vboxmanage_path_windows = os.path.join(os.environ["VBOX_INSTALL_PATH"], "VBoxManage.exe")
if os.path.exists(vboxmanage_path_windows):
vboxmanage_path = vboxmanage_path_windows
elif "VBOX_MSI_INSTALL_PATH" in os.environ:
vboxmanage_path_windows = os.path.join(os.environ["VBOX_MSI_INSTALL_PATH"], "VBoxManage.exe")
if os.path.exists(vboxmanage_path_windows):
vboxmanage_path = vboxmanage_path_windows
elif sys.platform.startswith("darwin"):
if sys.platform.startswith("darwin"):
vboxmanage_path_osx = "/Applications/VirtualBox.app/Contents/MacOS/VBoxManage"
if os.path.exists(vboxmanage_path_osx):
vboxmanage_path = vboxmanage_path_osx

@ -38,10 +38,6 @@ from gns3server.compute.nios.nio_udp import NIOUDP
from gns3server.compute.adapters.ethernet_adapter import EthernetAdapter
from gns3server.compute.base_node import BaseNode
if sys.platform.startswith("win"):
import msvcrt
import win32file
import logging
log = logging.getLogger(__name__)
@ -839,14 +835,11 @@ class VirtualBoxVM(BaseNode):
:returns: pipe path (string)
"""
if sys.platform.startswith("win"):
pipe_name = fr"\\.\pipe\gns3_vbox\{self.id}"
else:
pipe_name = os.path.join(tempfile.gettempdir(), "gns3_vbox", f"{self.id}")
try:
os.makedirs(os.path.dirname(pipe_name), exist_ok=True)
except OSError as e:
raise VirtualBoxError(f"Could not create the VirtualBox pipe directory: {e}")
pipe_name = os.path.join(tempfile.gettempdir(), "gns3_vbox", f"{self.id}")
try:
os.makedirs(os.path.dirname(pipe_name), exist_ok=True)
except OSError as e:
raise VirtualBoxError(f"Could not create the VirtualBox pipe directory: {e}")
return pipe_name
async def _set_serial_console(self):

@ -27,11 +27,12 @@ import subprocess
import logging
import codecs
import ipaddress
import shlex
from collections import OrderedDict
from gns3server.utils.interfaces import interfaces
from gns3server.utils.asyncio import subprocess_check_output
from gns3server.utils import parse_version, shlex_quote
from gns3server.utils import parse_version
log = logging.getLogger(__name__)
@ -53,10 +54,7 @@ class VMware(BaseManager):
self._vmnets = []
self._vmnets_info = {}
self._vmnet_start_range = 2
if sys.platform.startswith("win"):
self._vmnet_end_range = 19
else:
self._vmnet_end_range = 255
self._vmnet_end_range = 255
@property
def vmrun_path(self):
@ -95,15 +93,7 @@ class VMware(BaseManager):
# look for vmrun
vmrun_path = self.config.settings.VMware.vmrun_path
if not vmrun_path:
if sys.platform.startswith("win"):
vmrun_path = shutil.which("vmrun")
if vmrun_path is None:
# look for vmrun.exe using the VMware Workstation directory listed in the registry
vmrun_path = self._find_vmrun_registry(r"SOFTWARE\Wow6432Node\VMware, Inc.\VMware Workstation")
if vmrun_path is None:
# look for vmrun.exe using the VIX directory listed in the registry
vmrun_path = self._find_vmrun_registry(r"SOFTWARE\Wow6432Node\VMware, Inc.\VMware VIX")
elif sys.platform.startswith("darwin"):
if sys.platform.startswith("darwin"):
vmrun_path = "/Applications/VMware Fusion.app/Contents/Library/vmrun"
else:
vmrun_path = "vmrun"
@ -197,84 +187,44 @@ class VMware(BaseManager):
Check VMware version
"""
if sys.platform.startswith("win"):
# look for vmrun.exe using the directory listed in the registry
ws_version = self._find_vmware_version_registry(r"SOFTWARE\Wow6432Node\VMware, Inc.\VMware Workstation")
if ws_version is None:
player_version = self._find_vmware_version_registry(r"SOFTWARE\Wow6432Node\VMware, Inc.\VMware Player")
if player_version:
log.debug(f"VMware Player version {player_version} detected")
await self._check_vmware_player_requirements(player_version)
else:
log.warning("Could not find VMware version")
self._host_type = "ws"
else:
log.debug(f"VMware Workstation version {ws_version} detected")
await self._check_vmware_workstation_requirements(ws_version)
else:
if sys.platform.startswith("darwin"):
if not os.path.isdir("/Applications/VMware Fusion.app"):
raise VMwareError(
"VMware Fusion is not installed in the standard location /Applications/VMware Fusion.app"
)
self._host_type = "fusion"
return # FIXME: no version checking on Mac OS X but we support all versions of fusion
vmware_path = VMware._get_linux_vmware_binary()
if vmware_path is None:
raise VMwareError("VMware is not installed (vmware or vmplayer executable could not be found in $PATH)")
try:
output = await subprocess_check_output(vmware_path, "-v")
match = re.search(r"VMware Workstation ([0-9]+)\.", output)
version = None
if match:
# VMware Workstation has been detected
version = match.group(1)
log.debug(f"VMware Workstation version {version} detected")
await self._check_vmware_workstation_requirements(version)
match = re.search(r"VMware Player ([0-9]+)\.", output)
if match:
# VMware Player has been detected
version = match.group(1)
log.debug(f"VMware Player version {version} detected")
await self._check_vmware_player_requirements(version)
if version is None:
log.warning(f"Could not find VMware version. Output of VMware: {output}")
raise VMwareError(f"Could not find VMware version. Output of VMware: {output}")
except (OSError, subprocess.SubprocessError) as e:
log.error(f"Error while looking for the VMware version: {e}")
raise VMwareError(f"Error while looking for the VMware version: {e}")
if sys.platform.startswith("darwin"):
if not os.path.isdir("/Applications/VMware Fusion.app"):
raise VMwareError(
"VMware Fusion is not installed in the standard location /Applications/VMware Fusion.app"
)
self._host_type = "fusion"
return # FIXME: no version checking on Mac OS X but we support all versions of fusion
@staticmethod
def _get_vmnet_interfaces_registry():
vmware_path = VMware._get_linux_vmware_binary()
if vmware_path is None:
raise VMwareError("VMware is not installed (vmware or vmplayer executable could not be found in $PATH)")
import winreg
vmnet_interfaces = []
regkey = r"SOFTWARE\Wow6432Node\VMware, Inc.\VMnetLib\VMnetConfig"
try:
hkey = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, regkey)
for index in range(winreg.QueryInfoKey(hkey)[0]):
vmnet = winreg.EnumKey(hkey, index)
hkeyvmnet = winreg.OpenKey(hkey, vmnet)
if winreg.QueryInfoKey(hkeyvmnet)[1]:
# the vmnet has not been configure if the key has no values
vmnet = vmnet.replace("vm", "VM")
if vmnet not in ("VMnet0", "VMnet1", "VMnet8"):
vmnet_interfaces.append(vmnet)
winreg.CloseKey(hkeyvmnet)
winreg.CloseKey(hkey)
except OSError as e:
raise VMwareError(f"Could not read registry key {regkey}: {e}")
return vmnet_interfaces
output = await subprocess_check_output(vmware_path, "-v")
match = re.search(r"VMware Workstation ([0-9]+)\.", output)
version = None
if match:
# VMware Workstation has been detected
version = match.group(1)
log.debug(f"VMware Workstation version {version} detected")
await self._check_vmware_workstation_requirements(version)
match = re.search(r"VMware Player ([0-9]+)\.", output)
if match:
# VMware Player has been detected
version = match.group(1)
log.debug(f"VMware Player version {version} detected")
await self._check_vmware_player_requirements(version)
if version is None:
log.warning(f"Could not find VMware version. Output of VMware: {output}")
raise VMwareError(f"Could not find VMware version. Output of VMware: {output}")
except (OSError, subprocess.SubprocessError) as e:
log.error(f"Error while looking for the VMware version: {e}")
raise VMwareError(f"Error while looking for the VMware version: {e}")
@staticmethod
def _get_vmnet_interfaces():
if sys.platform.startswith("win"):
return VMware._get_vmnet_interfaces_registry()
elif sys.platform.startswith("darwin"):
if sys.platform.startswith("darwin"):
vmware_networking_file = "/Library/Preferences/VMware Fusion/networking"
else:
# location on Linux
@ -310,17 +260,7 @@ class VMware(BaseManager):
vmnet_interfaces = []
for interface in interfaces():
if sys.platform.startswith("win"):
if "netcard" in interface:
windows_name = interface["netcard"]
else:
windows_name = interface["name"]
match = re.search(r"(VMnet[0-9]+)", windows_name)
if match:
vmnet = match.group(1)
if vmnet not in ("VMnet0", "VMnet1", "VMnet8"):
vmnet_interfaces.append(vmnet)
elif interface["name"].startswith("vmnet"):
if interface["name"].startswith("vmnet"):
vmnet = interface["name"]
if vmnet not in ("vmnet0", "vmnet1", "vmnet8"):
vmnet_interfaces.append(interface["name"])
@ -428,7 +368,7 @@ class VMware(BaseManager):
command = [vmrun_path, "-T", self.host_type, subcommand]
command.extend(args)
command_string = " ".join([shlex_quote(c) for c in command])
command_string = " ".join([shlex.quote(c) for c in command])
log.log(log_level, f"Executing vmrun with command: {command_string}")
try:
process = await asyncio.create_subprocess_exec(
@ -677,9 +617,7 @@ class VMware(BaseManager):
:returns: path to the inventory file
"""
if sys.platform.startswith("win"):
return os.path.expandvars(r"%APPDATA%\Vmware\Inventory.vmls")
elif sys.platform.startswith("darwin"):
if sys.platform.startswith("darwin"):
return os.path.expanduser("~/Library/Application Support/VMware Fusion/vmInventory")
else:
return os.path.expanduser("~/.vmware/inventory.vmls")
@ -692,9 +630,7 @@ class VMware(BaseManager):
:returns: path to the preferences file
"""
if sys.platform.startswith("win"):
return os.path.expandvars(r"%APPDATA%\VMware\preferences.ini")
elif sys.platform.startswith("darwin"):
if sys.platform.startswith("darwin"):
return os.path.expanduser("~/Library/Preferences/VMware Fusion/preferences")
else:
return os.path.expanduser("~/.vmware/preferences")
@ -707,15 +643,7 @@ class VMware(BaseManager):
:returns: path to the default VM directory
"""
if sys.platform.startswith("win"):
import ctypes
import ctypes.wintypes
path = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH)
ctypes.windll.shell32.SHGetFolderPathW(None, 5, None, 0, path)
documents_folder = path.value
return [fr"{documents_folder}\My Virtual Machines", fr"{documents_folder}\Virtual Machines"]
elif sys.platform.startswith("darwin"):
if sys.platform.startswith("darwin"):
return [os.path.expanduser("~/Documents/Virtual Machines.localized")]
else:
return [os.path.expanduser("~/vmware")]

@ -882,14 +882,11 @@ class VMwareVM(BaseNode):
:returns: pipe path (string)
"""
if sys.platform.startswith("win"):
pipe_name = fr"\\.\pipe\gns3_vmware\{self.id}"
else:
pipe_name = os.path.join(tempfile.gettempdir(), "gns3_vmware", f"{self.id}")
try:
os.makedirs(os.path.dirname(pipe_name), exist_ok=True)
except OSError as e:
raise VMwareError(f"Could not create the VMware pipe directory: {e}")
pipe_name = os.path.join(tempfile.gettempdir(), "gns3_vmware", f"{self.id}")
try:
os.makedirs(os.path.dirname(pipe_name), exist_ok=True)
except OSError as e:
raise VMwareError(f"Could not create the VMware pipe directory: {e}")
return pipe_name
def _set_serial_console(self):

@ -63,63 +63,32 @@ class Config:
appname = "GNS3"
version = f"{__version_info__[0]}.{__version_info__[1]}"
if sys.platform.startswith("win"):
# On windows, the configuration file location can be one of the following:
# 1: %APPDATA%/GNS3/gns3_server.ini
# 2: %APPDATA%/GNS3.ini
# 3: %COMMON_APPDATA%/GNS3/gns3_server.ini
# 4: %COMMON_APPDATA%/GNS3.ini
# 5: server.ini in the current working directory
appdata = os.path.expandvars("%APPDATA%")
common_appdata = os.path.expandvars("%COMMON_APPDATA%")
if self._profile:
legacy_user_dir = os.path.join(appdata, appname, "profiles", self._profile)
versioned_user_dir = os.path.join(appdata, appname, version, "profiles", self._profile)
else:
legacy_user_dir = os.path.join(appdata, appname)
versioned_user_dir = os.path.join(appdata, appname, version)
server_filename = "gns3_server.ini"
if self._files is None and not hasattr(sys, "_called_from_test"):
self._files = [
os.path.join(os.getcwd(), server_filename),
os.path.join(versioned_user_dir, server_filename),
os.path.join(appdata, appname + ".ini"),
os.path.join(common_appdata, appname, server_filename),
os.path.join(common_appdata, appname + ".ini"),
]
# On UNIX-like platforms, the configuration file location can be one of the following:
# 1: $HOME/.config/GNS3/gns3_server.conf
# 2: $HOME/.config/GNS3.conf
# 3: /etc/xdg/GNS3/gns3_server.conf
# 4: /etc/xdg/GNS3.conf
# 5: gns3_server.conf in the current working directory
home = os.path.expanduser("~")
server_filename = "gns3_server.conf"
if self._profile:
legacy_user_dir = os.path.join(home, ".config", appname, "profiles", self._profile)
versioned_user_dir = os.path.join(home, ".config", appname, version, "profiles", self._profile)
else:
# On UNIX-like platforms, the configuration file location can be one of the following:
# 1: $HOME/.config/GNS3/gns3_server.conf
# 2: $HOME/.config/GNS3.conf
# 3: /etc/xdg/GNS3/gns3_server.conf
# 4: /etc/xdg/GNS3.conf
# 5: gns3_server.conf in the current working directory
home = os.path.expanduser("~")
server_filename = "gns3_server.conf"
if self._profile:
legacy_user_dir = os.path.join(home, ".config", appname, "profiles", self._profile)
versioned_user_dir = os.path.join(home, ".config", appname, version, "profiles", self._profile)
else:
legacy_user_dir = os.path.join(home, ".config", appname)
versioned_user_dir = os.path.join(home, ".config", appname, version)
if self._files is None and not hasattr(sys, "_called_from_test"):
self._files = [
os.path.join(os.getcwd(), server_filename),
os.path.join(versioned_user_dir, server_filename),
os.path.join(home, ".config", appname + ".conf"),
os.path.join("/etc/gns3", server_filename),
os.path.join("/etc/xdg", appname, server_filename),
os.path.join("/etc/xdg", appname + ".conf"),
]
legacy_user_dir = os.path.join(home, ".config", appname)
versioned_user_dir = os.path.join(home, ".config", appname, version)
if self._files is None and not hasattr(sys, "_called_from_test"):
self._files = [
os.path.join(os.getcwd(), server_filename),
os.path.join(versioned_user_dir, server_filename),
os.path.join(home, ".config", appname + ".conf"),
os.path.join("/etc/gns3", server_filename),
os.path.join("/etc/xdg", appname, server_filename),
os.path.join("/etc/xdg", appname + ".conf"),
]
if self._files is None:
self._files = []
@ -182,10 +151,7 @@ class Config:
Return the server configuration file path.
"""
if sys.platform.startswith("win"):
server_config_filename = "gns3_server.ini"
else:
server_config_filename = "gns3_server.conf"
server_config_filename = "gns3_server.conf"
return os.path.join(self.config_dir, server_config_filename)
def clear(self):

@ -77,9 +77,6 @@ class Controller:
self._load_controller_settings()
if server_config.enable_ssl:
if sys.platform.startswith("win"):
log.critical("SSL mode is not supported on Windows")
raise SystemExit
self._ssl_context = self._create_ssl_context(server_config)
protocol = server_config.protocol

@ -146,9 +146,6 @@ def _patch_mtime(path):
:param path: file path
"""
if sys.platform.startswith("win"):
# only UNIX type platforms
return
st = os.stat(path)
file_date = datetime.fromtimestamp(st.st_mtime)
if file_date.year < 1980:

@ -243,22 +243,11 @@ class VirtualBoxGNS3VM(BaseGNS3VM):
)
if not (await self._check_vboxnet_exists(vboxnet)):
if sys.platform.startswith("win") and vboxnet == "vboxnet0":
# The GNS3 VM is configured with vboxnet0 by default which is not available
# on Windows. Try to patch this with the first available vboxnet we find.
first_available_vboxnet = await self._find_first_available_vboxnet()
if first_available_vboxnet is None:
raise GNS3VMError(
f'Please add a VirtualBox host-only network with DHCP enabled and attached it to network adapter {hostonly_interface_number} for "{self._vmname}"'
)
await self.set_hostonly_network(hostonly_interface_number, first_available_vboxnet)
vboxnet = first_available_vboxnet
else:
raise GNS3VMError(
'VirtualBox host-only network "{}" does not exist, please make the sure the network adapter {} configuration is valid for "{}"'.format(
vboxnet, hostonly_interface_number, self._vmname
)
raise GNS3VMError(
'VirtualBox host-only network "{}" does not exist, please make the sure the network adapter {} configuration is valid for "{}"'.format(
vboxnet, hostonly_interface_number, self._vmname
)
)
if not (await self._check_dhcp_server(vboxnet)):
raise GNS3VMError(f'DHCP must be enabled on VirtualBox host-only network "{vboxnet}"')

@ -42,16 +42,6 @@ def create_startup_handler(app: FastAPI) -> Callable:
logger = logging.getLogger("asyncio")
logger.setLevel(logging.ERROR)
if sys.platform.startswith("win"):
# Add a periodic callback to give a chance to process signals on Windows
# because asyncio.add_signal_handler() is not supported yet on that platform
# otherwise the loop runs outside of signal module's ability to trap signals.
def wakeup():
loop.call_later(0.5, wakeup)
loop.call_later(0.5, wakeup)
if log.getEffectiveLevel() == logging.DEBUG:
# On debug version we enable info that
# coroutine is not called in a way await/await

@ -38,7 +38,7 @@ class ColouredFormatter(logging.Formatter):
message = super().format(record)
if not colour or sys.platform.startswith("win"):
if not colour:
return message.replace("#RESET#", "")
level_no = record.levelno
@ -150,11 +150,6 @@ def init_logger(level, logfile=None, max_bytes=10000000, backup_count=10, compre
stream_handler.formatter = ColouredFormatter(
"{asctime} {levelname} {filename}:{lineno} {message}", "%Y-%m-%d %H:%M:%S", "{"
)
elif sys.platform.startswith("win"):
stream_handler = WinStreamHandler(sys.stdout)
stream_handler.formatter = ColouredFormatter(
"{asctime} {levelname} {filename}:{lineno} {message}", "%Y-%m-%d %H:%M:%S", "{"
)
else:
stream_handler = ColouredStreamHandler(sys.stdout)
stream_handler.formatter = ColouredFormatter(

@ -29,19 +29,6 @@ import gns3server.utils.get_resource
import os
import sys
import types
# To avoid strange bug later we switch the event loop before any other operation
if sys.platform.startswith("win"):
import asyncio
# use the Proactor event loop on Windows
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
if sys.platform.startswith("win"):
sys.modules["termios"] = types.ModuleType("termios")
def daemonize():
@ -77,9 +64,10 @@ def main():
Entry point for GNS3 server
"""
if not sys.platform.startswith("win"):
if "--daemon" in sys.argv:
daemonize()
if sys.platform.startswith("win"):
raise SystemExit("Windows is not a supported platform to run the GNS3 server")
if "--daemon" in sys.argv:
daemonize()
from gns3server.server import Server
Server().run()

@ -64,8 +64,8 @@ class Server:
or there: http://robjwells.com/post/61198832297/get-your-us-ascii-out-of-my-face
"""
# no need to check on Windows or when this application is frozen
if sys.platform.startswith("win") or hasattr(sys, "frozen"):
# no need to check when this application is frozen
if hasattr(sys, "frozen"):
return
language = encoding = None
@ -185,20 +185,11 @@ class Server:
except asyncio.CancelledError:
pass
signals = [] # SIGINT and SIGTERM are already registered by uvicorn
if sys.platform.startswith("win"):
signals.extend(["SIGBREAK"])
else:
signals.extend(["SIGHUP", "SIGQUIT"])
signals = ["SIGHUP", "SIGQUIT"] # SIGINT and SIGTERM are already registered by uvicorn
for signal_name in signals:
callback = functools.partial(signal_handler, signal_name)
if sys.platform.startswith("win"):
# add_signal_handler() is not yet supported on Windows
signal.signal(getattr(signal, signal_name), callback)
else:
loop = asyncio.get_event_loop()
loop.add_signal_handler(getattr(signal, signal_name), callback)
loop = asyncio.get_event_loop()
loop.add_signal_handler(getattr(signal, signal_name), callback)
@staticmethod
def _kill_ghosts():
@ -250,10 +241,6 @@ class Server:
args = self._parse_arguments(sys.argv[1:])
if args.daemon and sys.platform.startswith("win"):
log.critical("Daemon is not supported on Windows")
sys.exit(1)
if args.pid:
self._pid_lock(args.pid)
self._kill_ghosts()
@ -316,9 +303,6 @@ class Server:
access_log = True
if config.Server.enable_ssl:
if sys.platform.startswith("win"):
log.critical("SSL mode is not supported on Windows")
raise SystemExit
log.info("SSL is enabled")
config = uvicorn.Config(
@ -346,10 +330,6 @@ class Server:
loop = asyncio.get_event_loop()
loop.run_until_complete(server.serve())
except OSError as e:
# This is to ignore OSError: [WinError 0] The operation completed successfully exception on Windows.
if not sys.platform.startswith("win") or not e.winerror == 0:
raise
except Exception as e:
log.critical(f"Critical error while running the server: {e}", exc_info=1)
CrashReport.instance().capture_exception()

@ -89,14 +89,3 @@ def parse_version(version):
version.append("000000")
version.append("final")
return tuple(version)
def shlex_quote(s):
"""
Compatible shlex_quote to handle case where Windows needs double quotes around file names, not single quotes.
"""
if sys.platform.startswith("win"):
return s if re.match(r"^[-_\w./]+$", s) else '"%s"' % s.replace('"', '\\"')
else:
return shlex.quote(s)

@ -136,22 +136,6 @@ async def wait_for_file_creation(path, timeout=60):
raise asyncio.TimeoutError()
async def wait_for_named_pipe_creation(pipe_path, timeout=60):
import win32pipe
import pywintypes
while timeout > 0:
try:
win32pipe.WaitNamedPipe(pipe_path, 1)
except pywintypes.error:
await asyncio.sleep(0.5)
timeout -= 0.5
else:
return
raise asyncio.TimeoutError()
def locking(f):
@functools.wraps(f)
async def wrapper(oself, *args, **kwargs):

@ -15,19 +15,14 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
import asyncio
from gns3server.utils.asyncio import wait_for_file_creation, wait_for_named_pipe_creation
from gns3server.utils.asyncio import wait_for_file_creation
from gns3server.compute.error import NodeError
"""
This module handle connection to unix socket or Windows named pipe
This module handle connection to unix socket
"""
if sys.platform.startswith("win"):
import win32file
import win32pipe
import msvcrt
class SerialReaderWriterProtocol(asyncio.Protocol):
@ -61,50 +56,6 @@ class SerialReaderWriterProtocol(asyncio.Protocol):
self._output.feed_eof()
class WindowsPipe:
"""
Write input and output stream to the same object
"""
def __init__(self, path):
self._handle = open(path, "a+b")
self._pipe = msvcrt.get_osfhandle(self._handle.fileno())
async def read(self, n=-1):
(read, num_avail, num_message) = win32pipe.PeekNamedPipe(self._pipe, 0)
if num_avail > 0:
(error_code, output) = win32file.ReadFile(self._pipe, num_avail, None)
return output
await asyncio.sleep(0.01)
return b""
def at_eof(self):
return False
def write(self, data):
win32file.WriteFile(self._pipe, data)
async def drain(self):
return
def close(self):
pass
async def _asyncio_open_serial_windows(path):
"""
Open a windows named pipe
:returns: An IO like object
"""
try:
await wait_for_named_pipe_creation(path)
except asyncio.TimeoutError:
raise NodeError(f'Pipe file "{path}" is missing')
return WindowsPipe(path)
async def _asyncio_open_serial_unix(path):
"""
Open a unix socket or a windows named pipe
@ -128,12 +79,9 @@ async def _asyncio_open_serial_unix(path):
async def asyncio_open_serial(path):
"""
Open a unix socket or a windows named pipe
Open an unix socket
:returns: An IO like object
"""
if sys.platform.startswith("win"):
return await _asyncio_open_serial_windows(path)
else:
return await _asyncio_open_serial_unix(path)
return await _asyncio_open_serial_unix(path)

@ -21,7 +21,6 @@ import socket
import struct
import psutil
from .windows_service import check_windows_service_is_running
from gns3server.compute.compute_error import ComputeError
from gns3server.config import Config
@ -197,55 +196,36 @@ def interfaces():
"""
results = []
if not sys.platform.startswith("win"):
allowed_interfaces = Config.instance().settings.Server.allowed_interfaces
net_if_addrs = psutil.net_if_addrs()
for interface in sorted(net_if_addrs.keys()):
if allowed_interfaces and interface not in allowed_interfaces and not interface.startswith("gns3tap"):
log.warning(f"Interface '{interface}' is not allowed to be used on this server")
continue
ip_address = ""
mac_address = ""
netmask = ""
interface_type = "ethernet"
for addr in net_if_addrs[interface]:
# get the first available IPv4 address only
if addr.family == socket.AF_INET:
ip_address = addr.address
netmask = addr.netmask
if addr.family == psutil.AF_LINK:
mac_address = addr.address
if interface.startswith("tap"):
# found no way to reliably detect a TAP interface
interface_type = "tap"
results.append(
{
"id": interface,
"name": interface,
"ip_address": ip_address,
"netmask": netmask,
"mac_address": mac_address,
"type": interface_type,
}
)
else:
try:
service_installed = True
if not check_windows_service_is_running("npf") and not check_windows_service_is_running("npcap"):
service_installed = False
else:
results = get_windows_interfaces()
except ImportError:
message = (
"pywin32 module is not installed, please install it on the server to get the available interface names"
)
raise ComputeError(message)
except Exception as e:
log.error(f"uncaught exception {type(e)}", exc_info=1)
raise ComputeError(f"uncaught exception: {e}")
if service_installed is False:
raise ComputeError("The Winpcap or Npcap is not installed or running")
allowed_interfaces = Config.instance().settings.Server.allowed_interfaces
net_if_addrs = psutil.net_if_addrs()
for interface in sorted(net_if_addrs.keys()):
if allowed_interfaces and interface not in allowed_interfaces and not interface.startswith("gns3tap"):
log.warning(f"Interface '{interface}' is not allowed to be used on this server")
continue
ip_address = ""
mac_address = ""
netmask = ""
interface_type = "ethernet"
for addr in net_if_addrs[interface]:
# get the first available IPv4 address only
if addr.family == socket.AF_INET:
ip_address = addr.address
netmask = addr.netmask
if addr.family == psutil.AF_LINK:
mac_address = addr.address
if interface.startswith("tap"):
# found no way to reliably detect a TAP interface
interface_type = "tap"
results.append(
{
"id": interface,
"name": interface,
"ip_address": ip_address,
"netmask": netmask,
"mac_address": mac_address,
"type": interface_type,
}
)
# This interface have special behavior
for result in results:

@ -1,138 +0,0 @@
#!/usr/bin/env python
#
# Copyright (C) 2016 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
import os
import argparse
import shutil
import ipaddress
if sys.platform.startswith("win"):
import wmi
else:
raise SystemExit("This script must run on Windows!")
def parse_add_loopback():
"""
Validate params when adding a loopback adapter
"""
class Add(argparse.Action):
def __call__(self, parser, args, values, option_string=None):
try:
ipaddress.IPv4Interface(f"{values[1]}/{values[2]}")
except ipaddress.AddressValueError as e:
raise argparse.ArgumentTypeError(f"Invalid IP address: {e}")
except ipaddress.NetmaskValueError as e:
raise argparse.ArgumentTypeError(f"Invalid subnet mask: {e}")
setattr(args, self.dest, values)
return Add
def add_loopback(devcon_path, name, ip_address, netmask):
# save the list of network adapter in order to find the one we are about to add
previous_adapters = wmi.WMI().Win32_NetworkAdapter()
for adapter in previous_adapters:
if "Loopback" in adapter.Description and adapter.NetConnectionID == name:
raise SystemExit(f'Windows loopback adapter named "{name}" already exists')
# install a new Windows loopback adapter
os.system('"{}" install {}\\inf\\netloop.inf *MSLOOP'.format(devcon_path, os.path.expandvars("%WINDIR%")))
# configure the new Windows loopback adapter
for adapter in wmi.WMI().Win32_NetworkAdapter():
if "Loopback" in adapter.Description and adapter not in previous_adapters:
print(f'Renaming loopback adapter "{adapter.NetConnectionID}" to "{name}"')
adapter.NetConnectionID = name
for network_config in wmi.WMI().Win32_NetworkAdapterConfiguration(IPEnabled=True):
if network_config.InterfaceIndex == adapter.InterfaceIndex:
print(f'Configuring loopback adapter "{name}" with {ip_address} {netmask}')
retcode = network_config.EnableStatic(IPAddress=[ip_address], SubnetMask=[netmask])[0]
if retcode == 1:
print("A reboot is required")
elif retcode != 0:
print('Error while configuring IP/Subnet mask on "{}"')
# FIXME: support gateway?
# network_config.SetGateways(DefaultIPGateway=[""])
break
# restart winpcap/npcap services to take the new adapter into account
os.system("net stop npf")
os.system("net start npf")
os.system("net stop npcap")
os.system("net start npcap")
def remove_loopback(devcon_path, name):
deleted = False
for adapter in wmi.WMI().Win32_NetworkAdapter():
if "Loopback" in adapter.Description and adapter.NetConnectionID == name:
# remove a Windows loopback adapter
print(f'Removing loopback adapter "{name}"')
os.system(f'"{devcon_path}" remove @{adapter.PNPDeviceID}')
deleted = True
if not deleted:
raise SystemExit(f'Could not find adapter "{name}"')
# update winpcap/npcap services
os.system("net stop npf")
os.system("net start npf")
os.system("net stop npcap")
os.system("net start npcap")
def main():
"""
Entry point for the Windows loopback tool.
"""
parser = argparse.ArgumentParser(description="%(prog)s add/remove Windows loopback adapters")
parser.add_argument("-a", "--add", nargs=3, action=parse_add_loopback(), help="add a Windows loopback adapter")
parser.add_argument("-r", "--remove", action="store", help="remove a Windows loopback adapter")
try:
args = parser.parse_args()
except argparse.ArgumentTypeError as e:
raise SystemExit(e)
# devcon is required to install/remove Windows loopback adapters
devcon_path = shutil.which("devcon")
if not devcon_path:
raise SystemExit("Could not find devcon.exe")
from win32com.shell import shell
if not shell.IsUserAnAdmin():
raise SystemExit("You must run this script as an administrator")
try:
if args.add:
add_loopback(devcon_path, args.add[0], args.add[1], args.add[2])
if args.remove:
remove_loopback(devcon_path, args.remove)
except SystemExit as e:
print(e)
os.system("pause")
if __name__ == "__main__":
main()

@ -1,39 +0,0 @@
#!/usr/bin/env python
#
# Copyright (C) 2016 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Check for Windows service.
"""
from gns3server.compute.compute_error import ComputeError
def check_windows_service_is_running(service_name):
import pywintypes
import win32service
import win32serviceutil
try:
if win32serviceutil.QueryServiceStatus(service_name, None)[1] != win32service.SERVICE_RUNNING:
return False
except pywintypes.error as e:
if e.winerror == 1060:
return False
else:
raise ComputeError(f"Could not check if the {service_name} service is running: {e.strerror}")
return True

@ -79,8 +79,7 @@ setup(
entry_points={
"console_scripts": [
"gns3server = gns3server.main:main",
"gns3vmnet = gns3server.utils.vmnet:main",
"gns3loopback = gns3server.utils.windows_loopback:main"
"gns3vmnet = gns3server.utils.vmnet:main"
]
},
packages=find_packages(".", exclude=["docs", "tests*"]),

@ -29,7 +29,6 @@ from gns3server.utils.path import get_default_project_directory
pytestmark = pytest.mark.asyncio
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
async def test_get(app: FastAPI, compute_client: AsyncClient, windows_platform) -> None:
response = await compute_client.get(app.url_path_for("compute:get_capabilities"))
@ -43,7 +42,6 @@ async def test_get(app: FastAPI, compute_client: AsyncClient, windows_platform)
}
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
async def test_get_on_gns3vm(app: FastAPI, compute_client: AsyncClient, on_gns3vm) -> None:
response = await compute_client.get(app.url_path_for("compute:get_capabilities"))

@ -25,8 +25,7 @@ from unittest.mock import patch
from gns3server.compute.project import Project
pytestmark = [pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows"),
pytest.mark.asyncio]
pytestmark = [pytest.mark.asyncio]
@pytest.fixture

@ -209,7 +209,7 @@ async def test_download_image_forbidden(app: FastAPI, compute_client: AsyncClien
assert response.status_code == status.HTTP_403_FORBIDDEN
@pytest.mark.skipif(not sys.platform.startswith("win") and os.getuid() == 0, reason="Root can delete any image")
@pytest.mark.skipif(os.getuid() == 0, reason="Root can delete any image")
async def test_upload_image_permission_denied(app: FastAPI, compute_client: AsyncClient, images_dir: str) -> None:
os.makedirs(os.path.join(images_dir, "IOS"), exist_ok=True)

@ -28,8 +28,7 @@ from unittest.mock import patch
from gns3server.compute.project import Project
pytestmark = [pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows"),
pytest.mark.asyncio]
pytestmark = [pytest.mark.asyncio]
@pytest.fixture

@ -34,11 +34,7 @@ pytestmark = pytest.mark.asyncio
def fake_qemu_bin(monkeypatch, tmpdir) -> str:
monkeypatch.setenv("PATH", str(tmpdir))
if sys.platform.startswith("win"):
bin_path = os.path.join(os.environ["PATH"], "qemu-system-x86_64w.exe")
else:
bin_path = os.path.join(os.environ["PATH"], "qemu-system-x86_64")
bin_path = os.path.join(os.environ["PATH"], "qemu-system-x86_64")
with open(bin_path, "w+") as f:
f.write("1")
os.chmod(bin_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
@ -120,7 +116,6 @@ async def test_qemu_create_with_params(app: FastAPI,
assert response.json()["hda_disk_image_md5sum"] == "c4ca4238a0b923820dcc509a6f75849b"
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
async def test_qemu_create_with_project_file(app: FastAPI,
compute_client: AsyncClient,
compute_project: Project,
@ -411,7 +406,7 @@ async def test_download_image_forbidden_location(app: FastAPI, compute_client: A
assert response.status_code == status.HTTP_403_FORBIDDEN
@pytest.mark.skipif(not sys.platform.startswith("win") and os.getuid() == 0, reason="Root can delete any image")
@pytest.mark.skipif(os.getuid() == 0, reason="Root can delete any image")
async def test_upload_image_permission_denied(app: FastAPI, compute_client: AsyncClient, images_dir: str) -> None:
with open(os.path.join(images_dir, "QEMU", "test2.tmp"), "w+") as f:

@ -1401,7 +1401,6 @@ async def test_get_image_information(compute_project, manager):
mock.assert_called_with("GET", "images/ubuntu:latest/json")
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
@pytest.mark.asyncio
async def test_mount_binds(vm):
@ -1476,7 +1475,6 @@ async def test_create_network_interfaces(vm):
assert "eth5" not in content
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
@pytest.mark.asyncio
async def test_fix_permission(vm):
@ -1489,7 +1487,6 @@ async def test_fix_permission(vm):
assert process.wait.called
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
@pytest.mark.asyncio
async def test_fix_permission_not_running(vm):

@ -44,7 +44,6 @@ def test_vm_invalid_dynamips_path(manager, config):
manager.find_dynamips()
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported by Windows")
def test_vm_non_executable_dynamips_path(manager, config):
tmpfile = tempfile.NamedTemporaryFile()

@ -26,14 +26,10 @@ import shutil
from tests.utils import asyncio_patch, AsyncioMagicMock
from unittest.mock import patch, MagicMock
pytestmark = pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
if not sys.platform.startswith("win"):
from gns3server.compute.iou.iou_vm import IOUVM
from gns3server.compute.iou.iou_error import IOUError
from gns3server.compute.iou import IOU
from unittest.mock import MagicMock
from gns3server.compute.iou.iou_vm import IOUVM
from gns3server.compute.iou.iou_error import IOUError
from gns3server.compute.iou import IOU
@pytest.fixture

@ -43,10 +43,7 @@ async def test_get_qemu_version():
with asyncio_patch("gns3server.compute.qemu.subprocess_check_output", return_value="QEMU emulator version 2.2.0, Copyright (c) 2003-2008 Fabrice Bellard"):
version = await Qemu.get_qemu_version("/tmp/qemu-test")
if sys.platform.startswith("win"):
assert version == ""
else:
assert version == "2.2.0"
assert version == "2.2.0"
@pytest.mark.asyncio
@ -62,10 +59,7 @@ async def test_binary_list(monkeypatch, tmpdir):
os.chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
with asyncio_patch("gns3server.compute.qemu.subprocess_check_output", return_value="QEMU emulator version 2.2.0, Copyright (c) 2003-2008 Fabrice Bellard") as mock:
if sys.platform.startswith("win"):
version = ""
else:
version = "2.2.0"
version = "2.2.0"
qemus = await Qemu.binary_list()

@ -57,10 +57,7 @@ def fake_qemu_img_binary(monkeypatch, tmpdir):
def fake_qemu_binary(monkeypatch, tmpdir):
monkeypatch.setenv("PATH", str(tmpdir))
if sys.platform.startswith("win"):
bin_path = os.path.join(os.environ["PATH"], "qemu-system-x86_64w.exe")
else:
bin_path = os.path.join(os.environ["PATH"], "qemu-system-x86_64")
bin_path = os.path.join(os.environ["PATH"], "qemu-system-x86_64")
with open(bin_path, "w+") as f:
f.write("1")
os.chmod(bin_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
@ -190,7 +187,6 @@ async def test_termination_callback(vm):
assert event == vm
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
@pytest.mark.asyncio
async def test_termination_callback_error(vm, tmpdir):
@ -276,10 +272,7 @@ def test_set_qemu_path(vm, tmpdir, fake_qemu_binary):
vm.qemu_path = None
# Should not crash with unicode characters
if sys.platform.startswith("win"):
path = str(tmpdir / "\u62FF" / "qemu-system-mipsw.exe")
else:
path = str(tmpdir / "\u62FF" / "qemu-system-mips")
path = str(tmpdir / "\u62FF" / "qemu-system-mips")
os.makedirs(str(tmpdir / "\u62FF"))
@ -291,10 +284,9 @@ def test_set_qemu_path(vm, tmpdir, fake_qemu_binary):
f.write("1")
# Raise because file is not executable
if not sys.platform.startswith("win"):
with pytest.raises(QemuError):
vm.qemu_path = path
os.chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
with pytest.raises(QemuError):
vm.qemu_path = path
os.chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
vm.qemu_path = path
assert vm.qemu_path == path
@ -314,8 +306,7 @@ def test_set_qemu_path_windows(vm):
bin_path = os.path.join(os.environ["PATH"], "qemu-system-x86_64w.EXE")
open(bin_path, "w+").close()
if not sys.platform.startswith("win"):
os.chmod(bin_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
os.chmod(bin_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
vm.qemu_path = bin_path
@ -327,8 +318,7 @@ def test_set_qemu_path_old_windows(vm):
bin_path = os.path.join(os.environ["PATH"], "qemu.exe")
open(bin_path, "w+").close()
if not sys.platform.startswith("win"):
os.chmod(bin_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
os.chmod(bin_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
vm.qemu_path = bin_path
@ -336,7 +326,6 @@ def test_set_qemu_path_old_windows(vm):
assert vm.platform == "i386"
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
def test_set_qemu_path_kvm_binary(vm, fake_qemu_binary):
bin_path = os.path.join(os.environ["PATH"], "qemu-kvm")
@ -358,10 +347,7 @@ async def test_set_platform(compute_project, manager):
with patch("shutil.which", return_value="/bin/qemu-system-x86_64") as which_mock:
with patch("gns3server.compute.qemu.QemuVM._check_qemu_path"):
vm = QemuVM("test", "00010203-0405-0607-0809-0a0b0c0d0e0f", compute_project, manager, platform="x86_64")
if sys.platform.startswith("win"):
which_mock.assert_called_with("qemu-system-x86_64w.exe", path=mock.ANY)
else:
which_mock.assert_called_with("qemu-system-x86_64", path=mock.ANY)
which_mock.assert_called_with("qemu-system-x86_64", path=mock.ANY)
assert vm.platform == "x86_64"
assert vm.qemu_path == "/bin/qemu-system-x86_64"
@ -447,7 +433,6 @@ async def test_disk_options_multiple_disk(vm, tmpdir, fake_qemu_img_binary):
]
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
@pytest.mark.asyncio
async def test_set_process_priority(vm, fake_qemu_img_binary):
@ -461,7 +446,6 @@ async def test_set_process_priority(vm, fake_qemu_img_binary):
assert args == ("renice", "-n", "5", "-p", "42")
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
@pytest.mark.asyncio
async def test_set_process_priority_normal(vm, fake_qemu_img_binary):
@ -634,7 +618,6 @@ async def test_build_command_kvm_2_4(linux_platform, vm, fake_qemu_binary):
]
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
@pytest.mark.asyncio
async def test_build_command_without_display(vm):
@ -757,7 +740,6 @@ async def test_build_command_large_number_of_adapters(vm):
await vm._build_command()
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
@pytest.mark.asyncio
async def test_build_command_with_invalid_options(vm):

@ -162,7 +162,7 @@ async def test_project_delete():
assert os.path.exists(directory) is False
@pytest.mark.skipif(not sys.platform.startswith("win") and os.getuid() == 0, reason="Root can delete any project")
@pytest.mark.skipif(os.getuid() == 0, reason="Root can delete any project")
@pytest.mark.asyncio
async def test_project_delete_permission_issue():

@ -190,10 +190,7 @@ async def test_stop(vm):
await vm.stop()
assert vm.is_running() is False
if sys.platform.startswith("win"):
process.send_signal.assert_called_with(1)
else:
process.terminate.assert_called_with()
process.terminate.assert_called_with()
await queue.get(1) #  Ping
await queue.get(1) #  Started
@ -226,10 +223,7 @@ async def test_reload(vm):
await vm.reload()
assert vm.is_running() is True
if sys.platform.startswith("win"):
process.send_signal.assert_called_with(1)
else:
process.terminate.assert_called_with()
process.terminate.assert_called_with()
@pytest.mark.asyncio
@ -240,7 +234,6 @@ async def test_add_nio_binding_udp(vm):
assert nio.lport == 4242
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
@pytest.mark.asyncio
async def test_add_nio_binding_tap(vm, ethernet_device):

@ -32,20 +32,6 @@ sys._called_from_test = True
sys.original_platform = sys.platform
if sys.platform.startswith("win") and sys.version_info < (3, 8):
@pytest.fixture(scope="session")
def event_loop(request):
"""
Overwrite pytest_asyncio event loop on Windows for Python < 3.8
As of Python 3.8, the default event loop on Windows is Proactor
"""
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
yield loop
asyncio.set_event_loop(None)
# https://github.com/pytest-dev/pytest-asyncio/issues/68
# this event_loop is used by pytest-asyncio, and redefining it
# is currently the only way of changing the scope of this fixture

@ -69,7 +69,6 @@ async def test_json(controller):
assert vm.asdict() == vm._settings
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not working well on Windows")
@pytest.mark.asyncio
async def test_update_settings(controller):
@ -88,7 +87,6 @@ async def test_update_settings(controller):
assert "vm" not in controller.computes
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not working well on Windows")
@pytest.mark.asyncio
async def test_auto_start(controller, dummy_gns3vm, dummy_engine):
"""
@ -106,7 +104,6 @@ async def test_auto_start(controller, dummy_gns3vm, dummy_engine):
assert controller.computes["vm"].password.get_secret_value() == "world"
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not working well on Windows")
@pytest.mark.asyncio
async def test_auto_start_with_error(controller, dummy_gns3vm, dummy_engine):

@ -134,7 +134,6 @@ async def test_init_path(tmpdir):
assert p.path == str(tmpdir)
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
@pytest.mark.asyncio
async def test_changing_path_with_quote_not_allowed(tmpdir):

@ -45,7 +45,6 @@ async def test_exception_wait_run_in_executor():
await wait_run_in_executor(raise_exception)
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
@pytest.mark.asyncio
async def test_subprocess_check_output(tmpdir):

@ -33,18 +33,13 @@ def test_interfaces():
assert "name" in interface
assert "ip_address" in interface
assert "mac_address" in interface
if sys.platform.startswith("win"):
assert "netcard" in interface
assert "type" in interface
assert "netmask" in interface
def test_has_netmask(config):
if sys.platform.startswith("win"):
# No loopback
pass
elif sys.platform.startswith("darwin"):
if sys.platform.startswith("darwin"):
assert has_netmask("lo0") is True
else:
assert has_netmask("lo") is True
@ -52,10 +47,7 @@ def test_has_netmask(config):
def test_is_interface_up():
if sys.platform.startswith("win"):
# is_interface_up() always returns True on Windows
pass
elif sys.platform.startswith("darwin"):
if sys.platform.startswith("darwin"):
assert is_interface_up("lo0") is True
else:
assert is_interface_up("lo") is True

Loading…
Cancel
Save