mirror of
https://github.com/Tecnativa/docker-socket-proxy
synced 2025-01-03 04:10:55 +00:00
127 lines
3.9 KiB
Python
127 lines
3.9 KiB
Python
|
|
import pytest
|
|
import logging
|
|
|
|
from plumbum import ProcessExecutionError, local
|
|
from plumbum.cmd import docker
|
|
from plumbum.machines.local import LocalCommand
|
|
|
|
logger = logging.getLogger()
|
|
|
|
CONTAINER_NAME = "docksockprox_test"
|
|
SOCKET_PROXY = "127.0.0.1:2375"
|
|
|
|
|
|
def _start_proxy(
|
|
container_name=CONTAINER_NAME,
|
|
socket_proxy=SOCKET_PROXY,
|
|
extra_args=None
|
|
):
|
|
logger.info(f"Starting {container_name} with args: {extra_args}...")
|
|
docker(
|
|
"run",
|
|
"-d",
|
|
"--name", container_name,
|
|
"--privileged",
|
|
"-v", "/var/run/docker.sock:/var/run/docker.sock",
|
|
"-p", f"{socket_proxy}:2375",
|
|
extra_args,
|
|
"tecnativa/docker-socket-proxy",
|
|
)
|
|
|
|
|
|
def _stop_and_delete_proxy(
|
|
container_name=CONTAINER_NAME,
|
|
socket_proxy=SOCKET_PROXY,
|
|
):
|
|
logger.info(f"Removing {container_name}...")
|
|
docker(
|
|
"rm",
|
|
"-f",
|
|
container_name,
|
|
)
|
|
|
|
|
|
def _query_docker_with_proxy(socket_proxy=SOCKET_PROXY, extra_args=None):
|
|
try:
|
|
_ret_code, stdout, stderr = docker.run(
|
|
(
|
|
"--host",
|
|
socket_proxy,
|
|
extra_args,
|
|
)
|
|
)
|
|
except ProcessExecutionError as result:
|
|
stdout = result.stdout
|
|
stderr = result.stderr
|
|
return stdout + stderr
|
|
|
|
|
|
def _check_permission(assertion, extra_args=None):
|
|
if "forbidden" in _query_docker_with_proxy(extra_args=extra_args):
|
|
result = "forbidden"
|
|
else:
|
|
result = "allowed"
|
|
assert result == assertion
|
|
|
|
|
|
def test_default_permissions():
|
|
try:
|
|
_start_proxy()
|
|
_check_permission("allowed", extra_args="version")
|
|
_check_permission("forbidden", ["run", "--rm", "alpine"])
|
|
_check_permission("forbidden", ["pull", "alpine"])
|
|
_check_permission("forbidden", ["logs", CONTAINER_NAME])
|
|
_check_permission("forbidden", ["wait", CONTAINER_NAME])
|
|
_check_permission("forbidden", ["rm", "-f", CONTAINER_NAME])
|
|
_check_permission("forbidden", ["restart", CONTAINER_NAME])
|
|
_check_permission("forbidden", ["network", "ls"])
|
|
_check_permission("forbidden", ["config", "ls"])
|
|
_check_permission("forbidden", ["service", "ls"])
|
|
_check_permission("forbidden", ["stack", "ls"])
|
|
_check_permission("forbidden", ["secret", "ls"])
|
|
_check_permission("forbidden", ["plugin", "ls"])
|
|
_check_permission("forbidden", ["info"])
|
|
_check_permission("forbidden", ["system", "info"])
|
|
_check_permission("forbidden", ["build", "."])
|
|
_check_permission("forbidden", ["swarm", "init"])
|
|
finally:
|
|
pass
|
|
_stop_and_delete_proxy()
|
|
|
|
|
|
def test_container_permissions():
|
|
try:
|
|
_start_proxy(extra_args=["-e", "CONTAINERS=1"])
|
|
_check_permission("allowed", ["logs", CONTAINER_NAME])
|
|
_check_permission("allowed", ["inspect", CONTAINER_NAME])
|
|
_check_permission("forbidden", ["wait", CONTAINER_NAME])
|
|
_check_permission("forbidden", ["run", "--rm", "alpine"])
|
|
_check_permission("forbidden", ["rm", "-f", CONTAINER_NAME])
|
|
_check_permission("forbidden", ["restart", CONTAINER_NAME])
|
|
finally:
|
|
pass
|
|
_stop_and_delete_proxy()
|
|
|
|
|
|
def test_post_permissions():
|
|
try:
|
|
_start_proxy(extra_args=["-e", "POST=1"])
|
|
_check_permission("forbidden", ["rm", "-f", CONTAINER_NAME])
|
|
_check_permission("forbidden", ["pull", "alpine"])
|
|
_check_permission("forbidden", ["run", "--rm", "alpine"])
|
|
_check_permission("forbidden", ["network", "create", "foobar"])
|
|
finally:
|
|
pass
|
|
_stop_and_delete_proxy()
|
|
|
|
|
|
def test_network_post_permissions():
|
|
try:
|
|
_start_proxy(extra_args=["-e", "POST=1", "-e", "NETWORKS=1"])
|
|
_check_permission("allowed", ["network", "ls"])
|
|
_check_permission("allowed", ["network", "create", "foo"])
|
|
_check_permission("allowed", ["network", "rm", "foo"])
|
|
finally:
|
|
pass
|
|
_stop_and_delete_proxy() |