1
0
mirror of https://github.com/GNS3/gns3-server synced 2024-11-28 19:28:07 +00:00

Merge remote-tracking branch 'origin/3.0' into gh-pages

This commit is contained in:
github-actions 2022-03-28 00:58:55 +00:00
commit c02dfdbd5b
66 changed files with 453 additions and 1809 deletions

View File

@ -17,7 +17,7 @@ jobs:
strategy:
matrix:
python-version: ["3.6", "3.7", "3.8", "3.9", "3.10"]
python-version: ["3.7", "3.8", "3.9", "3.10"]
steps:
- uses: actions/checkout@v2

View File

@ -1,5 +1,17 @@
# Change Log
## 2.2.31 26/02/2022
* Install setuptools v59.6.0 when using Python 3.6
## 2.2.30 25/02/2022
* Support GNS3 variables in Docker environment variables. Fixes #2033
* Release web UI 2.2.30
* Set setuptools to v60.6.0
* qemu_vm.py Linked node test.
* Fix dead link in README.rst Fixes #2022
## 2.2.29 08/01/2022
* Release web UI 2.2.29

134
README.md Normal file
View File

@ -0,0 +1,134 @@
# GNS3 server repository
[![Style](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![GitHub Actions tests](https://github.com/GNS3/gns3-server/workflows/testing/badge.svg?branch=3.0)](https://github.com/GNS3/gns3-server/actions?query=workflow%3Atesting+branch%3A3.0)
[![Latest PyPi version](https://img.shields.io/pypi/v/gns3-server.svg)](https://pypi.python.org/pypi/gns3-server)
[![Snyk scanning](https://snyk.io/test/github/GNS3/gns3-server/badge.svg)](https://snyk.io/test/github/GNS3/gns3-server)
The GNS3 server manages emulators and other virtualization software such as Dynamips, Qemu/KVM, Docker, VPCS, VirtualBox and VMware Workstation.
Clients like the [GNS3 GUI](https://github.com/GNS3/gns3-gui/) and the [GNS3 Web UI](https://github.com/GNS3/gns3-web-ui>) control the server using a HTTP REST API.
## Installation
These instructions are for using GNS3, please see below for development.
### Windows & macOS
Please use our [Windows installer or DMG package](https://gns3.com/software/download) to install the stable build along with the GNS3 VM.
Note that as of GNS3 version above 3.0, you must run the server using the GNS3 VM or on a Linux system (remote, cloud or virtual machine).
### Linux
#### Ubuntu based distributions
We build and test packages for actively supported Ubuntu versions.
Other distros based on Ubuntu, like Mint, should also be supported.
Packages can be installed from our Personal Package Archives (PPA) repository:
```shell
sudo apt update
sudo apt install software-properties-common
sudo add-apt-repository ppa:gns3/ppa
sudo apt update
sudo apt install gns3-gui gns3-server
```
#### Other Linux distributions
GNS3 is often packaged for other distributions by third-parties:
* [Gentoo](https://packages.gentoo.org/package/net-misc/gns3-server)
* [Alpine](https://pkgs.alpinelinux.org/package/v3.10/community/x86_64/gns3-server)
* [NixOS](https://search.nixos.org/packages?channel=21.11&from=0&size=50&sort=relevance&type=packages&query=gns3-server)
#### PyPi
You may use PyPi in case no package is provided, or you would like to do a manual installation:
* https://pypi.org/project/gns3-server/
* https://pypi.org/project/gns3-gui/
```shell
python3 -m pip install gns3-gui
python3 -m pip install gns3-server
```
The downside of this method is you will have to manually install all dependencies (see below).
Please see our [documentation](https://docs.gns3.com/docs/getting-started/installation/linux) for more details.
### Software dependencies
In addition to Python dependencies, other software may be required, recommended or optional.
* [uBridge](https://github.com/GNS3/ubridge/) is required, it interconnects the nodes.
* [Dynamips](https://github.com/GNS3/dynamips/) is required for running IOS routers (using real IOS images) as well as the internal switches and hubs.
* [VPCS](https://github.com/GNS3/vpcs/) is recommended, it is a builtin node simulating a very simple computer to perform connectivity tests using ping, traceroute etc.
* Qemu is strongly recommended as most node types are based on Qemu, for example Cisco IOSv and Arista vEOS.
* libvirt is recommended as it's needed for the NAT cloud.
* Docker is optional, some nodes are based on Docker.
* mtools is recommended to support data transfer to/from QEMU VMs using virtual disks.
* i386-libraries of libc and libcrypto are optional, they are only needed to run IOU based nodes.
Note that Docker needs the script program (`bsdutils` or `util-linux` package), when running a Docker VM and a static busybox during installation (python3 setup.py install / pip3 install / package creation).
## Development
### Setting up
These commands will install the server as well as all Python dependencies:
```shell
git clone https://github.com/GNS3/gns3-server
cd gns3-server
git checkout 3.0
python3 -m venv venv-gns3server
source venv-gns3server/bin/activate
python3 setup.py install
python3 -m gns3server --local
```
You will have to manually install other software dependencies (see above), for Dynamips, VPCS and uBridge the easiest is to install from our PPA.
### Docker container
Alternatively, you can run the GNS3 server in a container
```shell
bash scripts/docker_dev_server.sh
```
### Running tests
First, install the development dependencies:
```shell
python3 -m pip install -r dev-requirements.txt
```
Then run the tests using pytest:
```shell
python3 -m pytest -vv tests/
```
### API documentation
The API documentation can be accessed when running the server locally:
* On `http://IP:PORT/docs` to see with Swagger UI (i.e. `http://localhost:3080/docs`)
* On `http://IP:PORT/redoc` to see with ReDoc (i.e. `http://localhost:3080/redoc`)
The documentation can also be viewed [online](http://apiv3.gns3.net) however it may not be the most up-to-date version since it needs manually synchronization with the current code. Also, you cannot use this to interact with a GNS3 server.
### Branches
#### master
master is the next stable release, you can test it in your day-to -day activities.
Bug fixes or small improvements pull requests go here.
3.x development brand for the next major release.
**Never** use this branch for production. Pull requests for major new features go here.

View File

@ -1,246 +0,0 @@
GNS3-server
===========
.. image:: https://img.shields.io/badge/code%20style-black-000000.svg
:target: https://github.com/psf/black
.. image:: https://github.com/GNS3/gns3-server/workflows/testing/badge.svg?branch=3.0
:target: https://github.com/GNS3/gns3-server/actions?query=workflow%3Atesting+branch%3A3.0
.. image:: https://img.shields.io/pypi/v/gns3-server.svg
:target: https://pypi.python.org/pypi/gns3-server
.. image:: https://snyk.io/test/github/GNS3/gns3-server/badge.svg
:target: https://snyk.io/test/github/GNS3/gns3-server
This is the GNS3 server repository.
The GNS3 server manages emulators such as Dynamips, VirtualBox or Qemu/KVM.
Clients like the `GNS3 GUI <https://github.com/GNS3/gns3-gui/>`_ and the `GNS3 Web UI <https://github.com/GNS3/gns3-web-ui>`_ control the server using a HTTP REST API.
Software dependencies
---------------------
In addition of Python dependencies listed in a section below, other software may be required, recommended or optional.
* `uBridge <https://github.com/GNS3/ubridge/>`_ is required, it interconnects the nodes.
* `Dynamips <https://github.com/GNS3/dynamips/>`_ is required for running IOS routers (using real IOS images) as well as the internal switches and hubs.
* `VPCS <https://github.com/GNS3/vpcs/>`_ is recommended, it is a builtin node simulating a very simple computer to perform connectitivy tests using ping, traceroute etc.
* Qemu is strongly recommended on Linux, as most node types are based on Qemu, for example Cisco IOSv and Arista vEOS.
* libvirt is recommended (Linux only), as it's needed for the NAT cloud.
* Docker is optional (Linux only), some nodes are based on Docker.
* mtools is recommended to support data transfer to/from QEMU VMs using virtual disks.
* i386-libraries of libc and libcrypto are optional (Linux only), they are only needed to run IOU based nodes.
Docker support
**************
Docker support needs the script program (`bsdutils` or `util-linux` package), when running a docker VM and a static busybox during installation (python3 setup.py install / pip3 install / package creation).
Branches
--------
master
******
master is the next stable release, you can test it in your day to day activities.
Bug fixes or small improvements pull requests go here.
2.x (2.3 for example)
*********************
Next major release
*Never* use this branch for production. Pull requests for major new features go here.
Linux
-----
GNS3 is perhaps packaged for your distribution:
* Gentoo: https://packages.gentoo.org/package/net-misc/gns3-server
* Alpine: https://pkgs.alpinelinux.org/package/v3.10/community/x86_64/gns3-server
* NixOS: https://search.nixos.org/packages?channel=21.11&from=0&size=50&sort=relevance&type=packages&query=gns3-server
Linux (Debian based)
--------------------
The following instructions have been tested with Ubuntu and Mint.
You must be connected to the Internet in order to install the dependencies.
Dependencies:
- Python 3.6, setuptools and the ones listed `here <https://github.com/GNS3/gns3-server/blob/master/requirements.txt>`_
The following commands will install some of these dependencies:
.. code:: bash
sudo apt-get install python3-setuptools
Finally these commands will install the server as well as the rest of the dependencies:
.. code:: bash
cd gns3-server-master
python3 -m venv venv-gns3server
source venv-gns3server/bin/activate
sudo python3 setup.py install
python3 -m gns3server --local
To run tests use:
.. code:: bash
python3 -m pytest tests
Docker container
****************
For development you can run the GNS3 server in a container
.. code:: bash
bash scripts/docker_dev_server.sh
Run as daemon (Unix only)
**************************
You will find init sample scripts for various systems
inside the init directory.
Usefull options:
* --daemon: start process as a daemon
* --log logfile: store output in a logfile
* --pid pidfile: store the pid of the running process in a file and prevent double execution
All init scripts require the creation of a GNS3 user. You can change it to another user.
.. code:: bash
sudo adduser gns3
upstart
-------
For ubuntu < 15.04
You need to copy init/gns3.conf.upstart to /etc/init/gns3.conf
.. code:: bash
sudo chown root /etc/init/gns3.conf
sudo service gns3 start
systemd
-------
You need to copy init/gns3.service.systemd to /lib/systemd/system/gns3.service
.. code:: bash
sudo chown root /lib/systemd/system/gns3.service
sudo systemctl start gns3
Windows
-------
Please use our `all-in-one installer <https://community.gns3.com/community/software/download>`_ to install the stable build.
If you install via source you need to first install:
- Python (3.3 or above) - https://www.python.org/downloads/windows/
- Pywin32 - https://sourceforge.net/projects/pywin32/
Then you can call
.. code:: bash
python setup.py install
to install the remaining dependencies.
To run the tests, you also need to call
.. code:: bash
pip install pytest pytest-capturelog
before actually running the tests with
.. code:: bash
python setup.py test
or with
.. code:: bash
py.test -v
Mac OS X
--------
Please use our DMG package for a simple installation.
If you want to test the current git version or contribute to the project,
you can follow these instructions with virtualenwrapper: http://virtualenvwrapper.readthedocs.org/
and homebrew: http://brew.sh/.
.. code:: bash
brew install python3
mkvirtualenv gns3-server --python=/usr/local/bin/python3.5
python3 setup.py install
gns3server
SSL
---
If you want enable SSL support on GNS3 you can generate a self signed certificate:
.. code:: bash
bash gns3server/cert_utils/create_cert.sh
This command will put the files in ~/.config/GNS3/ssl
After you can start the server in SSL mode with:
.. code:: bash
python gns3server/main.py --certfile ~/.config/GNS3/ssl/server.cert --certkey ~/.config/GNS3/ssl/server.key --ssl
Or in your gns3_server.conf by adding in the Server section:
.. code:: ini
[Server]
certfile=/Users/noplay/.config/GNS3/ssl/server.cert
certkey=/Users/noplay/.config/GNS3/ssl/server.key
ssl=True
Running tests
*************
Just run:
.. code:: bash
py.test -vv
If you want test coverage:
.. code:: bash
py.test --cov-report term-missing --cov=gns3server
Security issues
----------------
Please contact us using contact form available here:
http://docs.gns3.com/1ON9JBXSeR7Nt2-Qum2o3ZX0GU86BZwlmNSUgvmqNWGY/index.html

View File

@ -1,6 +1,6 @@
-r requirements.txt
pytest==6.2.5
pytest==7.0.0
flake8==4.0.1
pytest-timeout==2.0.1
pytest-asyncio==0.16.0

View File

@ -230,16 +230,6 @@ async def start_capture(
"""
pcap_file_path = os.path.join(node.project.capture_working_directory(), node_capture_data.capture_file_name)
if sys.platform.startswith("win"):
# FIXME: Dynamips (Cygwin actually) doesn't like non ascii paths on Windows
try:
pcap_file_path.encode("ascii")
except UnicodeEncodeError:
raise DynamipsError(
f"The capture file path '{pcap_file_path}' must only contain ASCII (English) characters"
)
await node.start_capture(adapter_number, port_number, pcap_file_path, node_capture_data.data_link_type)
return {"pcap_file_path": pcap_file_path}

View File

@ -24,7 +24,7 @@ from gns3server.db.repositories.base import BaseRepository
async def get_db_session(request: HTTPConnection) -> AsyncSession:
session = AsyncSession(request.app.state._db_engine, expire_on_commit=False)
async with AsyncSession(request.app.state._db_engine, expire_on_commit=False) as session:
try:
yield session
finally:

View File

@ -23,11 +23,13 @@ import logging
import urllib.parse
from fastapi import APIRouter, Request, Response, Depends, status
from starlette.requests import ClientDisconnect
from sqlalchemy.orm.exc import MultipleResultsFound
from typing import List, Optional
from gns3server import schemas
from gns3server.utils.images import InvalidImageError, default_images_directory, write_image
from gns3server.config import Config
from gns3server.utils.images import InvalidImageError, write_image
from gns3server.db.repositories.images import ImagesRepository
from gns3server.db.repositories.templates import TemplatesRepository
from gns3server.db.repositories.rbac import RbacRepository
@ -62,7 +64,6 @@ async def get_images(
async def upload_image(
image_path: str,
request: Request,
image_type: schemas.ImageType = schemas.ImageType.qemu,
images_repo: ImagesRepository = Depends(get_repository(ImagesRepository)),
templates_repo: TemplatesRepository = Depends(get_repository(TemplatesRepository)),
current_user: schemas.User = Depends(get_current_active_user),
@ -72,24 +73,26 @@ async def upload_image(
"""
Upload an image.
Example: curl -X POST http://host:port/v3/images/upload/my_image_name.qcow2?image_type=qemu \
Example: curl -X POST http://host:port/v3/images/upload/my_image_name.qcow2 \
-H 'Authorization: Bearer <token>' --data-binary @"/path/to/image.qcow2"
"""
image_path = urllib.parse.unquote(image_path)
image_dir, image_name = os.path.split(image_path)
directory = default_images_directory(image_type)
full_path = os.path.abspath(os.path.join(directory, image_dir, image_name))
if os.path.commonprefix([directory, full_path]) != directory:
# check if the path is within the default images directory
base_images_directory = os.path.expanduser(Config.instance().settings.Server.images_path)
full_path = os.path.abspath(os.path.join(base_images_directory, image_dir, image_name))
if os.path.commonprefix([base_images_directory, full_path]) != base_images_directory:
raise ControllerForbiddenError(f"Cannot write image, '{image_path}' is forbidden")
print(image_path)
if await images_repo.get_image(image_path):
raise ControllerBadRequestError(f"Image '{image_path}' already exists")
try:
image = await write_image(image_name, image_type, full_path, request.stream(), images_repo)
except (OSError, InvalidImageError) as e:
raise ControllerError(f"Could not save {image_type} image '{image_path}': {e}")
image = await write_image(image_path, full_path, request.stream(), images_repo)
except (OSError, InvalidImageError, ClientDisconnect) as e:
raise ControllerError(f"Could not save image '{image_path}': {e}")
if install_appliances:
# attempt to automatically create templates based on image checksum
@ -100,7 +103,7 @@ async def upload_image(
templates_repo,
rbac_repo,
current_user,
directory
os.path.dirname(image.path)
)
return image

View File

@ -32,10 +32,7 @@ if (
or os.environ.get("PYTEST_BUILD_DOCUMENTATION") == "1"
):
# IOU & Docker only runs on Linux but test suite works on UNIX platform
if not sys.platform.startswith("win"):
from .docker import Docker
MODULES.append(Docker)
from .iou import IOU
MODULES.append(Docker)
MODULES.append(IOU)

View File

@ -301,10 +301,6 @@ class BaseManager:
:returns: True or False
"""
if sys.platform.startswith("win"):
# do not check anything on Windows
return True
if sys.platform.startswith("darwin"):
if os.stat(executable).st_uid == 0:
return True
@ -425,7 +421,6 @@ class BaseManager:
valid_directory_prefices.append(extra_dir)
# Windows path should not be send to a unix server
if not sys.platform.startswith("win"):
if re.match(r"^[A-Z]:", path) is not None:
raise NodeError(
f"'{path}' is not allowed on this remote server. Please only use a file from '{img_directory}'"

View File

@ -892,34 +892,6 @@ class BaseNode:
await self._ubridge_send(
'bridge add_nio_linux_raw {name} "{interface}"'.format(name=bridge_name, interface=ethernet_interface)
)
elif sys.platform.startswith("win"):
# on Windows we use Winpcap/Npcap
windows_interfaces = interfaces()
npf_id = None
source_mac = None
for interface in windows_interfaces:
# Winpcap/Npcap uses a NPF ID to identify an interface on Windows
if "netcard" in interface and ethernet_interface in interface["netcard"]:
npf_id = interface["id"]
source_mac = interface["mac_address"]
elif ethernet_interface in interface["name"]:
npf_id = interface["id"]
source_mac = interface["mac_address"]
if npf_id:
await self._ubridge_send(
'bridge add_nio_ethernet {name} "{interface}"'.format(name=bridge_name, interface=npf_id)
)
else:
raise NodeError(f"Could not find NPF id for interface {ethernet_interface}")
if block_host_traffic:
if source_mac:
await self._ubridge_send(
'bridge set_pcap_filter {name} "not ether src {mac}"'.format(name=bridge_name, mac=source_mac)
)
log.info(f"PCAP filter applied on '{ethernet_interface}' for source MAC {source_mac}")
else:
log.warning(f"Could not block host network traffic on {ethernet_interface} (no MAC address found)")
else:
# on other platforms we just rely on the pcap library
await self._ubridge_send(

View File

@ -310,10 +310,6 @@ class Cloud(BaseNode):
"uBridge requires root access or the capability to interact with Ethernet and TAP adapters"
)
if sys.platform.startswith("win"):
await self._add_ubridge_ethernet_connection(bridge_name, port_info["interface"])
else:
if port_info["type"] == "ethernet":
network_interfaces = [interface["name"] for interface in self._interfaces()]
if not port_info["interface"] in network_interfaces:

View File

@ -446,6 +446,12 @@ class DockerVM(BaseNode):
continue
if not e.startswith("GNS3_"):
formatted = self._format_env(variables, e)
vm_name = self._name.replace(",", ",,")
project_path = self.project.path.replace(",", ",,")
formatted = formatted.replace("%vm-name%", '"' + vm_name.replace('"', '\\"') + '"')
formatted = formatted.replace("%vm-id%", self._id)
formatted = formatted.replace("%project-id%", self.project.id)
formatted = formatted.replace("%project-path%", '"' + project_path.replace('"', '\\"') + '"')
params["Env"].append(formatted)
if self._console_type == "vnc":

View File

@ -348,23 +348,10 @@ class Dynamips(BaseManager):
nio.suspend = nio_settings.get("suspend", False)
elif nio_settings["type"] == "nio_generic_ethernet":
ethernet_device = nio_settings["ethernet_device"]
if sys.platform.startswith("win"):
# replace the interface name by the GUID on Windows
windows_interfaces = interfaces()
npf_interface = None
for interface in windows_interfaces:
if interface["name"] == ethernet_device:
npf_interface = interface["id"]
if not npf_interface:
raise DynamipsError(f"Could not find interface {ethernet_device} on this host")
else:
ethernet_device = npf_interface
if not is_interface_up(ethernet_device):
raise DynamipsError(f"Ethernet interface {ethernet_device} is down")
nio = NIOGenericEthernet(node.hypervisor, ethernet_device)
elif nio_settings["type"] == "nio_linux_ethernet":
if sys.platform.startswith("win"):
raise DynamipsError("This NIO type is not supported on Windows")
ethernet_device = nio_settings["ethernet_device"]
nio = NIOLinuxEthernet(node.hypervisor, ethernet_device)
elif nio_settings["type"] == "nio_tap":
@ -564,7 +551,6 @@ class Dynamips(BaseManager):
await vm.set_idlepc("0x0")
was_auto_started = False
old_priority = None
try:
status = await vm.get_status()
if status != "running":
@ -576,8 +562,6 @@ class Dynamips(BaseManager):
if not idlepcs:
raise DynamipsError("No Idle-PC values found")
if sys.platform.startswith("win"):
old_priority = vm.set_process_priority_windows(vm.hypervisor.process.pid)
for idlepc in idlepcs:
match = re.search(r"^0x[0-9a-f]{8}$", idlepc.split()[0])
if not match:
@ -606,8 +590,6 @@ class Dynamips(BaseManager):
except DynamipsError:
raise
finally:
if old_priority is not None:
vm.set_process_priority_windows(vm.hypervisor.process.pid, old_priority)
if was_auto_started:
await vm.stop()
return validated_idlepc

View File

@ -118,11 +118,6 @@ class Hypervisor(DynamipsHypervisor):
self._command = self._build_command()
env = os.environ.copy()
if sys.platform.startswith("win"):
# add the Npcap directory to $PATH to force Dynamips to use npcap DLL instead of Winpcap (if installed)
system_root = os.path.join(os.path.expandvars("%SystemRoot%"), "System32", "Npcap")
if os.path.isdir(system_root):
env["PATH"] = system_root + ";" + env["PATH"]
try:
log.info(f"Starting Dynamips: {self._command}")
self._stdout_file = os.path.join(self.working_dir, f"dynamips_i{self._id}_stdout.txt")

View File

@ -103,10 +103,7 @@ class Router(BaseNode):
self._idlesleep = 30
self._ghost_file = ""
self._ghost_status = 0
if sys.platform.startswith("win"):
self._exec_area = 16 # 16 MB by default on Windows (Cygwin)
else:
self._exec_area = 64 # 64 MB on other systems
self._exec_area = 64
self._disk0 = 0 # Megabytes
self._disk1 = 0 # Megabytes
self._auto_delete_disks = False
@ -711,29 +708,6 @@ class Router(BaseNode):
log.info(f'Router "{self._name}" [{self._id}]: idle-PC set to {idlepc}')
self._idlepc = idlepc
def set_process_priority_windows(self, pid, priority=None):
"""
Sets process priority on Windows
:param pid: process PID
"""
import win32api
import win32process
import win32con
import pywintypes
old_priority = None
try:
handle = win32api.OpenProcess(win32con.PROCESS_ALL_ACCESS, True, pid)
old_priority = win32process.GetPriorityClass(handle)
if priority is None:
priority = win32process.BELOW_NORMAL_PRIORITY_CLASS
win32process.SetPriorityClass(handle, priority)
except pywintypes.error as e:
log.error(f"Cannot set priority for Dynamips process (PID={pid}) ")
return old_priority
async def get_idle_pc_prop(self):
"""
Gets the idle PC proposals.
@ -751,13 +725,8 @@ class Router(BaseNode):
await asyncio.sleep(20) # leave time to the router to boot
log.info(f'Router "{self._name}" [{self._id}] has started calculating Idle-PC values')
old_priority = None
if sys.platform.startswith("win"):
old_priority = self.set_process_priority_windows(self._hypervisor.process.pid)
begin = time.time()
idlepcs = await self._hypervisor.send(f'vm get_idle_pc_prop "{self._name}" 0')
if old_priority is not None:
self.set_process_priority_windows(self._hypervisor.process.pid, old_priority)
log.info(
'Router "{name}" [{id}] has finished calculating Idle-PC values after {time:.4f} seconds'.format(
name=self._name, id=self._id, time=time.time() - begin

View File

@ -114,21 +114,7 @@ class Qemu(BaseManager):
else:
log.warning("The PATH environment variable doesn't exist")
# look for Qemu binaries in the current working directory and $PATH
if sys.platform.startswith("win"):
# add specific Windows paths
if hasattr(sys, "frozen"):
# add any qemu dir in the same location as gns3server.exe to the list of paths
exec_dir = os.path.dirname(os.path.abspath(sys.executable))
for f in os.listdir(exec_dir):
if f.lower().startswith("qemu"):
paths.add(os.path.join(exec_dir, f))
if "PROGRAMFILES(X86)" in os.environ and os.path.exists(os.environ["PROGRAMFILES(X86)"]):
paths.add(os.path.join(os.environ["PROGRAMFILES(X86)"], "qemu"))
if "PROGRAMFILES" in os.environ and os.path.exists(os.environ["PROGRAMFILES"]):
paths.add(os.path.join(os.environ["PROGRAMFILES"], "qemu"))
elif sys.platform.startswith("darwin"):
if hasattr(sys, "frozen"):
if sys.platform.startswith("darwin") and hasattr(sys, "frozen"):
# add specific locations on Mac OS X regardless of what's in $PATH
paths.update(["/usr/bin", "/usr/local/bin", "/opt/local/bin"])
try:
@ -205,21 +191,6 @@ class Qemu(BaseManager):
:param qemu_path: path to Qemu executable.
"""
if sys.platform.startswith("win"):
# Qemu on Windows doesn't return anything with parameter -version
# look for a version number in version.txt file in the same directory instead
version_file = os.path.join(os.path.dirname(qemu_path), "version.txt")
if os.path.isfile(version_file):
try:
with open(version_file, "rb") as file:
version = file.read().decode("utf-8").strip()
match = re.search(r"[0-9\.]+", version)
if match:
return version
except (UnicodeDecodeError, OSError) as e:
log.warning(f"could not read {version_file}: {e}")
return ""
else:
try:
output = await subprocess_check_output(qemu_path, "-version", "-nographic")
match = re.search(r"version\s+([0-9a-z\-\.]+)", output)
@ -250,38 +221,6 @@ class Qemu(BaseManager):
except (OSError, subprocess.SubprocessError) as e:
raise QemuError(f"Error while looking for the Qemu-img version: {e}")
@staticmethod
def get_haxm_windows_version():
"""
Gets the HAXM version number (Windows).
:returns: HAXM version number. Returns None if HAXM is not installed.
"""
assert sys.platform.startswith("win")
import winreg
hkey = winreg.OpenKey(
winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows\CurrentVersion\Installer\UserData\S-1-5-18\Products"
)
version = None
for index in range(winreg.QueryInfoKey(hkey)[0]):
product_id = winreg.EnumKey(hkey, index)
try:
product_key = winreg.OpenKey(hkey, fr"{product_id}\InstallProperties")
try:
if winreg.QueryValueEx(product_key, "DisplayName")[0].endswith(
"Hardware Accelerated Execution Manager"
):
version = winreg.QueryValueEx(product_key, "DisplayVersion")[0]
break
finally:
winreg.CloseKey(product_key)
except OSError:
continue
winreg.CloseKey(hkey)
return version
@staticmethod
def get_legacy_vm_workdir(legacy_vm_id, name):
"""

View File

@ -32,8 +32,9 @@ import gns3server
import subprocess
import time
import json
import shlex
from gns3server.utils import parse_version, shlex_quote
from gns3server.utils import parse_version
from gns3server.utils.asyncio import subprocess_check_output, cancellable_wait_run_in_executor
from .qemu_error import QemuError
from .utils.qcow2 import Qcow2, Qcow2Error
@ -220,8 +221,6 @@ class QemuVM(BaseNode):
"""
if qemu_path and os.pathsep not in qemu_path:
if sys.platform.startswith("win") and ".exe" not in qemu_path.lower():
qemu_path += "w.exe"
new_qemu_path = shutil.which(qemu_path, path=os.pathsep.join(self._manager.paths_list()))
if new_qemu_path is None:
raise QemuError(f"QEMU binary path {qemu_path} is not found in the path")
@ -271,9 +270,6 @@ class QemuVM(BaseNode):
def platform(self, platform):
self._platform = platform
if sys.platform.startswith("win"):
self.qemu_path = f"qemu-system-{platform}w.exe"
else:
self.qemu_path = f"qemu-system-{platform}"
def _disk_setter(self, variable, value):
@ -289,7 +285,7 @@ class QemuVM(BaseNode):
for node in self.manager.nodes:
if node != self and getattr(node, variable) == value:
raise QemuError(
f"Sorry a node without the linked base setting enabled can only be used once on your server. {value} is already used by {node.name}"
f"Sorry a node without the linked base setting enabled can only be used once on your server. {value} is already used by {node.name} in project {node.project.name}"
)
setattr(self, "_" + variable, value)
log.info(
@ -901,8 +897,8 @@ class QemuVM(BaseNode):
options = options.replace("-enable-kvm", "-machine accel=kvm")
if "-enable-hax" in options:
if not sys.platform.startswith("win"):
# HAXM is only available on Windows
if not sys.platform.startswith("darwin"):
# HAXM is only available on macOS
options = options.replace("-enable-hax", "")
else:
options = options.replace("-enable-hax", "-machine accel=hax")
@ -1002,33 +998,6 @@ class QemuVM(BaseNode):
if self._process_priority == "normal":
return
if sys.platform.startswith("win"):
try:
import win32api
import win32con
import win32process
except ImportError:
log.error(f"pywin32 must be installed to change the priority class for QEMU VM {self._name}")
else:
log.info(f"Setting QEMU VM {self._name} priority class to {self._process_priority}")
handle = win32api.OpenProcess(win32con.PROCESS_ALL_ACCESS, 0, self._process.pid)
if self._process_priority == "realtime":
priority = win32process.REALTIME_PRIORITY_CLASS
elif self._process_priority == "very high":
priority = win32process.HIGH_PRIORITY_CLASS
elif self._process_priority == "high":
priority = win32process.ABOVE_NORMAL_PRIORITY_CLASS
elif self._process_priority == "low":
priority = win32process.BELOW_NORMAL_PRIORITY_CLASS
elif self._process_priority == "very low":
priority = win32process.IDLE_PRIORITY_CLASS
else:
priority = win32process.NORMAL_PRIORITY_CLASS
try:
win32process.SetPriorityClass(handle, priority)
except win32process.error as e:
log.error(f'Could not change process priority for QEMU VM "{self._name}": {e}')
else:
if self._process_priority == "realtime":
priority = -20
elif self._process_priority == "very high":
@ -1070,14 +1039,8 @@ class QemuVM(BaseNode):
return
try:
if sys.platform.startswith("win") and hasattr(sys, "frozen"):
cpulimit_exec = os.path.join(
os.path.dirname(os.path.abspath(sys.executable)), "cpulimit", "cpulimit.exe"
)
else:
cpulimit_exec = "cpulimit"
subprocess.Popen(
[cpulimit_exec, "--lazy", f"--pid={self._process.pid}", f"--limit={self._cpu_throttling}"],
["cpulimit", "--lazy", f"--pid={self._process.pid}", f"--limit={self._cpu_throttling}"],
cwd=self.working_dir,
)
log.info(f"CPU throttled to {self._cpu_throttling}%")
@ -1133,7 +1096,7 @@ class QemuVM(BaseNode):
self.check_available_ram(self.ram)
command = await self._build_command()
command_string = " ".join(shlex_quote(s) for s in command)
command_string = " ".join(shlex.quote(s) for s in command)
try:
log.info(f"Starting QEMU with: {command_string}")
self._stdout_file = os.path.join(self.working_dir, "qemu.log")
@ -1193,8 +1156,7 @@ class QemuVM(BaseNode):
if self.started:
log.info("QEMU process has stopped, return code: %d", returncode)
await self.stop()
# A return code of 1 seem fine on Windows
if returncode != 0 and (not sys.platform.startswith("win") or returncode != 1):
if returncode != 0:
self.project.emit(
"log.error",
{"message": f"QEMU process has stopped, return code: {returncode}\n{self.read_stdout()}"},
@ -1822,7 +1784,7 @@ class QemuVM(BaseNode):
self._qemu_img_stdout_file = os.path.join(self.working_dir, "qemu-img.log")
log.info(f"logging to {self._qemu_img_stdout_file}")
command_string = " ".join(shlex_quote(s) for s in command)
command_string = " ".join(shlex.quote(s) for s in command)
log.info(f"Executing qemu-img with: {command_string}")
with open(self._qemu_img_stdout_file, "w", encoding="utf-8") as fd:
process = await asyncio.create_subprocess_exec(
@ -2272,14 +2234,6 @@ class QemuVM(BaseNode):
require_hardware_accel = self.manager.config.settings.Qemu.require_hardware_acceleration
if enable_hardware_accel and "-machine accel=tcg" not in options:
# Turn OFF hardware acceleration for non x86 architectures
if sys.platform.startswith("win"):
supported_binaries = [
"qemu-system-x86_64.exe",
"qemu-system-x86_64w.exe",
"qemu-system-i386.exe",
"qemu-system-i386w.exe",
]
else:
supported_binaries = ["qemu-system-x86_64", "qemu-system-i386", "qemu-kvm"]
if os.path.basename(qemu_path) not in supported_binaries:
if require_hardware_accel:
@ -2298,29 +2252,6 @@ class QemuVM(BaseNode):
)
else:
return False
elif sys.platform.startswith("win"):
if require_hardware_accel:
# HAXM is only available starting with Qemu version 2.9.0
version = await self.manager.get_qemu_version(self.qemu_path)
if version and parse_version(version) < parse_version("2.9.0"):
raise QemuError(
f"HAXM acceleration can only be enable for Qemu version 2.9.0 and above (current version: {version})"
)
# check if HAXM is installed
version = self.manager.get_haxm_windows_version()
if version is None:
raise QemuError("HAXM acceleration support is not installed on this host")
log.info(f"HAXM support version {version} detected")
# check if the HAXM service is running
from gns3server.utils.windows_service import check_windows_service_is_running
if not check_windows_service_is_running("intelhaxm"):
raise QemuError("Intel HAXM service is not running on this host")
else:
return False
elif sys.platform.startswith("darwin"):
process = await asyncio.create_subprocess_shell("kextstat | grep com.intel.kext.intelhaxm")
await process.wait()
@ -2440,7 +2371,7 @@ class QemuVM(BaseNode):
# https://github.com/GNS3/gns3-server/issues/685
if version and parse_version(version) >= parse_version("2.4.0") and self.platform == "x86_64":
command.extend(["-machine", "smm=off"])
elif sys.platform.startswith("win") or sys.platform.startswith("darwin"):
elif sys.platform.startswith("darwin"):
command.extend(["-enable-hax"])
command.extend(["-boot", f"order={self._boot_priority}"])
command.extend(self._bios_option())

View File

@ -27,7 +27,6 @@ import re
from gns3server.utils import parse_version
from gns3server.utils.asyncio import wait_for_process_termination
from gns3server.utils.asyncio import monitor_process
from gns3server.utils.asyncio import subprocess_check_output
from .ubridge_hypervisor import UBridgeHypervisor
from .ubridge_error import UbridgeError
@ -139,7 +138,7 @@ class Hypervisor(UBridgeHypervisor):
match = re.search(r"ubridge version ([0-9a-z\.]+)", output)
if match:
self._version = match.group(1)
if sys.platform.startswith("win") or sys.platform.startswith("darwin"):
if sys.platform.startswith("darwin"):
minimum_required_version = "0.9.12"
else:
# uBridge version 0.9.14 is required for packet filters
@ -158,11 +157,6 @@ class Hypervisor(UBridgeHypervisor):
"""
env = os.environ.copy()
if sys.platform.startswith("win"):
# add the Npcap directory to $PATH to force uBridge to use npcap DLL instead of Winpcap (if installed)
system_root = os.path.join(os.path.expandvars("%SystemRoot%"), "System32", "Npcap")
if os.path.isdir(system_root):
env["PATH"] = system_root + ";" + env["PATH"]
await self._check_ubridge_version(env)
try:
command = self._build_command()

View File

@ -62,16 +62,7 @@ class VirtualBox(BaseManager):
vboxmanage_path = shutil.which(vboxmanage_path)
else:
log.info("A path to VBoxManage has not been configured, trying to find it...")
if sys.platform.startswith("win"):
if "VBOX_INSTALL_PATH" in os.environ:
vboxmanage_path_windows = os.path.join(os.environ["VBOX_INSTALL_PATH"], "VBoxManage.exe")
if os.path.exists(vboxmanage_path_windows):
vboxmanage_path = vboxmanage_path_windows
elif "VBOX_MSI_INSTALL_PATH" in os.environ:
vboxmanage_path_windows = os.path.join(os.environ["VBOX_MSI_INSTALL_PATH"], "VBoxManage.exe")
if os.path.exists(vboxmanage_path_windows):
vboxmanage_path = vboxmanage_path_windows
elif sys.platform.startswith("darwin"):
if sys.platform.startswith("darwin"):
vboxmanage_path_osx = "/Applications/VirtualBox.app/Contents/MacOS/VBoxManage"
if os.path.exists(vboxmanage_path_osx):
vboxmanage_path = vboxmanage_path_osx

View File

@ -38,10 +38,6 @@ from gns3server.compute.nios.nio_udp import NIOUDP
from gns3server.compute.adapters.ethernet_adapter import EthernetAdapter
from gns3server.compute.base_node import BaseNode
if sys.platform.startswith("win"):
import msvcrt
import win32file
import logging
log = logging.getLogger(__name__)
@ -839,9 +835,6 @@ class VirtualBoxVM(BaseNode):
:returns: pipe path (string)
"""
if sys.platform.startswith("win"):
pipe_name = fr"\\.\pipe\gns3_vbox\{self.id}"
else:
pipe_name = os.path.join(tempfile.gettempdir(), "gns3_vbox", f"{self.id}")
try:
os.makedirs(os.path.dirname(pipe_name), exist_ok=True)

View File

@ -27,11 +27,12 @@ import subprocess
import logging
import codecs
import ipaddress
import shlex
from collections import OrderedDict
from gns3server.utils.interfaces import interfaces
from gns3server.utils.asyncio import subprocess_check_output
from gns3server.utils import parse_version, shlex_quote
from gns3server.utils import parse_version
log = logging.getLogger(__name__)
@ -53,9 +54,6 @@ class VMware(BaseManager):
self._vmnets = []
self._vmnets_info = {}
self._vmnet_start_range = 2
if sys.platform.startswith("win"):
self._vmnet_end_range = 19
else:
self._vmnet_end_range = 255
@property
@ -95,15 +93,7 @@ class VMware(BaseManager):
# look for vmrun
vmrun_path = self.config.settings.VMware.vmrun_path
if not vmrun_path:
if sys.platform.startswith("win"):
vmrun_path = shutil.which("vmrun")
if vmrun_path is None:
# look for vmrun.exe using the VMware Workstation directory listed in the registry
vmrun_path = self._find_vmrun_registry(r"SOFTWARE\Wow6432Node\VMware, Inc.\VMware Workstation")
if vmrun_path is None:
# look for vmrun.exe using the VIX directory listed in the registry
vmrun_path = self._find_vmrun_registry(r"SOFTWARE\Wow6432Node\VMware, Inc.\VMware VIX")
elif sys.platform.startswith("darwin"):
if sys.platform.startswith("darwin"):
vmrun_path = "/Applications/VMware Fusion.app/Contents/Library/vmrun"
else:
vmrun_path = "vmrun"
@ -197,21 +187,6 @@ class VMware(BaseManager):
Check VMware version
"""
if sys.platform.startswith("win"):
# look for vmrun.exe using the directory listed in the registry
ws_version = self._find_vmware_version_registry(r"SOFTWARE\Wow6432Node\VMware, Inc.\VMware Workstation")
if ws_version is None:
player_version = self._find_vmware_version_registry(r"SOFTWARE\Wow6432Node\VMware, Inc.\VMware Player")
if player_version:
log.debug(f"VMware Player version {player_version} detected")
await self._check_vmware_player_requirements(player_version)
else:
log.warning("Could not find VMware version")
self._host_type = "ws"
else:
log.debug(f"VMware Workstation version {ws_version} detected")
await self._check_vmware_workstation_requirements(ws_version)
else:
if sys.platform.startswith("darwin"):
if not os.path.isdir("/Applications/VMware Fusion.app"):
raise VMwareError(
@ -246,35 +221,10 @@ class VMware(BaseManager):
log.error(f"Error while looking for the VMware version: {e}")
raise VMwareError(f"Error while looking for the VMware version: {e}")
@staticmethod
def _get_vmnet_interfaces_registry():
import winreg
vmnet_interfaces = []
regkey = r"SOFTWARE\Wow6432Node\VMware, Inc.\VMnetLib\VMnetConfig"
try:
hkey = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, regkey)
for index in range(winreg.QueryInfoKey(hkey)[0]):
vmnet = winreg.EnumKey(hkey, index)
hkeyvmnet = winreg.OpenKey(hkey, vmnet)
if winreg.QueryInfoKey(hkeyvmnet)[1]:
# the vmnet has not been configure if the key has no values
vmnet = vmnet.replace("vm", "VM")
if vmnet not in ("VMnet0", "VMnet1", "VMnet8"):
vmnet_interfaces.append(vmnet)
winreg.CloseKey(hkeyvmnet)
winreg.CloseKey(hkey)
except OSError as e:
raise VMwareError(f"Could not read registry key {regkey}: {e}")
return vmnet_interfaces
@staticmethod
def _get_vmnet_interfaces():
if sys.platform.startswith("win"):
return VMware._get_vmnet_interfaces_registry()
elif sys.platform.startswith("darwin"):
if sys.platform.startswith("darwin"):
vmware_networking_file = "/Library/Preferences/VMware Fusion/networking"
else:
# location on Linux
@ -310,17 +260,7 @@ class VMware(BaseManager):
vmnet_interfaces = []
for interface in interfaces():
if sys.platform.startswith("win"):
if "netcard" in interface:
windows_name = interface["netcard"]
else:
windows_name = interface["name"]
match = re.search(r"(VMnet[0-9]+)", windows_name)
if match:
vmnet = match.group(1)
if vmnet not in ("VMnet0", "VMnet1", "VMnet8"):
vmnet_interfaces.append(vmnet)
elif interface["name"].startswith("vmnet"):
if interface["name"].startswith("vmnet"):
vmnet = interface["name"]
if vmnet not in ("vmnet0", "vmnet1", "vmnet8"):
vmnet_interfaces.append(interface["name"])
@ -428,7 +368,7 @@ class VMware(BaseManager):
command = [vmrun_path, "-T", self.host_type, subcommand]
command.extend(args)
command_string = " ".join([shlex_quote(c) for c in command])
command_string = " ".join([shlex.quote(c) for c in command])
log.log(log_level, f"Executing vmrun with command: {command_string}")
try:
process = await asyncio.create_subprocess_exec(
@ -677,9 +617,7 @@ class VMware(BaseManager):
:returns: path to the inventory file
"""
if sys.platform.startswith("win"):
return os.path.expandvars(r"%APPDATA%\Vmware\Inventory.vmls")
elif sys.platform.startswith("darwin"):
if sys.platform.startswith("darwin"):
return os.path.expanduser("~/Library/Application Support/VMware Fusion/vmInventory")
else:
return os.path.expanduser("~/.vmware/inventory.vmls")
@ -692,9 +630,7 @@ class VMware(BaseManager):
:returns: path to the preferences file
"""
if sys.platform.startswith("win"):
return os.path.expandvars(r"%APPDATA%\VMware\preferences.ini")
elif sys.platform.startswith("darwin"):
if sys.platform.startswith("darwin"):
return os.path.expanduser("~/Library/Preferences/VMware Fusion/preferences")
else:
return os.path.expanduser("~/.vmware/preferences")
@ -707,15 +643,7 @@ class VMware(BaseManager):
:returns: path to the default VM directory
"""
if sys.platform.startswith("win"):
import ctypes
import ctypes.wintypes
path = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH)
ctypes.windll.shell32.SHGetFolderPathW(None, 5, None, 0, path)
documents_folder = path.value
return [fr"{documents_folder}\My Virtual Machines", fr"{documents_folder}\Virtual Machines"]
elif sys.platform.startswith("darwin"):
if sys.platform.startswith("darwin"):
return [os.path.expanduser("~/Documents/Virtual Machines.localized")]
else:
return [os.path.expanduser("~/vmware")]

View File

@ -882,9 +882,6 @@ class VMwareVM(BaseNode):
:returns: pipe path (string)
"""
if sys.platform.startswith("win"):
pipe_name = fr"\\.\pipe\gns3_vmware\{self.id}"
else:
pipe_name = os.path.join(tempfile.gettempdir(), "gns3_vmware", f"{self.id}")
try:
os.makedirs(os.path.dirname(pipe_name), exist_ok=True)

View File

@ -63,37 +63,6 @@ class Config:
appname = "GNS3"
version = f"{__version_info__[0]}.{__version_info__[1]}"
if sys.platform.startswith("win"):
# On windows, the configuration file location can be one of the following:
# 1: %APPDATA%/GNS3/gns3_server.ini
# 2: %APPDATA%/GNS3.ini
# 3: %COMMON_APPDATA%/GNS3/gns3_server.ini
# 4: %COMMON_APPDATA%/GNS3.ini
# 5: server.ini in the current working directory
appdata = os.path.expandvars("%APPDATA%")
common_appdata = os.path.expandvars("%COMMON_APPDATA%")
if self._profile:
legacy_user_dir = os.path.join(appdata, appname, "profiles", self._profile)
versioned_user_dir = os.path.join(appdata, appname, version, "profiles", self._profile)
else:
legacy_user_dir = os.path.join(appdata, appname)
versioned_user_dir = os.path.join(appdata, appname, version)
server_filename = "gns3_server.ini"
if self._files is None and not hasattr(sys, "_called_from_test"):
self._files = [
os.path.join(os.getcwd(), server_filename),
os.path.join(versioned_user_dir, server_filename),
os.path.join(appdata, appname + ".ini"),
os.path.join(common_appdata, appname, server_filename),
os.path.join(common_appdata, appname + ".ini"),
]
else:
# On UNIX-like platforms, the configuration file location can be one of the following:
# 1: $HOME/.config/GNS3/gns3_server.conf
# 2: $HOME/.config/GNS3.conf
@ -182,9 +151,6 @@ class Config:
Return the server configuration file path.
"""
if sys.platform.startswith("win"):
server_config_filename = "gns3_server.ini"
else:
server_config_filename = "gns3_server.conf"
return os.path.join(self.config_dir, server_config_filename)

View File

@ -77,9 +77,6 @@ class Controller:
self._load_controller_settings()
if server_config.enable_ssl:
if sys.platform.startswith("win"):
log.critical("SSL mode is not supported on Windows")
raise SystemExit
self._ssl_context = self._create_ssl_context(server_config)
protocol = server_config.protocol

View File

@ -123,7 +123,7 @@ class ApplianceManager:
async with HTTPClient.get(image_url) as response:
if response.status != 200:
raise ControllerError(f"Could not download '{image_name}' due to HTTP error code {response.status}")
await write_image(image_name, image_type, image_path, response.content.iter_any(), images_repo)
await write_image(image_name, image_path, response.content.iter_any(), images_repo)
except (OSError, InvalidImageError) as e:
raise ControllerError(f"Could not save {image_type} image '{image_path}': {e}")
except ClientError as e:
@ -156,7 +156,7 @@ class ApplianceManager:
image_path = os.path.join(image_dir, appliance_file)
if os.path.exists(image_path) and await wait_run_in_executor(md5sum, image_path) == image_checksum:
async with aiofiles.open(image_path, "rb") as f:
await write_image(appliance_file, appliance.type, image_path, f, images_repo)
await write_image(appliance_file, image_path, f, images_repo)
else:
# download the image if there is a direct download URL
direct_download_url = image.get("direct_download_url")
@ -217,7 +217,7 @@ class ApplianceManager:
try:
schemas.Appliance.parse_obj(appliance.asdict())
except ValidationError as e:
log.warning(message=f"Could not validate appliance '{appliance.id}': {e}")
log.warning(f"Could not validate appliance '{appliance.id}': {e}")
if appliance.versions:
for version in appliance.versions:
if version.get("name") == image_version:

View File

@ -146,9 +146,6 @@ def _patch_mtime(path):
:param path: file path
"""
if sys.platform.startswith("win"):
# only UNIX type platforms
return
st = os.stat(path)
file_date = datetime.fromtimestamp(st.st_mtime)
if file_date.year < 1980:

View File

@ -243,17 +243,6 @@ class VirtualBoxGNS3VM(BaseGNS3VM):
)
if not (await self._check_vboxnet_exists(vboxnet)):
if sys.platform.startswith("win") and vboxnet == "vboxnet0":
# The GNS3 VM is configured with vboxnet0 by default which is not available
# on Windows. Try to patch this with the first available vboxnet we find.
first_available_vboxnet = await self._find_first_available_vboxnet()
if first_available_vboxnet is None:
raise GNS3VMError(
f'Please add a VirtualBox host-only network with DHCP enabled and attached it to network adapter {hostonly_interface_number} for "{self._vmname}"'
)
await self.set_hostonly_network(hostonly_interface_number, first_available_vboxnet)
vboxnet = first_available_vboxnet
else:
raise GNS3VMError(
'VirtualBox host-only network "{}" does not exist, please make the sure the network adapter {} configuration is valid for "{}"'.format(
vboxnet, hostonly_interface_number, self._vmname

View File

@ -42,16 +42,6 @@ def create_startup_handler(app: FastAPI) -> Callable:
logger = logging.getLogger("asyncio")
logger.setLevel(logging.ERROR)
if sys.platform.startswith("win"):
# Add a periodic callback to give a chance to process signals on Windows
# because asyncio.add_signal_handler() is not supported yet on that platform
# otherwise the loop runs outside of signal module's ability to trap signals.
def wakeup():
loop.call_later(0.5, wakeup)
loop.call_later(0.5, wakeup)
if log.getEffectiveLevel() == logging.DEBUG:
# On debug version we enable info that
# coroutine is not called in a way await/await

View File

@ -59,7 +59,7 @@ class CrashReport:
Report crash to a third party service
"""
DSN = "https://8eb8a1f4730949f9886df2c6fdc27755:795f8ac399d04d24a273fcb35f48b725@o19455.ingest.sentry.io/38482"
DSN = "https://8f474628c1e44d0799140ccf05c486b8:f952ab1783d3427188fd81cc37da323c@o19455.ingest.sentry.io/38482"
_instance = None
def __init__(self):

View File

@ -1,135 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import psutil
import platform
from gns3server.web.route import Route
from gns3server.config import Config
from gns3server.schemas.version import VERSION_SCHEMA
from gns3server.schemas.server_statistics import SERVER_STATISTICS_SCHEMA
from gns3server.compute.port_manager import PortManager
from gns3server.utils.cpu_percent import CpuPercent
from gns3server.utils.path import get_default_project_directory
from gns3server.version import __version__
from aiohttp.web import HTTPConflict
class ServerHandler:
@Route.get(
r"/version",
description="Retrieve the server version number",
output=VERSION_SCHEMA)
def version(request, response):
config = Config.instance()
local_server = config.get_section_config("Server").getboolean("local", False)
response.json({"version": __version__, "local": local_server})
@Route.get(
r"/statistics",
description="Retrieve server statistics",
output=SERVER_STATISTICS_SCHEMA,
status_codes={
200: "Statistics information returned",
409: "Conflict"
})
def statistics(request, response):
try:
memory_total = psutil.virtual_memory().total
memory_free = psutil.virtual_memory().available
memory_used = memory_total - memory_free # actual memory usage in a cross platform fashion
swap_total = psutil.swap_memory().total
swap_free = psutil.swap_memory().free
swap_used = psutil.swap_memory().used
cpu_percent = int(CpuPercent.get())
load_average_percent = [int(x / psutil.cpu_count() * 100) for x in psutil.getloadavg()]
memory_percent = int(psutil.virtual_memory().percent)
swap_percent = int(psutil.swap_memory().percent)
disk_usage_percent = int(psutil.disk_usage(get_default_project_directory()).percent)
except psutil.Error as e:
raise HTTPConflict(text="Psutil error detected: {}".format(e))
response.json({"memory_total": memory_total,
"memory_free": memory_free,
"memory_used": memory_used,
"swap_total": swap_total,
"swap_free": swap_free,
"swap_used": swap_used,
"cpu_usage_percent": cpu_percent,
"memory_usage_percent": memory_percent,
"swap_usage_percent": swap_percent,
"disk_usage_percent": disk_usage_percent,
"load_average_percent": load_average_percent})
@Route.get(
r"/debug",
description="Return debug information about the compute",
status_codes={
201: "Written"
})
def debug(request, response):
response.content_type = "text/plain"
response.text = ServerHandler._getDebugData()
@staticmethod
def _getDebugData():
try:
addrs = ["* {}: {}".format(key, val) for key, val in psutil.net_if_addrs().items()]
except UnicodeDecodeError:
addrs = ["INVALID ADDR WITH UNICODE CHARACTERS"]
data = """Version: {version}
OS: {os}
Python: {python}
CPU: {cpu}
Memory: {memory}
Networks:
{addrs}
""".format(
version=__version__,
os=platform.platform(),
python=platform.python_version(),
memory=psutil.virtual_memory(),
cpu=psutil.cpu_times(),
addrs="\n".join(addrs)
)
try:
connections = psutil.net_connections()
# You need to be root for OSX
except psutil.AccessDenied:
connections = None
if connections:
data += "\n\nConnections:\n"
for port in PortManager.instance().tcp_ports:
found = False
for open_port in connections:
if open_port.laddr[1] == port:
found = True
data += "TCP {}: {}\n".format(port, found)
for port in PortManager.instance().udp_ports:
found = False
for open_port in connections:
if open_port.laddr[1] == port:
found = True
data += "UDP {}: {}\n".format(port, found)
return data

View File

@ -38,7 +38,7 @@ class ColouredFormatter(logging.Formatter):
message = super().format(record)
if not colour or sys.platform.startswith("win"):
if not colour:
return message.replace("#RESET#", "")
level_no = record.levelno
@ -150,11 +150,6 @@ def init_logger(level, logfile=None, max_bytes=10000000, backup_count=10, compre
stream_handler.formatter = ColouredFormatter(
"{asctime} {levelname} {filename}:{lineno} {message}", "%Y-%m-%d %H:%M:%S", "{"
)
elif sys.platform.startswith("win"):
stream_handler = WinStreamHandler(sys.stdout)
stream_handler.formatter = ColouredFormatter(
"{asctime} {levelname} {filename}:{lineno} {message}", "%Y-%m-%d %H:%M:%S", "{"
)
else:
stream_handler = ColouredStreamHandler(sys.stdout)
stream_handler.formatter = ColouredFormatter(

View File

@ -29,19 +29,6 @@ import gns3server.utils.get_resource
import os
import sys
import types
# To avoid strange bug later we switch the event loop before any other operation
if sys.platform.startswith("win"):
import asyncio
# use the Proactor event loop on Windows
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
if sys.platform.startswith("win"):
sys.modules["termios"] = types.ModuleType("termios")
def daemonize():
@ -77,7 +64,8 @@ def main():
Entry point for GNS3 server
"""
if not sys.platform.startswith("win"):
if sys.platform.startswith("win"):
raise SystemExit("Windows is not a supported platform to run the GNS3 server")
if "--daemon" in sys.argv:
daemonize()
from gns3server.server import Server

View File

@ -32,7 +32,8 @@ class ImageBase(BaseModel):
Common image properties.
"""
filename: str = Field(..., description="Image name")
filename: str = Field(..., description="Image filename")
path: str = Field(..., description="Image path")
image_type: ImageType = Field(..., description="Image type")
image_size: int = Field(..., description="Image size in bytes")
checksum: str = Field(..., description="Checksum value")

View File

@ -64,8 +64,8 @@ class Server:
or there: http://robjwells.com/post/61198832297/get-your-us-ascii-out-of-my-face
"""
# no need to check on Windows or when this application is frozen
if sys.platform.startswith("win") or hasattr(sys, "frozen"):
# no need to check when this application is frozen
if hasattr(sys, "frozen"):
return
language = encoding = None
@ -185,18 +185,9 @@ class Server:
except asyncio.CancelledError:
pass
signals = [] # SIGINT and SIGTERM are already registered by uvicorn
if sys.platform.startswith("win"):
signals.extend(["SIGBREAK"])
else:
signals.extend(["SIGHUP", "SIGQUIT"])
signals = ["SIGHUP", "SIGQUIT"] # SIGINT and SIGTERM are already registered by uvicorn
for signal_name in signals:
callback = functools.partial(signal_handler, signal_name)
if sys.platform.startswith("win"):
# add_signal_handler() is not yet supported on Windows
signal.signal(getattr(signal, signal_name), callback)
else:
loop = asyncio.get_event_loop()
loop.add_signal_handler(getattr(signal, signal_name), callback)
@ -250,10 +241,6 @@ class Server:
args = self._parse_arguments(sys.argv[1:])
if args.daemon and sys.platform.startswith("win"):
log.critical("Daemon is not supported on Windows")
sys.exit(1)
if args.pid:
self._pid_lock(args.pid)
self._kill_ghosts()
@ -281,9 +268,9 @@ class Server:
else:
log.info(f"Compute authentication is enabled with username '{config.Server.compute_username}'")
# we only support Python 3 version >= 3.6
if sys.version_info < (3, 6, 0):
raise SystemExit("Python 3.6 or higher is required")
# we only support Python 3 version >= 3.7
if sys.version_info < (3, 7, 0):
raise SystemExit("Python 3.7 or higher is required")
log.info(
"Running with Python {major}.{minor}.{micro} and has PID {pid}".format(
@ -316,9 +303,6 @@ class Server:
access_log = True
if config.Server.enable_ssl:
if sys.platform.startswith("win"):
log.critical("SSL mode is not supported on Windows")
raise SystemExit
log.info("SSL is enabled")
config = uvicorn.Config(
@ -346,10 +330,6 @@ class Server:
loop = asyncio.get_event_loop()
loop.run_until_complete(server.serve())
except OSError as e:
# This is to ignore OSError: [WinError 0] The operation completed successfully exception on Windows.
if not sys.platform.startswith("win") or not e.winerror == 0:
raise
except Exception as e:
log.critical(f"Critical error while running the server: {e}", exc_info=1)
CrashReport.instance().capture_exception()

View File

@ -46,6 +46,6 @@
gtag('config', 'G-5D6FZL9923');
</script>
<script src="runtime.445d8d501d6ed9a85ab9.js" defer></script><script src="polyfills-es5.4eb1fdd946638b823036.js" nomodule defer></script><script src="polyfills.519e27ac94450b1b7e67.js" defer></script><script src="main.228427ce89197cdf9b3b.js" defer></script>
<script src="runtime.445d8d501d6ed9a85ab9.js" defer></script><script src="polyfills-es5.4eb1fdd946638b823036.js" nomodule defer></script><script src="polyfills.519e27ac94450b1b7e67.js" defer></script><script src="main.e8a548933524bbeda197.js" defer></script>
</body></html>

View File

@ -89,14 +89,3 @@ def parse_version(version):
version.append("000000")
version.append("final")
return tuple(version)
def shlex_quote(s):
"""
Compatible shlex_quote to handle case where Windows needs double quotes around file names, not single quotes.
"""
if sys.platform.startswith("win"):
return s if re.match(r"^[-_\w./]+$", s) else '"%s"' % s.replace('"', '\\"')
else:
return shlex.quote(s)

View File

@ -136,22 +136,6 @@ async def wait_for_file_creation(path, timeout=60):
raise asyncio.TimeoutError()
async def wait_for_named_pipe_creation(pipe_path, timeout=60):
import win32pipe
import pywintypes
while timeout > 0:
try:
win32pipe.WaitNamedPipe(pipe_path, 1)
except pywintypes.error:
await asyncio.sleep(0.5)
timeout -= 0.5
else:
return
raise asyncio.TimeoutError()
def locking(f):
@functools.wraps(f)
async def wrapper(oself, *args, **kwargs):

View File

@ -15,19 +15,14 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
import asyncio
from gns3server.utils.asyncio import wait_for_file_creation, wait_for_named_pipe_creation
from gns3server.utils.asyncio import wait_for_file_creation
from gns3server.compute.error import NodeError
"""
This module handle connection to unix socket or Windows named pipe
This module handle connection to unix socket
"""
if sys.platform.startswith("win"):
import win32file
import win32pipe
import msvcrt
class SerialReaderWriterProtocol(asyncio.Protocol):
@ -61,50 +56,6 @@ class SerialReaderWriterProtocol(asyncio.Protocol):
self._output.feed_eof()
class WindowsPipe:
"""
Write input and output stream to the same object
"""
def __init__(self, path):
self._handle = open(path, "a+b")
self._pipe = msvcrt.get_osfhandle(self._handle.fileno())
async def read(self, n=-1):
(read, num_avail, num_message) = win32pipe.PeekNamedPipe(self._pipe, 0)
if num_avail > 0:
(error_code, output) = win32file.ReadFile(self._pipe, num_avail, None)
return output
await asyncio.sleep(0.01)
return b""
def at_eof(self):
return False
def write(self, data):
win32file.WriteFile(self._pipe, data)
async def drain(self):
return
def close(self):
pass
async def _asyncio_open_serial_windows(path):
"""
Open a windows named pipe
:returns: An IO like object
"""
try:
await wait_for_named_pipe_creation(path)
except asyncio.TimeoutError:
raise NodeError(f'Pipe file "{path}" is missing')
return WindowsPipe(path)
async def _asyncio_open_serial_unix(path):
"""
Open a unix socket or a windows named pipe
@ -128,12 +79,9 @@ async def _asyncio_open_serial_unix(path):
async def asyncio_open_serial(path):
"""
Open a unix socket or a windows named pipe
Open an unix socket
:returns: An IO like object
"""
if sys.platform.startswith("win"):
return await _asyncio_open_serial_windows(path)
else:
return await _asyncio_open_serial_unix(path)

View File

@ -225,45 +225,43 @@ class InvalidImageError(Exception):
return self._message
def check_valid_image_header(data: bytes, image_type: str, header_magic_len: int) -> None:
def check_valid_image_header(data: bytes) -> str:
if image_type == "ios":
# file must start with the ELF magic number, be 32-bit, big endian and have an ELF version of 1
if data[:header_magic_len] != b'\x7fELF\x01\x02\x01':
raise InvalidImageError("Invalid IOS file detected")
elif image_type == "iou":
# file must start with the ELF magic number, be 32-bit or 64-bit, little endian and have an ELF version of 1
# (normal IOS images are big endian!)
if data[:header_magic_len] != b'\x7fELF\x01\x01\x01' and data[:7] != b'\x7fELF\x02\x01\x01':
raise InvalidImageError("Invalid IOU file detected")
elif image_type == "qemu":
if data[:header_magic_len] != b'QFI\xfb' and data[:header_magic_len] != b'KDMV':
raise InvalidImageError("Invalid Qemu file detected (must be qcow2 or VDMK format)")
if data[:7] == b'\x7fELF\x01\x02\x01':
# for IOS images: file must start with the ELF magic number, be 32-bit, big endian and have an ELF version of 1
return "ios"
elif data[:7] == b'\x7fELF\x01\x01\x01' or data[:7] == b'\x7fELF\x02\x01\x01':
# for IOU images file must start with the ELF magic number, be 32-bit or 64-bit, little endian and
# have an ELF version of 1 (normal IOS images are big endian!)
return "iou"
elif data[:4] != b'QFI\xfb' or data[:4] != b'KDMV':
return "qemu"
else:
raise InvalidImageError("Could not detect image type, please make sure it is a valid image")
async def write_image(
image_name: str,
image_type: str,
path: str,
image_filename: str,
image_path: str,
stream: AsyncGenerator[bytes, None],
images_repo: ImagesRepository,
check_image_header=True
) -> models.Image:
log.info(f"Writing image file to '{path}'")
image_dir, image_name = os.path.split(image_filename)
log.info(f"Writing image file to '{image_path}'")
# Store the file under its final name only when the upload is completed
tmp_path = path + ".tmp"
os.makedirs(os.path.dirname(path), exist_ok=True)
tmp_path = image_path + ".tmp"
os.makedirs(os.path.dirname(image_path), exist_ok=True)
checksum = hashlib.md5()
header_magic_len = 7
if image_type == "qemu":
header_magic_len = 4
image_type = None
try:
async with aiofiles.open(tmp_path, "wb") as f:
async for chunk in stream:
if check_image_header and len(chunk) >= header_magic_len:
check_image_header = False
check_valid_image_header(chunk, image_type, header_magic_len)
image_type = check_valid_image_header(chunk)
await f.write(chunk)
checksum.update(chunk)
@ -273,12 +271,16 @@ async def write_image(
checksum = checksum.hexdigest()
duplicate_image = await images_repo.get_image_by_checksum(checksum)
if duplicate_image and os.path.dirname(duplicate_image.path) == os.path.dirname(path):
if duplicate_image and os.path.dirname(duplicate_image.path) == os.path.dirname(image_path):
raise InvalidImageError(f"Image {duplicate_image.filename} with "
f"same checksum already exists in the same directory")
except InvalidImageError:
os.remove(tmp_path)
raise
os.chmod(tmp_path, stat.S_IWRITE | stat.S_IREAD | stat.S_IEXEC)
shutil.move(tmp_path, path)
return await images_repo.add_image(image_name, image_type, image_size, path, checksum, checksum_algorithm="md5")
if not image_dir:
directory = default_images_directory(image_type)
os.makedirs(directory, exist_ok=True)
image_path = os.path.abspath(os.path.join(directory, image_filename))
shutil.move(tmp_path, image_path)
return await images_repo.add_image(image_name, image_type, image_size, image_path, checksum, checksum_algorithm="md5")

View File

@ -21,7 +21,6 @@ import socket
import struct
import psutil
from .windows_service import check_windows_service_is_running
from gns3server.compute.compute_error import ComputeError
from gns3server.config import Config
@ -197,7 +196,6 @@ def interfaces():
"""
results = []
if not sys.platform.startswith("win"):
allowed_interfaces = Config.instance().settings.Server.allowed_interfaces
net_if_addrs = psutil.net_if_addrs()
for interface in sorted(net_if_addrs.keys()):
@ -228,24 +226,6 @@ def interfaces():
"type": interface_type,
}
)
else:
try:
service_installed = True
if not check_windows_service_is_running("npf") and not check_windows_service_is_running("npcap"):
service_installed = False
else:
results = get_windows_interfaces()
except ImportError:
message = (
"pywin32 module is not installed, please install it on the server to get the available interface names"
)
raise ComputeError(message)
except Exception as e:
log.error(f"uncaught exception {type(e)}", exc_info=1)
raise ComputeError(f"uncaught exception: {e}")
if service_installed is False:
raise ComputeError("The Winpcap or Npcap is not installed or running")
# This interface have special behavior
for result in results:

View File

@ -1,138 +0,0 @@
#!/usr/bin/env python
#
# Copyright (C) 2016 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
import os
import argparse
import shutil
import ipaddress
if sys.platform.startswith("win"):
import wmi
else:
raise SystemExit("This script must run on Windows!")
def parse_add_loopback():
"""
Validate params when adding a loopback adapter
"""
class Add(argparse.Action):
def __call__(self, parser, args, values, option_string=None):
try:
ipaddress.IPv4Interface(f"{values[1]}/{values[2]}")
except ipaddress.AddressValueError as e:
raise argparse.ArgumentTypeError(f"Invalid IP address: {e}")
except ipaddress.NetmaskValueError as e:
raise argparse.ArgumentTypeError(f"Invalid subnet mask: {e}")
setattr(args, self.dest, values)
return Add
def add_loopback(devcon_path, name, ip_address, netmask):
# save the list of network adapter in order to find the one we are about to add
previous_adapters = wmi.WMI().Win32_NetworkAdapter()
for adapter in previous_adapters:
if "Loopback" in adapter.Description and adapter.NetConnectionID == name:
raise SystemExit(f'Windows loopback adapter named "{name}" already exists')
# install a new Windows loopback adapter
os.system('"{}" install {}\\inf\\netloop.inf *MSLOOP'.format(devcon_path, os.path.expandvars("%WINDIR%")))
# configure the new Windows loopback adapter
for adapter in wmi.WMI().Win32_NetworkAdapter():
if "Loopback" in adapter.Description and adapter not in previous_adapters:
print(f'Renaming loopback adapter "{adapter.NetConnectionID}" to "{name}"')
adapter.NetConnectionID = name
for network_config in wmi.WMI().Win32_NetworkAdapterConfiguration(IPEnabled=True):
if network_config.InterfaceIndex == adapter.InterfaceIndex:
print(f'Configuring loopback adapter "{name}" with {ip_address} {netmask}')
retcode = network_config.EnableStatic(IPAddress=[ip_address], SubnetMask=[netmask])[0]
if retcode == 1:
print("A reboot is required")
elif retcode != 0:
print('Error while configuring IP/Subnet mask on "{}"')
# FIXME: support gateway?
# network_config.SetGateways(DefaultIPGateway=[""])
break
# restart winpcap/npcap services to take the new adapter into account
os.system("net stop npf")
os.system("net start npf")
os.system("net stop npcap")
os.system("net start npcap")
def remove_loopback(devcon_path, name):
deleted = False
for adapter in wmi.WMI().Win32_NetworkAdapter():
if "Loopback" in adapter.Description and adapter.NetConnectionID == name:
# remove a Windows loopback adapter
print(f'Removing loopback adapter "{name}"')
os.system(f'"{devcon_path}" remove @{adapter.PNPDeviceID}')
deleted = True
if not deleted:
raise SystemExit(f'Could not find adapter "{name}"')
# update winpcap/npcap services
os.system("net stop npf")
os.system("net start npf")
os.system("net stop npcap")
os.system("net start npcap")
def main():
"""
Entry point for the Windows loopback tool.
"""
parser = argparse.ArgumentParser(description="%(prog)s add/remove Windows loopback adapters")
parser.add_argument("-a", "--add", nargs=3, action=parse_add_loopback(), help="add a Windows loopback adapter")
parser.add_argument("-r", "--remove", action="store", help="remove a Windows loopback adapter")
try:
args = parser.parse_args()
except argparse.ArgumentTypeError as e:
raise SystemExit(e)
# devcon is required to install/remove Windows loopback adapters
devcon_path = shutil.which("devcon")
if not devcon_path:
raise SystemExit("Could not find devcon.exe")
from win32com.shell import shell
if not shell.IsUserAnAdmin():
raise SystemExit("You must run this script as an administrator")
try:
if args.add:
add_loopback(devcon_path, args.add[0], args.add[1], args.add[2])
if args.remove:
remove_loopback(devcon_path, args.remove)
except SystemExit as e:
print(e)
os.system("pause")
if __name__ == "__main__":
main()

View File

@ -1,39 +0,0 @@
#!/usr/bin/env python
#
# Copyright (C) 2016 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Check for Windows service.
"""
from gns3server.compute.compute_error import ComputeError
def check_windows_service_is_running(service_name):
import pywintypes
import win32service
import win32serviceutil
try:
if win32serviceutil.QueryServiceStatus(service_name, None)[1] != win32service.SERVICE_RUNNING:
return False
except pywintypes.error as e:
if e.winerror == 1060:
return False
else:
raise ComputeError(f"Could not check if the {service_name} service is running: {e.strerror}")
return True

View File

@ -1,344 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Set up and run the server.
"""
import os
import sys
import signal
import asyncio
import aiohttp
import aiohttp_cors
import functools
import time
import atexit
import weakref
# Import encoding now, to avoid implicit import later.
# Implicit import within threads may cause LookupError when standard library is in a ZIP
import encodings.idna
from .route import Route
from ..config import Config
from ..compute import MODULES
from ..compute.port_manager import PortManager
from ..compute.qemu import Qemu
from ..controller import Controller
# do not delete this import
import gns3server.handlers
import logging
log = logging.getLogger(__name__)
if not (aiohttp.__version__.startswith("3.")):
raise RuntimeError("aiohttp 3.x is required to run the GNS3 server")
class WebServer:
def __init__(self, host, port):
self._host = host
self._port = port
self._loop = None
self._handler = None
self._server = None
self._app = None
self._start_time = time.time()
self._running = False
self._closing = False
self._ssl_context = None
@staticmethod
def instance(host=None, port=None):
"""
Singleton to return only one instance of Server.
:returns: instance of Server
"""
if not hasattr(WebServer, "_instance") or WebServer._instance is None:
assert host is not None
assert port is not None
WebServer._instance = WebServer(host, port)
return WebServer._instance
def _run_application(self, handler, ssl_context=None):
try:
srv = self._loop.create_server(handler, self._host, self._port, ssl=ssl_context)
self._server, startup_res = self._loop.run_until_complete(asyncio.gather(srv, self._app.startup(), loop=self._loop))
except (RuntimeError, OSError, asyncio.CancelledError) as e:
log.critical("Could not start the server: {}".format(e))
return False
return True
async def reload_server(self):
"""
Reload the server.
"""
await Controller.instance().reload()
async def shutdown_server(self):
"""
Cleanly shutdown the server.
"""
if not self._closing:
self._closing = True
else:
log.warning("Close is already in progress")
return
# close websocket connections
websocket_connections = set(self._app['websockets'])
if websocket_connections:
log.info("Closing {} websocket connections...".format(len(websocket_connections)))
for ws in websocket_connections:
await ws.close(code=aiohttp.WSCloseCode.GOING_AWAY, message='Server shutdown')
if self._server:
self._server.close()
await self._server.wait_closed()
if self._app:
await self._app.shutdown()
if self._handler:
await self._handler.shutdown(2) # Parameter is timeout
if self._app:
await self._app.cleanup()
await Controller.instance().stop()
for module in MODULES:
log.debug("Unloading module {}".format(module.__name__))
m = module.instance()
await m.unload()
if PortManager.instance().tcp_ports:
log.warning("TCP ports are still used {}".format(PortManager.instance().tcp_ports))
if PortManager.instance().udp_ports:
log.warning("UDP ports are still used {}".format(PortManager.instance().udp_ports))
try:
tasks = asyncio.all_tasks()
except AttributeError:
tasks = asyncio.Task.all_tasks()
for task in tasks:
task.cancel()
try:
await asyncio.wait_for(task, 1)
except BaseException:
pass
self._loop.stop()
def ssl_context(self):
"""
Returns the SSL context for the server.
"""
return self._ssl_context
def _signal_handling(self):
def signal_handler(signame, *args):
try:
if signame == "SIGHUP":
log.info("Server has got signal {}, reloading...".format(signame))
asyncio.ensure_future(self.reload_server())
else:
log.warning("Server has got signal {}, exiting...".format(signame))
asyncio.ensure_future(self.shutdown_server())
except asyncio.CancelledError:
pass
signals = ["SIGTERM", "SIGINT"]
if sys.platform.startswith("win"):
signals.extend(["SIGBREAK"])
else:
signals.extend(["SIGHUP", "SIGQUIT"])
for signal_name in signals:
callback = functools.partial(signal_handler, signal_name)
if sys.platform.startswith("win"):
# add_signal_handler() is not yet supported on Windows
signal.signal(getattr(signal, signal_name), callback)
else:
self._loop.add_signal_handler(getattr(signal, signal_name), callback)
def _create_ssl_context(self, server_config):
import ssl
ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
certfile = server_config["certfile"]
certkey = server_config["certkey"]
try:
ssl_context.load_cert_chain(certfile, certkey)
except FileNotFoundError:
log.critical("Could not find the SSL certfile or certkey")
raise SystemExit
except ssl.SSLError as e:
log.critical("SSL error: {}".format(e))
raise SystemExit
log.info("SSL is enabled")
return ssl_context
async def start_shell(self):
log.error("The embedded shell has been deactivated in this version of GNS3")
return
try:
from ptpython.repl import embed
except ImportError:
log.error("Unable to start a shell: the ptpython module must be installed!")
return
await embed(globals(), locals(), return_asyncio_coroutine=True, patch_stdout=True, history_filename=".gns3_shell_history")
def _exit_handling(self):
"""
Makes sure the asyncio loop is closed.
"""
def close_asyncio_loop():
loop = None
try:
loop = asyncio.get_event_loop()
except AttributeError:
pass
if loop is not None:
loop.close()
atexit.register(close_asyncio_loop)
async def _on_startup(self, *args):
"""
Called when the HTTP server start
"""
await Controller.instance().start()
# Because with a large image collection
# without md5sum already computed we start the
# computing with server start
asyncio.ensure_future(Qemu.instance().list_images())
def run(self):
"""
Starts the server.
"""
server_logger = logging.getLogger('aiohttp.server')
# In debug mode we don't use the standard request log but a more complete in response.py
if log.getEffectiveLevel() == logging.DEBUG:
server_logger.setLevel(logging.CRITICAL)
logger = logging.getLogger("asyncio")
logger.setLevel(logging.ERROR)
if sys.platform.startswith("win"):
loop = asyncio.get_event_loop()
# Add a periodic callback to give a chance to process signals on Windows
# because asyncio.add_signal_handler() is not supported yet on that platform
# otherwise the loop runs outside of signal module's ability to trap signals.
def wakeup():
loop.call_later(0.5, wakeup)
loop.call_later(0.5, wakeup)
server_config = Config.instance().get_section_config("Server")
self._ssl_context = None
if server_config.getboolean("ssl"):
if sys.platform.startswith("win"):
log.critical("SSL mode is not supported on Windows")
raise SystemExit
self._ssl_context = self._create_ssl_context(server_config)
self._loop = asyncio.get_event_loop()
if log.getEffectiveLevel() == logging.DEBUG:
# On debug version we enable info that
# coroutine is not called in a way await/await
self._loop.set_debug(True)
for key, val in os.environ.items():
log.debug("ENV %s=%s", key, val)
self._app = aiohttp.web.Application()
# Keep a list of active websocket connections
self._app['websockets'] = weakref.WeakSet()
# Background task started with the server
self._app.on_startup.append(self._on_startup)
resource_options = aiohttp_cors.ResourceOptions(expose_headers="*", allow_headers="*", max_age=0)
# Allow CORS for this domains
cors = aiohttp_cors.setup(self._app, defaults={
# Default web server for web gui dev
"http://127.0.0.1:8080": resource_options,
"http://localhost:8080": resource_options,
"http://127.0.0.1:4200": resource_options,
"http://localhost:4200": resource_options,
"http://gns3.github.io": resource_options,
"https://gns3.github.io": resource_options
})
PortManager.instance().console_host = self._host
for method, route, handler in Route.get_routes():
log.debug("Adding route: {} {}".format(method, route))
cors.add(self._app.router.add_route(method, route, handler))
for module in MODULES:
log.debug("Loading module {}".format(module.__name__))
m = module.instance()
m.port_manager = PortManager.instance()
log.info("Starting server on {}:{}".format(self._host, self._port))
self._handler = self._app.make_handler()
if self._run_application(self._handler, self._ssl_context) is False:
self._loop.stop()
sys.exit(1)
self._signal_handling()
self._exit_handling()
if server_config.getboolean("shell"):
asyncio.ensure_future(self.start_shell())
try:
self._loop.run_forever()
except TypeError as e:
# This is to ignore an asyncio.windows_events exception
# on Windows when the process gets the SIGBREAK signal
# TypeError: async() takes 1 positional argument but 3 were given
log.warning("TypeError exception in the loop {}".format(e))
finally:
if self._loop.is_running():
try:
self._loop.run_until_complete(self.shutdown_server())
except asyncio.CancelledError:
pass

View File

@ -1,23 +1,18 @@
uvicorn==0.17.0
fastapi==0.72.0
uvicorn==0.17.6
fastapi==0.75.0
python-multipart==0.0.5
websockets==10.1 ; python_version >= "3.7"
websockets==9.1 ; python_version < "3.7"
aiohttp==3.8.1 ; python_version >= "3.7"
aiohttp==3.7.4.post0 ; python_version < "3.7"
async-timeout==4.0.2 ; python_version >= "3.7"
async-timeout==3.0.1 ; python_version < "3.7"
websockets==10.2
aiohttp==3.8.1
async-timeout==4.0.2
aiofiles==0.8.0
Jinja2==3.0.3
sentry-sdk==1.5.2
sentry-sdk==1.5.7
psutil==5.9.0
distro==1.6.0
distro==1.7.0
py-cpuinfo==8.0.0
sqlalchemy==1.4.29
sqlalchemy==1.4.32
aiosqlite===0.17.0
passlib[bcrypt]==1.7.4
python-jose==3.3.0
email-validator==1.1.3
async-exit-stack==1.0.1 ; python_version < "3.7"
async-generator==1.10 ; python_version < "3.7"
setuptools
setuptools==60.6.0 # don't upgrade because of https://github.com/pypa/setuptools/issues/3084

View File

@ -23,9 +23,9 @@ import subprocess
from setuptools import setup, find_packages
from setuptools.command.test import test as TestCommand
# we only support Python 3 version >= 3.6
if len(sys.argv) >= 2 and sys.argv[1] == "install" and sys.version_info < (3, 6, 0):
raise SystemExit("Python 3.6 or higher is required")
# we only support Python 3 version >= 3.7
if len(sys.argv) >= 2 and sys.argv[1] == "install" and sys.version_info < (3, 7, 0):
raise SystemExit("Python 3.7 or higher is required")
class PyTest(TestCommand):
@ -74,13 +74,12 @@ setup(
license="GNU General Public License v3 (GPLv3)",
cmdclass={"test": PyTest},
description="GNS3 server",
long_description=open("README.rst", "r").read(),
long_description=open("README.md", "r").read(),
install_requires=dependencies,
entry_points={
"console_scripts": [
"gns3server = gns3server.main:main",
"gns3vmnet = gns3server.utils.vmnet:main",
"gns3loopback = gns3server.utils.windows_loopback:main"
"gns3vmnet = gns3server.utils.vmnet:main"
]
},
packages=find_packages(".", exclude=["docs", "tests*"]),

View File

@ -29,7 +29,6 @@ from gns3server.utils.path import get_default_project_directory
pytestmark = pytest.mark.asyncio
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
async def test_get(app: FastAPI, compute_client: AsyncClient, windows_platform) -> None:
response = await compute_client.get(app.url_path_for("compute:get_capabilities"))
@ -43,7 +42,6 @@ async def test_get(app: FastAPI, compute_client: AsyncClient, windows_platform)
}
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
async def test_get_on_gns3vm(app: FastAPI, compute_client: AsyncClient, on_gns3vm) -> None:
response = await compute_client.get(app.url_path_for("compute:get_capabilities"))

View File

@ -25,8 +25,7 @@ from unittest.mock import patch
from gns3server.compute.project import Project
pytestmark = [pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows"),
pytest.mark.asyncio]
pytestmark = [pytest.mark.asyncio]
@pytest.fixture

View File

@ -209,7 +209,7 @@ async def test_download_image_forbidden(app: FastAPI, compute_client: AsyncClien
assert response.status_code == status.HTTP_403_FORBIDDEN
@pytest.mark.skipif(not sys.platform.startswith("win") and os.getuid() == 0, reason="Root can delete any image")
@pytest.mark.skipif(os.getuid() == 0, reason="Root can delete any image")
async def test_upload_image_permission_denied(app: FastAPI, compute_client: AsyncClient, images_dir: str) -> None:
os.makedirs(os.path.join(images_dir, "IOS"), exist_ok=True)

View File

@ -28,8 +28,7 @@ from unittest.mock import patch
from gns3server.compute.project import Project
pytestmark = [pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows"),
pytest.mark.asyncio]
pytestmark = [pytest.mark.asyncio]
@pytest.fixture

View File

@ -34,10 +34,6 @@ pytestmark = pytest.mark.asyncio
def fake_qemu_bin(monkeypatch, tmpdir) -> str:
monkeypatch.setenv("PATH", str(tmpdir))
if sys.platform.startswith("win"):
bin_path = os.path.join(os.environ["PATH"], "qemu-system-x86_64w.exe")
else:
bin_path = os.path.join(os.environ["PATH"], "qemu-system-x86_64")
with open(bin_path, "w+") as f:
f.write("1")
@ -120,7 +116,6 @@ async def test_qemu_create_with_params(app: FastAPI,
assert response.json()["hda_disk_image_md5sum"] == "c4ca4238a0b923820dcc509a6f75849b"
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
async def test_qemu_create_with_project_file(app: FastAPI,
compute_client: AsyncClient,
compute_project: Project,
@ -411,7 +406,7 @@ async def test_download_image_forbidden_location(app: FastAPI, compute_client: A
assert response.status_code == status.HTTP_403_FORBIDDEN
@pytest.mark.skipif(not sys.platform.startswith("win") and os.getuid() == 0, reason="Root can delete any image")
@pytest.mark.skipif(os.getuid() == 0, reason="Root can delete any image")
async def test_upload_image_permission_denied(app: FastAPI, compute_client: AsyncClient, images_dir: str) -> None:
with open(os.path.join(images_dir, "QEMU", "test2.tmp"), "w+") as f:

View File

@ -60,7 +60,7 @@ def ios_image(tmpdir) -> str:
Create a fake IOS image on disk
"""
path = os.path.join(tmpdir, "ios.bin")
path = os.path.join(tmpdir, "ios_image.bin")
with open(path, "wb+") as f:
f.write(b'\x7fELF\x01\x02\x01')
return path
@ -74,7 +74,7 @@ def qcow2_image(tmpdir) -> str:
path = os.path.join(tmpdir, "image.qcow2")
with open(path, "wb+") as f:
f.write(b'QFI\xfb')
f.write(b'QFI\xfb\x00\x00\x00')
return path
@ -137,7 +137,6 @@ class TestImageRoutes:
response = await client.post(
app.url_path_for("upload_image", image_path=image_name),
params={"image_type": image_type},
content=image_data)
if valid_request:
@ -168,7 +167,6 @@ class TestImageRoutes:
image_data = f.read()
response = await client.post(
app.url_path_for("upload_image", image_path=image_name),
params={"image_type": "qemu"},
content=image_data)
assert response.status_code == status.HTTP_400_BAD_REQUEST
@ -191,7 +189,6 @@ class TestImageRoutes:
image_data = f.read()
response = await client.post(
app.url_path_for("upload_image", image_path=image_name),
params={"image_type": "qemu"},
content=image_data)
assert response.status_code == status.HTTP_201_CREATED
@ -214,7 +211,8 @@ class TestImageRoutes:
images_dir: str,
qcow2_image: str,
subdir: str,
expected_result: int
expected_result: int,
db_session: AsyncSession
) -> None:
image_name = os.path.basename(qcow2_image)
@ -223,7 +221,6 @@ class TestImageRoutes:
image_path = os.path.join(subdir, image_name)
response = await client.post(
app.url_path_for("upload_image", image_path=image_path),
params={"image_type": "qemu"},
content=image_data)
assert response.status_code == expected_result
@ -273,7 +270,7 @@ class TestImageRoutes:
image_data = f.read()
response = await client.post(
app.url_path_for("upload_image", image_path=image_name),
params={"image_type": "qemu", "install_appliances": "true"},
params={"install_appliances": "true"},
content=image_data)
assert response.status_code == status.HTTP_201_CREATED

View File

@ -1401,7 +1401,6 @@ async def test_get_image_information(compute_project, manager):
mock.assert_called_with("GET", "images/ubuntu:latest/json")
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
@pytest.mark.asyncio
async def test_mount_binds(vm):
@ -1476,7 +1475,6 @@ async def test_create_network_interfaces(vm):
assert "eth5" not in content
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
@pytest.mark.asyncio
async def test_fix_permission(vm):
@ -1489,7 +1487,6 @@ async def test_fix_permission(vm):
assert process.wait.called
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
@pytest.mark.asyncio
async def test_fix_permission_not_running(vm):

View File

@ -44,7 +44,6 @@ def test_vm_invalid_dynamips_path(manager, config):
manager.find_dynamips()
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported by Windows")
def test_vm_non_executable_dynamips_path(manager, config):
tmpfile = tempfile.NamedTemporaryFile()

View File

@ -26,11 +26,7 @@ import shutil
from tests.utils import asyncio_patch, AsyncioMagicMock
from unittest.mock import patch, MagicMock
pytestmark = pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
if not sys.platform.startswith("win"):
from unittest.mock import MagicMock
from gns3server.compute.iou.iou_vm import IOUVM
from gns3server.compute.iou.iou_error import IOUError
from gns3server.compute.iou import IOU

View File

@ -43,9 +43,6 @@ async def test_get_qemu_version():
with asyncio_patch("gns3server.compute.qemu.subprocess_check_output", return_value="QEMU emulator version 2.2.0, Copyright (c) 2003-2008 Fabrice Bellard"):
version = await Qemu.get_qemu_version("/tmp/qemu-test")
if sys.platform.startswith("win"):
assert version == ""
else:
assert version == "2.2.0"
@ -62,9 +59,6 @@ async def test_binary_list(monkeypatch, tmpdir):
os.chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
with asyncio_patch("gns3server.compute.qemu.subprocess_check_output", return_value="QEMU emulator version 2.2.0, Copyright (c) 2003-2008 Fabrice Bellard") as mock:
if sys.platform.startswith("win"):
version = ""
else:
version = "2.2.0"
qemus = await Qemu.binary_list()

View File

@ -57,9 +57,6 @@ def fake_qemu_img_binary(monkeypatch, tmpdir):
def fake_qemu_binary(monkeypatch, tmpdir):
monkeypatch.setenv("PATH", str(tmpdir))
if sys.platform.startswith("win"):
bin_path = os.path.join(os.environ["PATH"], "qemu-system-x86_64w.exe")
else:
bin_path = os.path.join(os.environ["PATH"], "qemu-system-x86_64")
with open(bin_path, "w+") as f:
f.write("1")
@ -190,7 +187,6 @@ async def test_termination_callback(vm):
assert event == vm
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
@pytest.mark.asyncio
async def test_termination_callback_error(vm, tmpdir):
@ -276,9 +272,6 @@ def test_set_qemu_path(vm, tmpdir, fake_qemu_binary):
vm.qemu_path = None
# Should not crash with unicode characters
if sys.platform.startswith("win"):
path = str(tmpdir / "\u62FF" / "qemu-system-mipsw.exe")
else:
path = str(tmpdir / "\u62FF" / "qemu-system-mips")
os.makedirs(str(tmpdir / "\u62FF"))
@ -291,7 +284,6 @@ def test_set_qemu_path(vm, tmpdir, fake_qemu_binary):
f.write("1")
# Raise because file is not executable
if not sys.platform.startswith("win"):
with pytest.raises(QemuError):
vm.qemu_path = path
os.chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
@ -314,7 +306,6 @@ def test_set_qemu_path_windows(vm):
bin_path = os.path.join(os.environ["PATH"], "qemu-system-x86_64w.EXE")
open(bin_path, "w+").close()
if not sys.platform.startswith("win"):
os.chmod(bin_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
vm.qemu_path = bin_path
@ -327,7 +318,6 @@ def test_set_qemu_path_old_windows(vm):
bin_path = os.path.join(os.environ["PATH"], "qemu.exe")
open(bin_path, "w+").close()
if not sys.platform.startswith("win"):
os.chmod(bin_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
vm.qemu_path = bin_path
@ -336,7 +326,6 @@ def test_set_qemu_path_old_windows(vm):
assert vm.platform == "i386"
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
def test_set_qemu_path_kvm_binary(vm, fake_qemu_binary):
bin_path = os.path.join(os.environ["PATH"], "qemu-kvm")
@ -358,9 +347,6 @@ async def test_set_platform(compute_project, manager):
with patch("shutil.which", return_value="/bin/qemu-system-x86_64") as which_mock:
with patch("gns3server.compute.qemu.QemuVM._check_qemu_path"):
vm = QemuVM("test", "00010203-0405-0607-0809-0a0b0c0d0e0f", compute_project, manager, platform="x86_64")
if sys.platform.startswith("win"):
which_mock.assert_called_with("qemu-system-x86_64w.exe", path=mock.ANY)
else:
which_mock.assert_called_with("qemu-system-x86_64", path=mock.ANY)
assert vm.platform == "x86_64"
assert vm.qemu_path == "/bin/qemu-system-x86_64"
@ -447,7 +433,6 @@ async def test_disk_options_multiple_disk(vm, tmpdir, fake_qemu_img_binary):
]
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
@pytest.mark.asyncio
async def test_set_process_priority(vm, fake_qemu_img_binary):
@ -461,7 +446,6 @@ async def test_set_process_priority(vm, fake_qemu_img_binary):
assert args == ("renice", "-n", "5", "-p", "42")
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
@pytest.mark.asyncio
async def test_set_process_priority_normal(vm, fake_qemu_img_binary):
@ -634,7 +618,6 @@ async def test_build_command_kvm_2_4(linux_platform, vm, fake_qemu_binary):
]
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
@pytest.mark.asyncio
async def test_build_command_without_display(vm):
@ -757,7 +740,6 @@ async def test_build_command_large_number_of_adapters(vm):
await vm._build_command()
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
@pytest.mark.asyncio
async def test_build_command_with_invalid_options(vm):

View File

@ -162,7 +162,7 @@ async def test_project_delete():
assert os.path.exists(directory) is False
@pytest.mark.skipif(not sys.platform.startswith("win") and os.getuid() == 0, reason="Root can delete any project")
@pytest.mark.skipif(os.getuid() == 0, reason="Root can delete any project")
@pytest.mark.asyncio
async def test_project_delete_permission_issue():

View File

@ -190,9 +190,6 @@ async def test_stop(vm):
await vm.stop()
assert vm.is_running() is False
if sys.platform.startswith("win"):
process.send_signal.assert_called_with(1)
else:
process.terminate.assert_called_with()
await queue.get(1) #  Ping
@ -226,9 +223,6 @@ async def test_reload(vm):
await vm.reload()
assert vm.is_running() is True
if sys.platform.startswith("win"):
process.send_signal.assert_called_with(1)
else:
process.terminate.assert_called_with()
@ -240,7 +234,6 @@ async def test_add_nio_binding_udp(vm):
assert nio.lport == 4242
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
@pytest.mark.asyncio
async def test_add_nio_binding_tap(vm, ethernet_device):

View File

@ -32,20 +32,6 @@ sys._called_from_test = True
sys.original_platform = sys.platform
if sys.platform.startswith("win") and sys.version_info < (3, 8):
@pytest.fixture(scope="session")
def event_loop(request):
"""
Overwrite pytest_asyncio event loop on Windows for Python < 3.8
As of Python 3.8, the default event loop on Windows is Proactor
"""
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
yield loop
asyncio.set_event_loop(None)
# https://github.com/pytest-dev/pytest-asyncio/issues/68
# this event_loop is used by pytest-asyncio, and redefining it
# is currently the only way of changing the scope of this fixture

View File

@ -69,7 +69,6 @@ async def test_json(controller):
assert vm.asdict() == vm._settings
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not working well on Windows")
@pytest.mark.asyncio
async def test_update_settings(controller):
@ -88,7 +87,6 @@ async def test_update_settings(controller):
assert "vm" not in controller.computes
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not working well on Windows")
@pytest.mark.asyncio
async def test_auto_start(controller, dummy_gns3vm, dummy_engine):
"""
@ -106,7 +104,6 @@ async def test_auto_start(controller, dummy_gns3vm, dummy_engine):
assert controller.computes["vm"].password.get_secret_value() == "world"
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not working well on Windows")
@pytest.mark.asyncio
async def test_auto_start_with_error(controller, dummy_gns3vm, dummy_engine):

View File

@ -134,7 +134,6 @@ async def test_init_path(tmpdir):
assert p.path == str(tmpdir)
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
@pytest.mark.asyncio
async def test_changing_path_with_quote_not_allowed(tmpdir):

View File

@ -45,7 +45,6 @@ async def test_exception_wait_run_in_executor():
await wait_run_in_executor(raise_exception)
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
@pytest.mark.asyncio
async def test_subprocess_check_output(tmpdir):

View File

@ -33,18 +33,13 @@ def test_interfaces():
assert "name" in interface
assert "ip_address" in interface
assert "mac_address" in interface
if sys.platform.startswith("win"):
assert "netcard" in interface
assert "type" in interface
assert "netmask" in interface
def test_has_netmask(config):
if sys.platform.startswith("win"):
# No loopback
pass
elif sys.platform.startswith("darwin"):
if sys.platform.startswith("darwin"):
assert has_netmask("lo0") is True
else:
assert has_netmask("lo") is True
@ -52,10 +47,7 @@ def test_has_netmask(config):
def test_is_interface_up():
if sys.platform.startswith("win"):
# is_interface_up() always returns True on Windows
pass
elif sys.platform.startswith("darwin"):
if sys.platform.startswith("darwin"):
assert is_interface_up("lo0") is True
else:
assert is_interface_up("lo") is True

View File

@ -1,4 +1,4 @@
-r requirements.txt
pywin32==301
pywin32==303
wmi==1.5.1