diff --git a/pytest.ini b/pytest.ini
index 45a6328..0bf3b48 100644
--- a/pytest.ini
+++ b/pytest.ini
@@ -1,2 +1,2 @@
 [pytest]
-addopts = -n auto -ra
+addopts = -n auto -ra
diff --git a/tests/test_service.py b/tests/test_service.py
index 56816c6..3b2aca5 100644
--- a/tests/test_service.py
+++ b/tests/test_service.py
@@ -30,7 +30,6 @@ def _start_proxy(
 def _stop_and_delete_proxy(
 container_name=CONTAINER_NAME,
- socket_proxy=SOCKET_PROXY,
 ):
 logger.info(f"Removing {container_name}...")
 docker(
@@ -55,8 +54,10 @@ def _query_docker_with_proxy(socket_proxy=SOCKET_PROXY, extra_args=None):
 return stdout + stderr
-def _check_permission(assertion, extra_args=None):
- if "forbidden" in _query_docker_with_proxy(extra_args=extra_args):
+def _check_permission(assertion, socket_proxy=SOCKET_PROXY, extra_args=None):
+ if "forbidden" in _query_docker_with_proxy(
+ socket_proxy=socket_proxy, extra_args=extra_args
+ ):
 result = "forbidden"
 else:
 result = "allowed"
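Review bot: `_check_permission` classifies every response that lacks the substring "forbidden" as "allowed", so the `else` branch cannot tell a permitted call from a connection error or a proxy that has not finished starting. With the suite running under `-n auto` and each test now passing its own `socket_proxy`, a wrong or unbound address would make the "allowed" assertions pass without the proxy having filtered anything, assuming `_query_docker_with_proxy` returns the error text rather than raising (its body is outside the hunk). A third outcome would close that gap, for example treating output that matches neither "forbidden" nor a successful exit as a test error instead of "allowed". The function continues past the end of the hunk, so the final comparison against `assertion` is not visible here.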
@@ -64,58 +65,148 @@ def _check_permission(assertion, extra_args=None):
 def test_default_permissions():
+ container_name = f"{CONTAINER_NAME}_1"
+ socket_proxy = "127.0.0.1:2375"
 try:
- _start_proxy()
- _check_permission("allowed", extra_args="version")
- _check_permission("forbidden", ["run", "--rm", "alpine"])
- _check_permission("forbidden", ["pull", "alpine"])
- _check_permission("forbidden", ["logs", CONTAINER_NAME])
- _check_permission("forbidden", ["wait", CONTAINER_NAME])
- _check_permission("forbidden", ["rm", "-f", CONTAINER_NAME])
- _check_permission("forbidden", ["restart", CONTAINER_NAME])
- _check_permission("forbidden", ["network", "ls"])
- _check_permission("forbidden", ["config", "ls"])
- _check_permission("forbidden", ["service", "ls"])
- _check_permission("forbidden", ["stack", "ls"])
- _check_permission("forbidden", ["secret", "ls"])
- _check_permission("forbidden", ["plugin", "ls"])
- _check_permission("forbidden", ["info"])
- _check_permission("forbidden", ["system", "info"])
- _check_permission("forbidden", ["build", "."])
- _check_permission("forbidden", ["swarm", "init"])
+ _start_proxy(container_name=container_name, socket_proxy=socket_proxy)
+ _check_permission("allowed", socket_proxy=socket_proxy, extra_args="version")
+ _check_permission(
+ "forbidden", socket_proxy=socket_proxy, extra_args=["run", "--rm", "alpine"]
+ )
+ _check_permission(
+ "forbidden", socket_proxy=socket_proxy, extra_args=["pull", "alpine"]
+ )
+ _check_permission(
+ "forbidden", socket_proxy=socket_proxy, extra_args=["logs", container_name]
+ )
+ _check_permission(
+ "forbidden", socket_proxy=socket_proxy, extra_args=["wait", container_name]
+ )
+ _check_permission(
+ "forbidden",
+ socket_proxy=socket_proxy,
+ extra_args=["rm", "-f", container_name],
+ )
+ _check_permission(
+ "forbidden",
+ socket_proxy=socket_proxy,
+ extra_args=["restart", container_name],
+ )
+ _check_permission(
+ "forbidden", socket_proxy=socket_proxy, extra_args=["network", "ls"]
+ )
+ _check_permission(
+ "forbidden", socket_proxy=socket_proxy, extra_args=["config", "ls"]
+ )
+ _check_permission(
+ "forbidden", socket_proxy=socket_proxy, extra_args=["service", "ls"]
+ )
+ _check_permission(
+ "forbidden", socket_proxy=socket_proxy, extra_args=["stack", "ls"]
+ )
+ _check_permission(
+ "forbidden", socket_proxy=socket_proxy, extra_args=["secret", "ls"]
+ )
+ _check_permission(
+ "forbidden", socket_proxy=socket_proxy, extra_args=["plugin", "ls"]
+ )
+ _check_permission("forbidden", socket_proxy=socket_proxy, extra_args=["info"])
+ _check_permission(
+ "forbidden", socket_proxy=socket_proxy, extra_args=["system", "info"]
+ )
+ _check_permission(
+ "forbidden", socket_proxy=socket_proxy, extra_args=["build", "."]
+ )
+ _check_permission(
+ "forbidden", socket_proxy=socket_proxy, extra_args=["swarm", "init"]
+ )
 finally:
- _stop_and_delete_proxy()
+ _stop_and_delete_proxy(container_name=container_name)
 def test_container_permissions():
+ container_name = f"{CONTAINER_NAME}_2"
+ socket_proxy = "127.0.0.1:2376"
 try:
- _start_proxy(extra_args=["-e", "CONTAINERS=1"])
- _check_permission("allowed", ["logs", CONTAINER_NAME])
- _check_permission("allowed", ["inspect", CONTAINER_NAME])
- _check_permission("forbidden", ["wait", CONTAINER_NAME])
- _check_permission("forbidden", ["run", "--rm", "alpine"])
- _check_permission("forbidden", ["rm", "-f", CONTAINER_NAME])
- _check_permission("forbidden", ["restart", CONTAINER_NAME])
+ _start_proxy(
+ container_name=container_name,
+ socket_proxy=socket_proxy,
+ extra_args=["-e", "CONTAINERS=1"],
+ )
+ _check_permission(
+ "allowed", socket_proxy=socket_proxy, extra_args=["logs", container_name]
+ )
+ _check_permission(
+ "allowed", socket_proxy=socket_proxy, extra_args=["inspect", container_name]
+ )
+ _check_permission(
+ "forbidden", socket_proxy=socket_proxy, extra_args=["wait", container_name]
+ )
+ _check_permission(
+ "forbidden", socket_proxy=socket_proxy, extra_args=["run", "--rm", "alpine"]
+ )
+ _check_permission(
+ "forbidden",
+ socket_proxy=socket_proxy,
+ extra_args=["rm", "-f", container_name],
+ )
+ _check_permission(
+ "forbidden",
+ socket_proxy=socket_proxy,
+ extra_args=["restart", container_name],
+ )
 finally:
- _stop_and_delete_proxy()
+ _stop_and_delete_proxy(container_name=container_name)
 def test_post_permissions():
+ container_name = f"{CONTAINER_NAME}_3"
+ socket_proxy = "127.0.0.1:2377"
 try:
- _start_proxy(extra_args=["-e", "POST=1"])
- _check_permission("forbidden", ["rm", "-f", CONTAINER_NAME])
- _check_permission("forbidden", ["pull", "alpine"])
- _check_permission("forbidden", ["run", "--rm", "alpine"])
- _check_permission("forbidden", ["network", "create", "foobar"])
+ _start_proxy(
+ container_name=container_name,
+ socket_proxy=socket_proxy,
+ extra_args=["-e", "POST=1"],
+ )
+ _check_permission(
+ "forbidden",
+ socket_proxy=socket_proxy,
+ extra_args=["rm", "-f", container_name],
+ )
+ _check_permission(
+ "forbidden", socket_proxy=socket_proxy, extra_args=["pull", "alpine"]
+ )
+ _check_permission(
+ "forbidden", socket_proxy=socket_proxy, extra_args=["run", "--rm", "alpine"]
+ )
+ _check_permission(
+ "forbidden",
+ socket_proxy=socket_proxy,
+ extra_args=["network", "create", "foobar"],
+ )
 finally:
- _stop_and_delete_proxy()
+ _stop_and_delete_proxy(container_name=container_name)
 def test_network_post_permissions():
+ container_name = f"{CONTAINER_NAME}_4"
+ socket_proxy = "127.0.0.1:2378"
 try:
- _start_proxy(extra_args=["-e", "POST=1", "-e", "NETWORKS=1"])
- _check_permission("allowed", ["network", "ls"])
- _check_permission("allowed", ["network", "create", "foo"])
- _check_permission("allowed", ["network", "rm", "foo"])
+ _start_proxy(
+ container_name=container_name,
+ socket_proxy=socket_proxy,
+ extra_args=["-e", "POST=1", "-e", "NETWORKS=1"],
+ )
+ _check_permission(
+ "allowed", socket_proxy=socket_proxy, extra_args=["network", "ls"]
+ )
+ _check_permission(
+ "allowed",
+ socket_proxy=socket_proxy,
+ extra_args=["network", "create", "foo"],
+ )
+ _check_permission(
+ "allowed", socket_proxy=socket_proxy, extra_args=["network", "rm", "foo"]
+ )
 finally:
- _stop_and_delete_proxy()
+ _stop_and_delete_proxy(container_name=container_name)
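Review bot: The fixed addresses `127.0.0.1:2375` to `127.0.0.1:2378` set at the top of the four tests include the ports a Docker daemon conventionally uses over TCP (2375 plain, 2376 TLS, 2377 swarm), so on a host where one of them is already bound the proxy cannot claim it. If `_start_proxy` does not fail hard in that case (its body is outside this diff), the checks would talk to whatever is listening there, and the "forbidden" cases issue `rm -f`, `swarm init` and `network create`. Picking a free port per test avoids both the collision and the hand-numbered suffixes, e.g. `with socket.socket() as s: s.bind(("127.0.0.1", 0)); port = s.getsockname()[1]`, and a fixture yielding `(container_name, socket_proxy)` would also replace the try/finally repeated in each test.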