1
0
mirror of https://github.com/GNS3/gns3-server synced 2025-01-20 13:01:11 +00:00
gns3-server/tests/compute/docker/test_docker.py
grossmj d3860ba84b
Merge branch '2.2' into 3.0
# Conflicts:
#	CHANGELOG
#	dev-requirements.txt
#	gns3server/compute/qemu/__init__.py
#	gns3server/compute/vmware/__init__.py
#	gns3server/controller/snapshot.py
#	gns3server/crash_report.py
#	gns3server/handlers/api/compute/atm_switch_handler.py
#	gns3server/run.py
#	gns3server/static/web-ui/26.77d4bfd104f37c42e028.js
#	gns3server/static/web-ui/index.html
#	gns3server/static/web-ui/runtime.415291667f70565cd8ef.js
#	gns3server/utils/__init__.py
#	gns3server/utils/images.py
#	gns3server/utils/interfaces.py
#	gns3server/version.py
#	gns3server/web/web_server.py
#	pytest.ini
#	requirements.txt
#	scripts/update-bundled-web-ui.sh
#	setup.py
#	tests/api/routes/compute/test_dynamips_nodes.py
#	tests/compute/builtin/nodes/test_cloud.py
#	tests/compute/docker/test_docker.py
#	tests/compute/docker/test_docker_vm.py
#	tests/compute/dynamips/test_dynamips_manager.py
#	tests/compute/dynamips/test_dynamips_router.py
#	tests/compute/iou/test_iou_vm.py
#	tests/compute/qemu/test_qcow2.py
#	tests/compute/qemu/test_qemu_manager.py
#	tests/compute/qemu/test_qemu_vm.py
#	tests/compute/test_base_node.py
#	tests/compute/test_manager.py
#	tests/compute/test_project.py
#	tests/compute/traceng/test_traceng_vm.py
#	tests/compute/virtualbox/test_virtualbox_manager.py
#	tests/compute/virtualbox/test_virtualbox_vm.py
#	tests/compute/vmware/test_vmware_manager.py
#	tests/compute/vmware/test_vmware_vm.py
#	tests/compute/vpcs/test_vpcs_vm.py
#	tests/conftest.py
#	tests/controller/gns3vm/test_remote_gns3_vm.py
#	tests/controller/gns3vm/test_virtualbox_gns3_vm.py
#	tests/controller/gns3vm/test_vmware_gns3_vm.py
#	tests/controller/test_export_project.py
#	tests/controller/test_gns3vm.py
#	tests/controller/test_import_project.py
#	tests/test_config.py
#	tests/utils/test_asyncio.py
#	tests/utils/test_images.py
#	tests/web/test_response.py
2024-05-16 17:53:39 +07:00

262 lines
9.5 KiB
Python

