diff --git a/gns3server/api/routes/controller/permissions.py b/gns3server/api/routes/controller/permissions.py index 8667903c..504f2c59 100644 --- a/gns3server/api/routes/controller/permissions.py +++ b/gns3server/api/routes/controller/permissions.py @@ -36,6 +36,7 @@ from gns3server.controller.controller_error import ( from gns3server.db.repositories.rbac import RbacRepository from .dependencies.database import get_repository +from .dependencies.authentication import get_current_active_user import logging @@ -59,6 +60,7 @@ async def get_permissions( async def create_permission( request: Request, permission_create: schemas.PermissionCreate, + current_user: schemas.User = Depends(get_current_active_user), rbac_repo: RbacRepository = Depends(get_repository(RbacRepository)) ) -> schemas.Permission: """ @@ -82,11 +84,16 @@ async def create_permission( # the permission can match multiple routes if permission_create.path.endswith("/*"): - route_path += r"/\*" + route_path += r"/.*" if re.fullmatch(route_path, permission_create.path): for method in permission_create.methods: if method in list(route.methods): + # check user has the right to add the permission (i.e has already to right on the path) + if not await rbac_repo.check_user_is_authorized(current_user.user_id, method, permission_create.path): + raise ControllerForbiddenError(f"User '{current_user.username}' doesn't have the rights to " + f"add a permission on {method} {permission_create.path} or " + f"the endpoint doesn't exist") return await rbac_repo.create_permission(permission_create) raise ControllerBadRequestError(f"Permission '{permission_create.methods} {permission_create.path}' " diff --git a/tests/api/routes/controller/test_permissions.py b/tests/api/routes/controller/test_permissions.py index 60744b9f..e5494091 100644 --- a/tests/api/routes/controller/test_permissions.py +++ b/tests/api/routes/controller/test_permissions.py @@ -16,52 +16,74 @@ # along with this program. If not, see . import pytest +import uuid from fastapi import FastAPI, status from httpx import AsyncClient from sqlalchemy.ext.asyncio import AsyncSession from gns3server.db.repositories.rbac import RbacRepository +from gns3server.controller import Controller +from gns3server.controller.project import Project pytestmark = pytest.mark.asyncio class TestPermissionRoutes: - async def test_create_permission(self, app: FastAPI, client: AsyncClient) -> None: + @pytest.fixture() + async def project(self, app: FastAPI, client: AsyncClient, controller: Controller) -> Project: + + project_uuid = str(uuid.uuid4()) + params = {"name": "test", "project_id": project_uuid} + await client.post(app.url_path_for("create_project"), json=params) + return controller.get_project(project_uuid) + + async def test_create_permission(self, app: FastAPI, client: AsyncClient, project: Project) -> None: new_permission = { "methods": ["GET"], - "path": "/templates/f6113095-a703-4967-b039-ab95ac3eb4f5", + "path": f"/projects/{project.id}", "action": "ALLOW" } response = await client.post(app.url_path_for("create_permission"), json=new_permission) assert response.status_code == status.HTTP_201_CREATED - async def test_create_wildcard_permission(self, app: FastAPI, client: AsyncClient) -> None: + async def test_create_wildcard_permission(self, app: FastAPI, client: AsyncClient, project: Project) -> None: new_permission = { - "methods": ["GET"], - "path": "/templates/*", + "methods": ["POST"], + "path": f"/projects/{project.id}/*", "action": "ALLOW" } + response = await client.post(app.url_path_for("create_permission"), json=new_permission) assert response.status_code == status.HTTP_201_CREATED - async def test_create_invalid_permission(self, app: FastAPI, client: AsyncClient) -> None: + async def test_create_permission_not_existing_endpoint(self, app: FastAPI, client: AsyncClient) -> None: new_permission = { "methods": ["GET"], - "path": "/templates/invalid", + "path": "/projects/invalid", "action": "ALLOW" } response = await client.post(app.url_path_for("create_permission"), json=new_permission) assert response.status_code == status.HTTP_400_BAD_REQUEST - async def test_get_permission(self, app: FastAPI, client: AsyncClient, db_session: AsyncSession) -> None: + async def test_create_permission_not_existing_object(self, app: FastAPI, client: AsyncClient) -> None: + + new_permission = { + "methods": ["GET"], + "path": f"/projects/{str(uuid.uuid4())}/*", + "action": "ALLOW" + } + response = await client.post(app.url_path_for("create_permission"), json=new_permission) + assert response.status_code == status.HTTP_403_FORBIDDEN + + async def test_get_permission(self, app: FastAPI, client: AsyncClient, db_session: AsyncSession, project: Project) -> None: rbac_repo = RbacRepository(db_session) - permission_in_db = await rbac_repo.get_permission_by_path("/templates") + permission_in_db = await rbac_repo.get_permission_by_path(f"/projects/{project.id}/*") response = await client.get(app.url_path_for("get_permission", permission_id=permission_in_db.permission_id)) assert response.status_code == status.HTTP_200_OK assert response.json()["permission_id"] == str(permission_in_db.permission_id) @@ -70,16 +92,16 @@ class TestPermissionRoutes: response = await client.get(app.url_path_for("get_permissions")) assert response.status_code == status.HTTP_200_OK - assert len(response.json()) == 7 # 5 default permissions + 2 custom permissions + assert len(response.json()) == 10 # 5 default permissions + 5 custom permissions - async def test_update_permission(self, app: FastAPI, client: AsyncClient, db_session: AsyncSession) -> None: + async def test_update_permission(self, app: FastAPI, client: AsyncClient, db_session: AsyncSession, project: Project) -> None: rbac_repo = RbacRepository(db_session) - permission_in_db = await rbac_repo.get_permission_by_path("/templates/*") + permission_in_db = await rbac_repo.get_permission_by_path(f"/projects/{project.id}/*") update_permission = { "methods": ["GET"], - "path": "/appliances", + "path": f"/projects/{project.id}/*", "action": "ALLOW" } response = await client.put( @@ -88,17 +110,18 @@ class TestPermissionRoutes: ) assert response.status_code == status.HTTP_200_OK updated_permission_in_db = await rbac_repo.get_permission(permission_in_db.permission_id) - assert updated_permission_in_db.path == "/appliances" + assert updated_permission_in_db.path == f"/projects/{project.id}/*" async def test_delete_permission( self, app: FastAPI, client: AsyncClient, - db_session: AsyncSession + db_session: AsyncSession, + project: Project, ) -> None: rbac_repo = RbacRepository(db_session) - permission_in_db = await rbac_repo.get_permission_by_path("/appliances") + permission_in_db = await rbac_repo.get_permission_by_path(f"/projects/{project.id}/*") response = await client.delete(app.url_path_for("delete_permission", permission_id=permission_in_db.permission_id)) assert response.status_code == status.HTTP_204_NO_CONTENT @@ -109,4 +132,4 @@ class TestPermissionRoutes: rbac_repo = RbacRepository(db_session) permissions_in_db = await rbac_repo.get_permissions() - assert len(permissions_in_db) == 5 # 5 default permissions + assert len(permissions_in_db) == 9 # 5 default permissions + 4 custom permissions diff --git a/tests/api/routes/controller/test_users.py b/tests/api/routes/controller/test_users.py index ba97ce88..56bf0b17 100644 --- a/tests/api/routes/controller/test_users.py +++ b/tests/api/routes/controller/test_users.py @@ -358,6 +358,7 @@ class TestUserMe: ) assert response.status_code == status_code + class TestSuperAdmin: async def test_super_admin_exists(