From 132bc870812a931c36437bc62c75e3565c461ebe Mon Sep 17 00:00:00 2001 From: Julien Duponchelle Date: Mon, 27 Mar 2017 20:46:25 +0200 Subject: [PATCH] Dissallow parallel pull of docker images Fix #962 --- gns3server/compute/docker/__init__.py | 72 ++++++++++++++++++++------ gns3server/compute/docker/docker_vm.py | 24 ++------- scripts/run_on_gns3vm.sh | 4 +- tests/compute/docker/test_docker.py | 30 ++++++++++- tests/compute/docker/test_docker_vm.py | 27 ---------- 5 files changed, 91 insertions(+), 66 deletions(-) diff --git a/gns3server/compute/docker/__init__.py b/gns3server/compute/docker/__init__.py index 956f679a..713c7d08 100644 --- a/gns3server/compute/docker/__init__.py +++ b/gns3server/compute/docker/__init__.py @@ -24,14 +24,14 @@ import asyncio import logging import aiohttp import json -import sys from gns3server.utils import parse_version +from gns3server.utils.asyncio import locked_coroutine +from gns3server.compute.base_manager import BaseManager +from gns3server.compute.docker.docker_vm import DockerVM +from gns3server.compute.docker.docker_error import DockerError, DockerHttp304Error, DockerHttp404Error log = logging.getLogger(__name__) -from ..base_manager import BaseManager -from .docker_vm import DockerVM -from .docker_error import * DOCKER_MINIMUM_API_VERSION = "1.21" @@ -46,24 +46,21 @@ class Docker(BaseManager): self._connected = False # Allow locking during ubridge operations self.ubridge_lock = asyncio.Lock() - self._version_checked = False - self._session = None self._connector = None + self._session = None @asyncio.coroutine - def session(self): - if not self._connected or self._session.closed: + def _check_connection(self): + if not self._connected: try: self._connected = True connector = self.connector() - self._session = aiohttp.ClientSession(connector=connector) version = yield from self.query("GET", "version") except (aiohttp.errors.ClientOSError, FileNotFoundError): self._connected = False raise DockerError("Can't connect to docker daemon") if parse_version(version["ApiVersion"]) < parse_version(DOCKER_MINIMUM_API_VERSION): raise DockerError("Docker API version is {}. GNS3 requires a minimum API version of {}".format(version["ApiVersion"], DOCKER_MINIMUM_API_VERSION)) - return self._session def connector(self): if self._connector is None or self._connector.closed: @@ -79,8 +76,6 @@ class Docker(BaseManager): def unload(self): yield from super().unload() if self._connected: - if self._session and not self._session.closed: - yield from self._session.close() if self._connector and not self._connector.closed: yield from self._connector.close() @@ -97,7 +92,7 @@ class Docker(BaseManager): response = yield from self.http_query(method, path, data=data, params=params) body = yield from response.read() - if len(body): + if body and len(body): if response.headers['CONTENT-TYPE'] == 'application/json': body = json.loads(body.decode("utf-8")) else: @@ -119,9 +114,17 @@ class Docker(BaseManager): """ data = json.dumps(data) url = "http://docker/" + path + + if timeout is None: + timeout = 60 * 60 * 24 * 31 # One month timeout + try: - session = yield from self.session() - response = yield from session.request( + if path != "version": # version is use by check connection + yield from self._check_connection() + if self._session is None or self._session.closed: + connector = self.connector() + self._session = aiohttp.ClientSession(connector=connector) + response = yield from self._session.request( method, url, params=params, @@ -163,6 +166,45 @@ class Docker(BaseManager): autoping=True) return connection + @locked_coroutine + def pull_image(self, image, progress_callback=None): + """ + Pull image from docker repository + + :params image: Image name + :params progress_callback: A function that receive a log message about image download progress + """ + + try: + yield from self.query("GET", "images/{}/json".format(image)) + return # We already have the image skip the download + except DockerHttp404Error: + pass + + if progress_callback: + progress_callback("Pull {} from docker hub".format(image)) + response = yield from self.http_query("POST", "images/create", params={"fromImage": image}, timeout=None) + # The pull api will stream status via an HTTP JSON stream + content = "" + while True: + chunk = yield from response.content.read(1024) + if not chunk: + break + content += chunk.decode("utf-8") + + try: + while True: + content = content.lstrip(" \r\n\t") + answer, index = json.JSONDecoder().raw_decode(content) + if "progress" in answer and progress_callback: + progress_callback("Pulling image {}:{}: {}".format(image, answer["id"], answer["progress"])) + content = content[index:] + except ValueError: # Partial JSON + pass + response.close() + if progress_callback: + progress_callback("Success pulling image {}".format(image)) + @asyncio.coroutine def list_images(self): """Gets Docker image list. diff --git a/gns3server/compute/docker/docker_vm.py b/gns3server/compute/docker/docker_vm.py index 1a995e9a..0ac110ac 100644 --- a/gns3server/compute/docker/docker_vm.py +++ b/gns3server/compute/docker/docker_vm.py @@ -24,7 +24,6 @@ import shutil import psutil import shlex import aiohttp -import json import os from gns3server.utils.asyncio.telnet_server import AsyncioTelnetServer @@ -767,26 +766,9 @@ class DockerVM(BaseNode): """ Pull image from docker repository """ - log.info("Pull %s from docker hub", image) - response = yield from self.manager.http_query("POST", "images/create", params={"fromImage": image}, timeout=None) - # The pull api will stream status via an HTTP JSON stream - content = "" - while True: - chunk = yield from response.content.read(1024) - if not chunk: - break - content += chunk.decode("utf-8") - - try: - while True: - content = content.lstrip(" \r\n\t") - answer, index = json.JSONDecoder().raw_decode(content) - if "progress" in answer: - self.project.emit("log.info", {"message": "Pulling image {}:{}: {}".format(self._image, answer["id"], answer["progress"])}) - content = content[index:] - except ValueError: # Partial JSON - pass - self.project.emit("log.info", {"message": "Success pulling image {}".format(self._image)}) + def callback(msg): + self.project.emit("log.info", {"message": msg}) + yield from self.manager.pull_image(image, progress_callback=callback) @asyncio.coroutine def _start_ubridge_capture(self, adapter_number, output_file): diff --git a/scripts/run_on_gns3vm.sh b/scripts/run_on_gns3vm.sh index c0d451ab..4bde8eb0 100755 --- a/scripts/run_on_gns3vm.sh +++ b/scripts/run_on_gns3vm.sh @@ -12,6 +12,6 @@ then fi ssh gns3@$SERVER_ADDRESS "sudo service gns3 stop" -rsync -avz --exclude==".git/*" --exclude=='docs/*' --exclude=='tests/*' . "gns3@$SERVER_ADDRESS:gns3server" +rsync -avz --exclude==".git/*" --exclude=='docs/*' --exclude="__pycache__" --exclude=='tests/*' . "gns3@$SERVER_ADDRESS:gns3server" -ssh gns3@$SERVER_ADDRESS "cd gns3server;python3 -m gns3server" +ssh gns3@$SERVER_ADDRESS "killall python3;cd gns3server;python3 -m gns3server" diff --git a/tests/compute/docker/test_docker.py b/tests/compute/docker/test_docker.py index 1492836b..25779ca6 100644 --- a/tests/compute/docker/test_docker.py +++ b/tests/compute/docker/test_docker.py @@ -21,7 +21,7 @@ from unittest.mock import MagicMock from tests.utils import asyncio_patch, AsyncioMagicMock from gns3server.compute.docker import Docker -from gns3server.compute.docker.docker_error import DockerError +from gns3server.compute.docker.docker_error import DockerError, DockerHttp404Error @pytest.fixture @@ -134,3 +134,31 @@ def test_list_images(loop): assert {"image": "ubuntu:latest"} in images assert {"image": "ubuntu:12.10"} in images assert {"image": "ubuntu:quantal"} in images + + +def test_pull_image(loop): + class Response: + """ + Simulate a response splitted in multiple packets + """ + + def __init__(self): + self._read = -1 + + @asyncio.coroutine + def read(self, size): + self._read += 1 + if self._read == 0: + return b'{"progress": "0/100",' + elif self._read == 1: + return '"id": 42}' + else: + None + + mock_query = MagicMock() + mock_query.content.return_value = Response() + + with asyncio_patch("gns3server.compute.docker.Docker.query", side_effect=DockerHttp404Error("404")): + with asyncio_patch("gns3server.compute.docker.Docker.http_query", return_value=mock_query) as mock: + images = loop.run_until_complete(asyncio.async(Docker.instance().pull_image("ubuntu"))) + mock.assert_called_with("POST", "images/create", params={"fromImage": "ubuntu"}, timeout=None) diff --git a/tests/compute/docker/test_docker_vm.py b/tests/compute/docker/test_docker_vm.py index 41db546b..70517b5b 100644 --- a/tests/compute/docker/test_docker_vm.py +++ b/tests/compute/docker/test_docker_vm.py @@ -795,33 +795,6 @@ def test_adapter_remove_nio_binding_invalid_adapter(vm, loop): loop.run_until_complete(asyncio.async(vm.adapter_remove_nio_binding(12))) -def test_pull_image(loop, vm): - class Response: - """ - Simulate a response splitted in multiple packets - """ - - def __init__(self): - self._read = -1 - - @asyncio.coroutine - def read(self, size): - self._read += 1 - if self._read == 0: - return b'{"progress": "0/100",' - elif self._read == 1: - return '"id": 42}' - else: - None - - mock_query = MagicMock() - mock_query.content.return_value = Response() - - with asyncio_patch("gns3server.compute.docker.Docker.http_query", return_value=mock_query) as mock: - images = loop.run_until_complete(asyncio.async(vm.pull_image("ubuntu"))) - mock.assert_called_with("POST", "images/create", params={"fromImage": "ubuntu"}, timeout=None) - - def test_start_capture(vm, tmpdir, manager, free_console_port, loop): output_file = str(tmpdir / "test.pcap")