#!/usr/bin/env python
#
# Copyright (C) 2020 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import asyncio
import pytest
import pytest_asyncio
from unittest.mock import MagicMock, patch
from tests.utils import asyncio_patch, AsyncioMagicMock
from gns3server.compute.docker import Docker, DOCKER_PREFERRED_API_VERSION, DOCKER_MINIMUM_API_VERSION
from gns3server.compute.docker.docker_error import DockerError, DockerHttp404Error
@pytest_asyncio.fixture
async def vm():
vm = Docker()
vm._connected = True
vm._session = MagicMock()
vm._session.closed = False
return vm
@pytest.mark.asyncio
async def test_query_success(vm):
response = MagicMock()
response.status = 200
response.headers = {'CONTENT-TYPE': 'application/json'}
async def read():
return b'{"c": false}'
response.read.side_effect = read
vm._session.request = AsyncioMagicMock(return_value=response)
data = await vm.query("POST", "test", data={"a": True}, params={"b": 1})
vm._session.request.assert_called_with('POST',
'http://docker/v1.25/test',
data='{"a": true}',
headers={'content-type': 'application/json'},
params={'b': 1},
timeout=300)
assert data == {"c": False}
@pytest.mark.asyncio
async def test_query_error(vm):
response = MagicMock()
response.status = 404
async def read():
return b"NOT FOUND"
response.read.side_effect = read
vm._session.request = AsyncioMagicMock(return_value=response)
with pytest.raises(DockerError):
await vm.query("POST", "test", data={"a": True}, params={"b": 1})
vm._session.request.assert_called_with('POST',
'http://docker/v1.25/test',
data='{"a": true}',
headers={'content-type': 'application/json'},
params={'b': 1},
timeout=300)
@pytest.mark.asyncio
async def test_query_error_json(vm):
response = MagicMock()
response.status = 404
async def read():
return b'{"message": "Error"}'
response.read.side_effect = read
vm._session.request = AsyncioMagicMock(return_value=response)
with pytest.raises(DockerError):
await vm.query("POST", "test", data={"a": True}, params={"b": 1})
vm._session.request.assert_called_with('POST',
'http://docker/v1.25/test',
data='{"a": true}',
headers={'content-type': 'application/json'},
params={'b': 1},
timeout=300)
@pytest.mark.asyncio
async def test_list_images():
response = [
{
"RepoTags": [
"ubuntu:12.04",
"ubuntu:precise",
"ubuntu:latest"
],
"Id": "8dbd9e392a964056420e5d58ca5cc376ef18e2de93b5cc90e868a1bbc8318c1c",
"Created": 1365714795,
"Size": 131506275,
"VirtualSize": 131506275
},
{
"RepoTags": [
"ubuntu:12.10",
"ubuntu:quantal",
"<none>:<none>"
],
"ParentId": "27cf784147099545",
"Id": "b750fe79269d2ec9a3c593ef05b4332b1d1a02a62b4accb2c21d589ff2f5f2dc",
"Created": 1364102658,
"Size": 24653,
"VirtualSize": 180116135
}
]
with asyncio_patch("gns3server.compute.docker.Docker.query", return_value=response) as mock:
images = await Docker.instance().list_images()
mock.assert_called_with("GET", "images/json", params={"all": 0})
assert len(images) == 5
assert {"image": "ubuntu:12.04"} in images
assert {"image": "ubuntu:precise"} in images
assert {"image": "ubuntu:latest"} in images
assert {"image": "ubuntu:12.10"} in images
assert {"image": "ubuntu:quantal"} in images
@pytest.mark.asyncio
async def test_pull_image():
class Response:
"""
Simulate a response split in multiple packets
"""
def __init__(self):
self._read = -1
async def read(self, size):
self._read += 1
if self._read == 0:
return b'{"progress": "0/100",'
elif self._read == 1:
return '"id": 42}'
else:
None
mock_query = MagicMock()
mock_query.content.return_value = Response()
with asyncio_patch("gns3server.compute.docker.Docker.query", side_effect=DockerHttp404Error("404")):
with asyncio_patch("gns3server.compute.docker.Docker.http_query", return_value=mock_query) as mock:
await Docker.instance().pull_image("ubuntu")
mock.assert_called_with("POST", "images/create", params={"fromImage": "ubuntu"}, timeout=None)
@pytest.mark.asyncio
async def test_docker_check_connection_docker_minimum_version(vm):
response = {
'ApiVersion': '1.01',
'Version': '1.12'
}
with patch("gns3server.compute.docker.Docker.connector"), \
asyncio_patch("gns3server.compute.docker.Docker.query", return_value=response):
vm._connected = False
with pytest.raises(DockerError):
await vm._check_connection()
@pytest.mark.asyncio
async def test_docker_check_connection_docker_preferred_version_against_newer(vm):
response = {
'ApiVersion': '1.31'
}
with patch("gns3server.compute.docker.Docker.connector"), \
asyncio_patch("gns3server.compute.docker.Docker.query", return_value=response):
vm._connected = False
await vm._check_connection()
assert vm._api_version == DOCKER_PREFERRED_API_VERSION
@pytest.mark.asyncio
async def test_docker_check_connection_docker_preferred_version_against_older(vm):
response = {
'ApiVersion': '1.27',
}
with patch("gns3server.compute.docker.Docker.connector"), \
asyncio_patch("gns3server.compute.docker.Docker.query", return_value=response):
vm._connected = False
await vm._check_connection()
assert vm._api_version == DOCKER_MINIMUM_API_VERSION
@pytest.mark.asyncio
async def test_install_busybox():
mock_process = MagicMock()
mock_process.returncode = 1 # means that busybox is not dynamically linked
mock_process.communicate = AsyncioMagicMock(return_value=(b"", b"not a dynamic executable"))
with patch("gns3server.compute.docker.os.path.isfile", return_value=False):
with patch("gns3server.compute.docker.shutil.which", return_value="/usr/bin/busybox"):
with asyncio_patch("gns3server.compute.docker.asyncio.create_subprocess_exec", return_value=mock_process) as create_subprocess_mock:
with patch("gns3server.compute.docker.shutil.copy2") as copy2_mock:
dst_dir = Docker.resources_path()
await Docker.install_busybox(dst_dir)
create_subprocess_mock.assert_called_with(
"ldd",
"/usr/bin/busybox",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.DEVNULL,
)
assert copy2_mock.called
@pytest.mark.asyncio
async def test_install_busybox_dynamic_linked():
mock_process = MagicMock()
mock_process.returncode = 0 # means that busybox is dynamically linked
mock_process.communicate = AsyncioMagicMock(return_value=(b"Dynamically linked library", b""))
with patch("os.path.isfile", return_value=False):
with patch("gns3server.compute.docker.shutil.which", return_value="/usr/bin/busybox"):
with asyncio_patch("gns3server.compute.docker.asyncio.create_subprocess_exec", return_value=mock_process):
with pytest.raises(DockerError) as e:
dst_dir = Docker.resources_path()
await Docker.install_busybox(dst_dir)
assert str(e.value) == "No busybox executable could be found, please install busybox (apt install busybox-static on Debian/Ubuntu) and make sure it is in your PATH"
@pytest.mark.asyncio
async def test_install_busybox_no_executables():
with patch("gns3server.compute.docker.os.path.isfile", return_value=False):
with patch("gns3server.compute.docker.shutil.which", return_value=None):
with pytest.raises(DockerError) as e:
dst_dir = Docker.resources_path()
await Docker.install_busybox(dst_dir)
assert str(e.value) == "No busybox executable could be found, please install busybox (apt install busybox-static on Debian/Ubuntu) and make sure it is in your PATH"