Check if user has the right to add a permission

pull/1970/head
grossmj 3 years ago
parent 4c6135fe88
commit 07d4892faf

@ -36,6 +36,7 @@ from gns3server.controller.controller_error import (
from gns3server.db.repositories.rbac import RbacRepository from gns3server.db.repositories.rbac import RbacRepository
from .dependencies.database import get_repository from .dependencies.database import get_repository
from .dependencies.authentication import get_current_active_user
import logging import logging
@ -59,6 +60,7 @@ async def get_permissions(
async def create_permission( async def create_permission(
request: Request, request: Request,
permission_create: schemas.PermissionCreate, permission_create: schemas.PermissionCreate,
current_user: schemas.User = Depends(get_current_active_user),
rbac_repo: RbacRepository = Depends(get_repository(RbacRepository)) rbac_repo: RbacRepository = Depends(get_repository(RbacRepository))
) -> schemas.Permission: ) -> schemas.Permission:
""" """
@ -82,11 +84,16 @@ async def create_permission(
# the permission can match multiple routes # the permission can match multiple routes
if permission_create.path.endswith("/*"): if permission_create.path.endswith("/*"):
route_path += r"/\*" route_path += r"/.*"
if re.fullmatch(route_path, permission_create.path): if re.fullmatch(route_path, permission_create.path):
for method in permission_create.methods: for method in permission_create.methods:
if method in list(route.methods): if method in list(route.methods):
# check user has the right to add the permission (i.e has already to right on the path)
if not await rbac_repo.check_user_is_authorized(current_user.user_id, method, permission_create.path):
raise ControllerForbiddenError(f"User '{current_user.username}' doesn't have the rights to "
f"add a permission on {method} {permission_create.path} or "
f"the endpoint doesn't exist")
return await rbac_repo.create_permission(permission_create) return await rbac_repo.create_permission(permission_create)
raise ControllerBadRequestError(f"Permission '{permission_create.methods} {permission_create.path}' " raise ControllerBadRequestError(f"Permission '{permission_create.methods} {permission_create.path}' "

@ -16,52 +16,74 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import pytest import pytest
import uuid
from fastapi import FastAPI, status from fastapi import FastAPI, status
from httpx import AsyncClient from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from gns3server.db.repositories.rbac import RbacRepository from gns3server.db.repositories.rbac import RbacRepository
from gns3server.controller import Controller
from gns3server.controller.project import Project
pytestmark = pytest.mark.asyncio pytestmark = pytest.mark.asyncio
class TestPermissionRoutes: class TestPermissionRoutes:
async def test_create_permission(self, app: FastAPI, client: AsyncClient) -> None: @pytest.fixture()
async def project(self, app: FastAPI, client: AsyncClient, controller: Controller) -> Project:
project_uuid = str(uuid.uuid4())
params = {"name": "test", "project_id": project_uuid}
await client.post(app.url_path_for("create_project"), json=params)
return controller.get_project(project_uuid)
async def test_create_permission(self, app: FastAPI, client: AsyncClient, project: Project) -> None:
new_permission = { new_permission = {
"methods": ["GET"], "methods": ["GET"],
"path": "/templates/f6113095-a703-4967-b039-ab95ac3eb4f5", "path": f"/projects/{project.id}",
"action": "ALLOW" "action": "ALLOW"
} }
response = await client.post(app.url_path_for("create_permission"), json=new_permission) response = await client.post(app.url_path_for("create_permission"), json=new_permission)
assert response.status_code == status.HTTP_201_CREATED assert response.status_code == status.HTTP_201_CREATED
async def test_create_wildcard_permission(self, app: FastAPI, client: AsyncClient) -> None: async def test_create_wildcard_permission(self, app: FastAPI, client: AsyncClient, project: Project) -> None:
new_permission = { new_permission = {
"methods": ["GET"], "methods": ["POST"],
"path": "/templates/*", "path": f"/projects/{project.id}/*",
"action": "ALLOW" "action": "ALLOW"
} }
response = await client.post(app.url_path_for("create_permission"), json=new_permission) response = await client.post(app.url_path_for("create_permission"), json=new_permission)
assert response.status_code == status.HTTP_201_CREATED assert response.status_code == status.HTTP_201_CREATED
async def test_create_invalid_permission(self, app: FastAPI, client: AsyncClient) -> None: async def test_create_permission_not_existing_endpoint(self, app: FastAPI, client: AsyncClient) -> None:
new_permission = { new_permission = {
"methods": ["GET"], "methods": ["GET"],
"path": "/templates/invalid", "path": "/projects/invalid",
"action": "ALLOW" "action": "ALLOW"
} }
response = await client.post(app.url_path_for("create_permission"), json=new_permission) response = await client.post(app.url_path_for("create_permission"), json=new_permission)
assert response.status_code == status.HTTP_400_BAD_REQUEST assert response.status_code == status.HTTP_400_BAD_REQUEST
async def test_get_permission(self, app: FastAPI, client: AsyncClient, db_session: AsyncSession) -> None: async def test_create_permission_not_existing_object(self, app: FastAPI, client: AsyncClient) -> None:
new_permission = {
"methods": ["GET"],
"path": f"/projects/{str(uuid.uuid4())}/*",
"action": "ALLOW"
}
response = await client.post(app.url_path_for("create_permission"), json=new_permission)
assert response.status_code == status.HTTP_403_FORBIDDEN
async def test_get_permission(self, app: FastAPI, client: AsyncClient, db_session: AsyncSession, project: Project) -> None:
rbac_repo = RbacRepository(db_session) rbac_repo = RbacRepository(db_session)
permission_in_db = await rbac_repo.get_permission_by_path("/templates") permission_in_db = await rbac_repo.get_permission_by_path(f"/projects/{project.id}/*")
response = await client.get(app.url_path_for("get_permission", permission_id=permission_in_db.permission_id)) response = await client.get(app.url_path_for("get_permission", permission_id=permission_in_db.permission_id))
assert response.status_code == status.HTTP_200_OK assert response.status_code == status.HTTP_200_OK
assert response.json()["permission_id"] == str(permission_in_db.permission_id) assert response.json()["permission_id"] == str(permission_in_db.permission_id)
@ -70,16 +92,16 @@ class TestPermissionRoutes:
response = await client.get(app.url_path_for("get_permissions")) response = await client.get(app.url_path_for("get_permissions"))
assert response.status_code == status.HTTP_200_OK assert response.status_code == status.HTTP_200_OK
assert len(response.json()) == 7 # 5 default permissions + 2 custom permissions assert len(response.json()) == 10 # 5 default permissions + 5 custom permissions
async def test_update_permission(self, app: FastAPI, client: AsyncClient, db_session: AsyncSession) -> None: async def test_update_permission(self, app: FastAPI, client: AsyncClient, db_session: AsyncSession, project: Project) -> None:
rbac_repo = RbacRepository(db_session) rbac_repo = RbacRepository(db_session)
permission_in_db = await rbac_repo.get_permission_by_path("/templates/*") permission_in_db = await rbac_repo.get_permission_by_path(f"/projects/{project.id}/*")
update_permission = { update_permission = {
"methods": ["GET"], "methods": ["GET"],
"path": "/appliances", "path": f"/projects/{project.id}/*",
"action": "ALLOW" "action": "ALLOW"
} }
response = await client.put( response = await client.put(
@ -88,17 +110,18 @@ class TestPermissionRoutes:
) )
assert response.status_code == status.HTTP_200_OK assert response.status_code == status.HTTP_200_OK
updated_permission_in_db = await rbac_repo.get_permission(permission_in_db.permission_id) updated_permission_in_db = await rbac_repo.get_permission(permission_in_db.permission_id)
assert updated_permission_in_db.path == "/appliances" assert updated_permission_in_db.path == f"/projects/{project.id}/*"
async def test_delete_permission( async def test_delete_permission(
self, self,
app: FastAPI, app: FastAPI,
client: AsyncClient, client: AsyncClient,
db_session: AsyncSession db_session: AsyncSession,
project: Project,
) -> None: ) -> None:
rbac_repo = RbacRepository(db_session) rbac_repo = RbacRepository(db_session)
permission_in_db = await rbac_repo.get_permission_by_path("/appliances") permission_in_db = await rbac_repo.get_permission_by_path(f"/projects/{project.id}/*")
response = await client.delete(app.url_path_for("delete_permission", permission_id=permission_in_db.permission_id)) response = await client.delete(app.url_path_for("delete_permission", permission_id=permission_in_db.permission_id))
assert response.status_code == status.HTTP_204_NO_CONTENT assert response.status_code == status.HTTP_204_NO_CONTENT
@ -109,4 +132,4 @@ class TestPermissionRoutes:
rbac_repo = RbacRepository(db_session) rbac_repo = RbacRepository(db_session)
permissions_in_db = await rbac_repo.get_permissions() permissions_in_db = await rbac_repo.get_permissions()
assert len(permissions_in_db) == 5 # 5 default permissions assert len(permissions_in_db) == 9 # 5 default permissions + 4 custom permissions

@ -358,6 +358,7 @@ class TestUserMe:
) )
assert response.status_code == status_code assert response.status_code == status_code
class TestSuperAdmin: class TestSuperAdmin:
async def test_super_admin_exists( async def test_super_admin_exists(

Loading…
Cancel
Save