mirror of https://github.com/GNS3/gns3-server
commit
8cf55166cb
@ -0,0 +1,227 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
#
|
||||||
|
# Copyright (C) 2014 GNS3 Technologies Inc.
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License
|
||||||
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import asyncio.subprocess
|
||||||
|
|
||||||
|
import logging
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Mostly from https://code.google.com/p/miniboa/source/browse/trunk/miniboa/telnet.py
|
||||||
|
|
||||||
|
# Telnet Commands
|
||||||
|
SE = 240 # End of sub-negotiation parameters
|
||||||
|
NOP = 241 # No operation
|
||||||
|
DATMK = 242 # Data stream portion of a sync.
|
||||||
|
BREAK = 243 # NVT Character BRK
|
||||||
|
IP = 244 # Interrupt Process
|
||||||
|
AO = 245 # Abort Output
|
||||||
|
AYT = 246 # Are you there
|
||||||
|
EC = 247 # Erase Character
|
||||||
|
EL = 248 # Erase Line
|
||||||
|
GA = 249 # The Go Ahead Signal
|
||||||
|
SB = 250 # Sub-option to follow
|
||||||
|
WILL = 251 # Will; request or confirm option begin
|
||||||
|
WONT = 252 # Wont; deny option request
|
||||||
|
DO = 253 # Do = Request or confirm remote option
|
||||||
|
DONT = 254 # Don't = Demand or confirm option halt
|
||||||
|
IAC = 255 # Interpret as Command
|
||||||
|
SEND = 1 # Sub-process negotiation SEND command
|
||||||
|
IS = 0 # Sub-process negotiation IS command
|
||||||
|
|
||||||
|
# Telnet Options
|
||||||
|
BINARY = 0 # Transmit Binary
|
||||||
|
ECHO = 1 # Echo characters back to sender
|
||||||
|
RECON = 2 # Reconnection
|
||||||
|
SGA = 3 # Suppress Go-Ahead
|
||||||
|
TMARK = 6 # Timing Mark
|
||||||
|
TTYPE = 24 # Terminal Type
|
||||||
|
NAWS = 31 # Negotiate About Window Size
|
||||||
|
LINEMO = 34 # Line Mode
|
||||||
|
|
||||||
|
READ_SIZE = 1024
|
||||||
|
|
||||||
|
|
||||||
|
class AsyncioTelnetServer:
|
||||||
|
|
||||||
|
def __init__(self, reader=None, writer=None):
|
||||||
|
self._reader = reader
|
||||||
|
self._writer = writer
|
||||||
|
self._clients = set()
|
||||||
|
self._lock = asyncio.Lock()
|
||||||
|
self._reader_process = None
|
||||||
|
self._current_read = None
|
||||||
|
|
||||||
|
@asyncio.coroutine
|
||||||
|
def run(self, network_reader, network_writer):
|
||||||
|
# Keep track of connected clients
|
||||||
|
self._clients.add(network_writer)
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Send initial telnet session opening
|
||||||
|
network_writer.write(bytes([IAC, WILL, ECHO,
|
||||||
|
IAC, WILL, SGA,
|
||||||
|
IAC, WILL, BINARY,
|
||||||
|
IAC, DO, BINARY]))
|
||||||
|
yield from network_writer.drain()
|
||||||
|
|
||||||
|
yield from self._process(network_reader, network_writer)
|
||||||
|
except ConnectionResetError:
|
||||||
|
with (yield from self._lock):
|
||||||
|
if self._reader_process == network_reader:
|
||||||
|
self._reader_process = None
|
||||||
|
# Cancel current read from this reader
|
||||||
|
self._current_read.cancel()
|
||||||
|
self._clients.remove(network_writer)
|
||||||
|
|
||||||
|
@asyncio.coroutine
|
||||||
|
def _get_reader(self, network_reader):
|
||||||
|
"""
|
||||||
|
Get a reader or None if another reader is already reading.
|
||||||
|
"""
|
||||||
|
with (yield from self._lock):
|
||||||
|
if self._reader_process is None:
|
||||||
|
self._reader_process = network_reader
|
||||||
|
if self._reader_process == network_reader:
|
||||||
|
self._current_read = asyncio.async(self._reader.read(READ_SIZE))
|
||||||
|
return self._current_read
|
||||||
|
return None
|
||||||
|
|
||||||
|
@asyncio.coroutine
|
||||||
|
def _process(self, network_reader, network_writer):
|
||||||
|
network_read = asyncio.async(network_reader.read(READ_SIZE))
|
||||||
|
reader_read = yield from self._get_reader(network_reader)
|
||||||
|
|
||||||
|
while True:
|
||||||
|
if reader_read is None:
|
||||||
|
reader_read = yield from self._get_reader(network_reader)
|
||||||
|
if reader_read is None:
|
||||||
|
done, pending = yield from asyncio.wait(
|
||||||
|
[
|
||||||
|
network_read,
|
||||||
|
],
|
||||||
|
timeout=1,
|
||||||
|
return_when=asyncio.FIRST_COMPLETED)
|
||||||
|
else:
|
||||||
|
done, pending = yield from asyncio.wait(
|
||||||
|
[
|
||||||
|
network_read,
|
||||||
|
reader_read
|
||||||
|
],
|
||||||
|
return_when=asyncio.FIRST_COMPLETED)
|
||||||
|
for coro in done:
|
||||||
|
data = coro.result()
|
||||||
|
if coro == network_read:
|
||||||
|
network_read = asyncio.async(network_reader.read(READ_SIZE))
|
||||||
|
if IAC in data:
|
||||||
|
data = yield from self._IAC_parser(data, network_reader, network_writer)
|
||||||
|
if self._writer:
|
||||||
|
self._writer.write(data)
|
||||||
|
yield from self._writer.drain()
|
||||||
|
elif coro == reader_read:
|
||||||
|
reader_read = yield from self._get_reader(network_reader)
|
||||||
|
# Replicate the output on all clients
|
||||||
|
for writer in self._clients:
|
||||||
|
writer.write(data)
|
||||||
|
yield from writer.drain()
|
||||||
|
|
||||||
|
def _IAC_parser(self, buf, network_reader, network_writer):
|
||||||
|
"""
|
||||||
|
Processes and removes any Telnet commands from the buffer.
|
||||||
|
|
||||||
|
:param buf: buffer
|
||||||
|
:returns: buffer minus Telnet commands
|
||||||
|
"""
|
||||||
|
|
||||||
|
skip_to = 0
|
||||||
|
while True:
|
||||||
|
# Locate an IAC to process
|
||||||
|
iac_loc = buf.find(IAC, skip_to)
|
||||||
|
if iac_loc < 0:
|
||||||
|
break
|
||||||
|
|
||||||
|
# Get the TELNET command
|
||||||
|
iac_cmd = bytearray([IAC])
|
||||||
|
try:
|
||||||
|
iac_cmd.append(buf[iac_loc + 1])
|
||||||
|
except IndexError:
|
||||||
|
d = yield from network_reader.read(1)
|
||||||
|
buf.extend(d)
|
||||||
|
iac_cmd.append(buf[iac_loc + 1])
|
||||||
|
|
||||||
|
# Is this just a 2-byte TELNET command?
|
||||||
|
if iac_cmd[1] not in [WILL, WONT, DO, DONT]:
|
||||||
|
if iac_cmd[1] == AYT:
|
||||||
|
log.debug("Telnet server received Are-You-There (AYT)")
|
||||||
|
network_writer.write(b'\r\nYour Are-You-There received. I am here.\r\n')
|
||||||
|
elif iac_cmd[1] == IAC:
|
||||||
|
# It's data, not an IAC
|
||||||
|
iac_cmd.pop()
|
||||||
|
# This prevents the 0xff from being
|
||||||
|
# interrupted as yet another IAC
|
||||||
|
skip_to = iac_loc + 1
|
||||||
|
log.debug("Received IAC IAC")
|
||||||
|
elif iac_cmd[1] == NOP:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
log.debug("Unhandled telnet command: "
|
||||||
|
"{0:#x} {1:#x}".format(*iac_cmd))
|
||||||
|
|
||||||
|
# This must be a 3-byte TELNET command
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
iac_cmd.append(buf[iac_loc + 2])
|
||||||
|
except IndexError:
|
||||||
|
d = yield from network_reader.read(1)
|
||||||
|
buf.extend(d)
|
||||||
|
iac_cmd.append(buf[iac_loc + 2])
|
||||||
|
# We do ECHO, SGA, and BINARY. Period.
|
||||||
|
if iac_cmd[1] == DO and iac_cmd[2] not in [ECHO, SGA, BINARY]:
|
||||||
|
network_writer.write(bytes([IAC, WONT, iac_cmd[2]]))
|
||||||
|
log.debug("Telnet WON'T {:#x}".format(iac_cmd[2]))
|
||||||
|
else:
|
||||||
|
log.debug("Unhandled telnet command: "
|
||||||
|
"{0:#x} {1:#x} {2:#x}".format(*iac_cmd))
|
||||||
|
|
||||||
|
# Remove the entire TELNET command from the buffer
|
||||||
|
buf = buf.replace(iac_cmd, b'', 1)
|
||||||
|
|
||||||
|
yield from network_writer.drain()
|
||||||
|
|
||||||
|
# Return the new copy of the buffer, minus telnet commands
|
||||||
|
return buf
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
|
||||||
|
process = loop.run_until_complete(asyncio.async(asyncio.subprocess.create_subprocess_exec("bash",
|
||||||
|
stdout=asyncio.subprocess.PIPE,
|
||||||
|
stderr=asyncio.subprocess.STDOUT,
|
||||||
|
stdin=asyncio.subprocess.PIPE)))
|
||||||
|
server = AsyncioTelnetServer(reader=process.stdout, writer=process.stdin)
|
||||||
|
|
||||||
|
coro = asyncio.start_server(server.run, '127.0.0.1', 2222, loop=loop)
|
||||||
|
s = loop.run_until_complete(coro)
|
||||||
|
|
||||||
|
try:
|
||||||
|
loop.run_forever()
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
# Close the server
|
||||||
|
s.close()
|
||||||
|
loop.run_until_complete(s.wait_closed())
|
||||||
|
loop.close()
|
@ -0,0 +1,125 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
#
|
||||||
|
# Copyright (C) 2015 GNS3 Technologies Inc.
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License
|
||||||
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import os
|
||||||
|
import stat
|
||||||
|
import sys
|
||||||
|
import uuid
|
||||||
|
import aiohttp
|
||||||
|
|
||||||
|
from tests.utils import asyncio_patch
|
||||||
|
from unittest.mock import patch, MagicMock, PropertyMock
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def base_params():
|
||||||
|
"""Return standard parameters"""
|
||||||
|
return {"name": "PC TEST 1", "image": "nginx", "start_command": "nginx-daemon", "adapters": 2, "environment": "YES=1\nNO=0"}
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def vm(server, project, base_params):
|
||||||
|
with asyncio_patch("gns3server.modules.docker.Docker.list_images", return_value=[{"image": "nginx"}]) as mock_list:
|
||||||
|
with asyncio_patch("gns3server.modules.docker.Docker.query", return_value={"Id": "8bd8153ea8f5"}) as mock:
|
||||||
|
response = server.post("/projects/{project_id}/docker/vms".format(project_id=project.id), base_params)
|
||||||
|
if response.status != 201:
|
||||||
|
print(response.body)
|
||||||
|
assert response.status == 201
|
||||||
|
return response.json
|
||||||
|
|
||||||
|
|
||||||
|
def test_docker_create(server, project, base_params):
|
||||||
|
with asyncio_patch("gns3server.modules.docker.Docker.list_images", return_value=[{"image": "nginx"}]) as mock_list:
|
||||||
|
with asyncio_patch("gns3server.modules.docker.Docker.query", return_value={"Id": "8bd8153ea8f5"}) as mock:
|
||||||
|
response = server.post("/projects/{project_id}/docker/vms".format(project_id=project.id), base_params)
|
||||||
|
assert response.status == 201
|
||||||
|
assert response.route == "/projects/{project_id}/docker/vms"
|
||||||
|
assert response.json["name"] == "PC TEST 1"
|
||||||
|
assert response.json["project_id"] == project.id
|
||||||
|
assert response.json["container_id"] == "8bd8153ea8f5"
|
||||||
|
assert response.json["image"] == "nginx"
|
||||||
|
assert response.json["adapters"] == 2
|
||||||
|
assert response.json["environment"] == "YES=1\nNO=0"
|
||||||
|
|
||||||
|
|
||||||
|
def test_docker_start(server, vm):
|
||||||
|
with asyncio_patch("gns3server.modules.docker.docker_vm.DockerVM.start", return_value=True) as mock:
|
||||||
|
response = server.post("/projects/{project_id}/docker/vms/{vm_id}/start".format(project_id=vm["project_id"], vm_id=vm["vm_id"]))
|
||||||
|
assert mock.called
|
||||||
|
assert response.status == 204
|
||||||
|
|
||||||
|
|
||||||
|
def test_docker_stop(server, vm):
|
||||||
|
with asyncio_patch("gns3server.modules.docker.docker_vm.DockerVM.stop", return_value=True) as mock:
|
||||||
|
response = server.post("/projects/{project_id}/docker/vms/{vm_id}/stop".format(project_id=vm["project_id"], vm_id=vm["vm_id"]))
|
||||||
|
assert mock.called
|
||||||
|
assert response.status == 204
|
||||||
|
|
||||||
|
|
||||||
|
def test_docker_reload(server, vm):
|
||||||
|
with asyncio_patch("gns3server.modules.docker.docker_vm.DockerVM.restart", return_value=True) as mock:
|
||||||
|
response = server.post("/projects/{project_id}/docker/vms/{vm_id}/reload".format(project_id=vm["project_id"], vm_id=vm["vm_id"]))
|
||||||
|
assert mock.called
|
||||||
|
assert response.status == 204
|
||||||
|
|
||||||
|
|
||||||
|
def test_docker_delete(server, vm):
|
||||||
|
with asyncio_patch("gns3server.modules.docker.docker_vm.DockerVM.remove", return_value=True) as mock:
|
||||||
|
response = server.delete("/projects/{project_id}/docker/vms/{vm_id}".format(project_id=vm["project_id"], vm_id=vm["vm_id"]))
|
||||||
|
assert mock.called
|
||||||
|
assert response.status == 204
|
||||||
|
|
||||||
|
|
||||||
|
def test_docker_reload(server, vm):
|
||||||
|
with asyncio_patch("gns3server.modules.docker.docker_vm.DockerVM.pause", return_value=True) as mock:
|
||||||
|
response = server.post("/projects/{project_id}/docker/vms/{vm_id}/suspend".format(project_id=vm["project_id"], vm_id=vm["vm_id"]))
|
||||||
|
assert mock.called
|
||||||
|
assert response.status == 204
|
||||||
|
|
||||||
|
|
||||||
|
def test_docker_nio_create_udp(server, vm):
|
||||||
|
response = server.post("/projects/{project_id}/docker/vms/{vm_id}/adapters/0/ports/0/nio".format(project_id=vm["project_id"], vm_id=vm["vm_id"]), {"type": "nio_udp",
|
||||||
|
"lport": 4242,
|
||||||
|
"rport": 4343,
|
||||||
|
"rhost": "127.0.0.1"},
|
||||||
|
example=True)
|
||||||
|
assert response.status == 201
|
||||||
|
assert response.route == "/projects/{project_id}/docker/vms/{vm_id}/adapters/{adapter_number:\d+}/ports/{port_number:\d+}/nio"
|
||||||
|
assert response.json["type"] == "nio_udp"
|
||||||
|
|
||||||
|
|
||||||
|
def test_docker_delete_nio(server, vm):
|
||||||
|
with asyncio_patch("gns3server.modules.docker.docker_vm.DockerVM.adapter_remove_nio_binding") as mock:
|
||||||
|
response = server.delete("/projects/{project_id}/docker/vms/{vm_id}/adapters/0/ports/0/nio".format(project_id=vm["project_id"], vm_id=vm["vm_id"]), example=True)
|
||||||
|
assert response.status == 204
|
||||||
|
assert response.route == "/projects/{project_id}/docker/vms/{vm_id}/adapters/{adapter_number:\d+}/ports/{port_number:\d+}/nio"
|
||||||
|
|
||||||
|
|
||||||
|
def test_docker_update(server, vm, tmpdir, free_console_port):
|
||||||
|
with asyncio_patch("gns3server.modules.docker.docker_vm.DockerVM.update") as mock:
|
||||||
|
response = server.put("/projects/{project_id}/docker/vms/{vm_id}".format(project_id=vm["project_id"], vm_id=vm["vm_id"]), {"name": "test",
|
||||||
|
"console": free_console_port,
|
||||||
|
"start_command": "yes",
|
||||||
|
"environment": "GNS3=1\nGNS4=0"},
|
||||||
|
example=True)
|
||||||
|
assert mock.called
|
||||||
|
assert response.status == 200
|
||||||
|
assert response.json["name"] == "test"
|
||||||
|
assert response.json["console"] == free_console_port
|
||||||
|
assert response.json["start_command"] == "yes"
|
||||||
|
assert response.json["environment"] == "GNS3=1\nGNS4=0"
|
@ -0,0 +1,130 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
#
|
||||||
|
# Copyright (C) 2015 GNS3 Technologies Inc.
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License
|
||||||
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import asyncio
|
||||||
|
from unittest.mock import MagicMock
|
||||||
|
|
||||||
|
from tests.utils import asyncio_patch
|
||||||
|
from gns3server.modules.docker import Docker
|
||||||
|
from gns3server.modules.docker.docker_error import DockerError
|
||||||
|
|
||||||
|
|
||||||
|
def test_query_success(loop):
|
||||||
|
|
||||||
|
vm = Docker()
|
||||||
|
response = MagicMock()
|
||||||
|
response.status = 200
|
||||||
|
|
||||||
|
@asyncio.coroutine
|
||||||
|
def read():
|
||||||
|
return b'{"c": false}'
|
||||||
|
|
||||||
|
response.read.side_effect = read
|
||||||
|
with asyncio_patch("aiohttp.request", return_value=response) as mock:
|
||||||
|
data = loop.run_until_complete(asyncio.async(vm.query("POST", "test", data={"a": True}, params={"b": 1})))
|
||||||
|
mock.assert_called_with('POST',
|
||||||
|
'http://docker/test',
|
||||||
|
connector=vm._connector,
|
||||||
|
data='{"a": true}',
|
||||||
|
headers={'content-type': 'application/json'},
|
||||||
|
params={'b': 1})
|
||||||
|
|
||||||
|
assert data == {"c": False}
|
||||||
|
|
||||||
|
|
||||||
|
def test_query_error(loop):
|
||||||
|
|
||||||
|
vm = Docker()
|
||||||
|
response = MagicMock()
|
||||||
|
response.status = 404
|
||||||
|
|
||||||
|
@asyncio.coroutine
|
||||||
|
def read():
|
||||||
|
return b"NOT FOUND"
|
||||||
|
|
||||||
|
response.read.side_effect = read
|
||||||
|
with asyncio_patch("aiohttp.request", return_value=response) as mock:
|
||||||
|
with pytest.raises(DockerError):
|
||||||
|
data = loop.run_until_complete(asyncio.async(vm.query("POST", "test", data={"a": True}, params={"b": 1})))
|
||||||
|
mock.assert_called_with('POST',
|
||||||
|
'http://docker/test',
|
||||||
|
connector=vm._connector,
|
||||||
|
data='{"a": true}',
|
||||||
|
headers={'content-type': 'application/json'},
|
||||||
|
params={'b': 1})
|
||||||
|
|
||||||
|
|
||||||
|
def test_query_error_json(loop):
|
||||||
|
|
||||||
|
vm = Docker()
|
||||||
|
response = MagicMock()
|
||||||
|
response.status = 404
|
||||||
|
|
||||||
|
@asyncio.coroutine
|
||||||
|
def read():
|
||||||
|
return b'{"message": "Error"}'
|
||||||
|
|
||||||
|
response.read.side_effect = read
|
||||||
|
with asyncio_patch("aiohttp.request", return_value=response) as mock:
|
||||||
|
with pytest.raises(DockerError):
|
||||||
|
data = loop.run_until_complete(asyncio.async(vm.query("POST", "test", data={"a": True}, params={"b": 1})))
|
||||||
|
mock.assert_called_with('POST',
|
||||||
|
'http://docker/test',
|
||||||
|
connector=vm._connector,
|
||||||
|
data='{"a": true}',
|
||||||
|
headers={'content-type': 'application/json'},
|
||||||
|
params={'b': 1})
|
||||||
|
|
||||||
|
|
||||||
|
def test_list_images(loop):
|
||||||
|
response = [
|
||||||
|
{
|
||||||
|
"RepoTags": [
|
||||||
|
"ubuntu:12.04",
|
||||||
|
"ubuntu:precise",
|
||||||
|
"ubuntu:latest"
|
||||||
|
],
|
||||||
|
"Id": "8dbd9e392a964056420e5d58ca5cc376ef18e2de93b5cc90e868a1bbc8318c1c",
|
||||||
|
"Created": 1365714795,
|
||||||
|
"Size": 131506275,
|
||||||
|
"VirtualSize": 131506275
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"RepoTags": [
|
||||||
|
"ubuntu:12.10",
|
||||||
|
"ubuntu:quantal",
|
||||||
|
"<none>:<none>"
|
||||||
|
],
|
||||||
|
"ParentId": "27cf784147099545",
|
||||||
|
"Id": "b750fe79269d2ec9a3c593ef05b4332b1d1a02a62b4accb2c21d589ff2f5f2dc",
|
||||||
|
"Created": 1364102658,
|
||||||
|
"Size": 24653,
|
||||||
|
"VirtualSize": 180116135
|
||||||
|
}
|
||||||
|
]
|
||||||
|
|
||||||
|
with asyncio_patch("gns3server.modules.docker.Docker.query", return_value=response) as mock:
|
||||||
|
images = loop.run_until_complete(asyncio.async(Docker.instance().list_images()))
|
||||||
|
mock.assert_called_with("GET", "images/json", params={"all": 0})
|
||||||
|
assert images == [
|
||||||
|
{"image": "ubuntu:12.04"},
|
||||||
|
{"image": "ubuntu:precise"},
|
||||||
|
{"image": "ubuntu:latest"},
|
||||||
|
{"image": "ubuntu:12.10"},
|
||||||
|
{"image": "ubuntu:quantal"}
|
||||||
|
]
|
Loading…
Reference in new issue