#!/usr/bin/env python # # Copyright (C) 2021 GNS3 Technologies Inc. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import pytest import pytest_asyncio from fastapi import FastAPI, status from httpx import AsyncClient from sqlalchemy.ext.asyncio import AsyncSession from gns3server.db.repositories.rbac import RbacRepository from gns3server.db.repositories.users import UsersRepository from gns3server.schemas.controller.rbac import ACECreate from gns3server.db.models import User pytestmark = pytest.mark.asyncio # @pytest_asyncio.fixture # async def project_ace(db_session: AsyncSession): # # group_id = (await UsersRepository(db_session).get_user_group_by_name("Users")).user_group_id # role_id = (await RbacRepository(db_session).get_role_by_name("User")).role_id # ace = ACECreate( # path="/projects", # ace_type="group", # propagate=False, # group_id=str(group_id), # role_id=str(role_id) # ) # await RbacRepository(db_session).create_ace(ace) class TestPrivileges: @pytest.mark.parametrize( "privilege, path, result", ( ("User.Allocate", "/users", False), ("Project.Allocate", "/projects", False), ("Project.Allocate", "/projects", True), ("Project.Audit", "/projects/e451ad73-2519-4f83-87fe-a8e821792d44", True), ("Project.Audit", "/templates", False), ("Template.Audit", "/templates", True), ("Template.Allocate", "/templates", False), ("Compute.Audit", "/computes", True), ("Compute.Audit", "/computes/local", True), ("Symbol.Audit", "/symbols", True), ("Symbol.Audit", "/symbols/default_symbols", True), ), ) async def test_default_privileges_user_group( self, test_user: User, db_session: AsyncSession, privilege: str, path: str, result: bool ) -> None: # add an ACE for path if result: group_id = (await UsersRepository(db_session).get_user_group_by_name("Users")).user_group_id role_id = (await RbacRepository(db_session).get_role_by_name("User")).role_id ace = ACECreate( path=path, ace_type="group", propagate=False, group_id=str(group_id), role_id=str(role_id) ) await RbacRepository(db_session).create_ace(ace) authorized = await RbacRepository(db_session).check_user_has_privilege(test_user.user_id, path, privilege) assert authorized == result async def test_propagate(self, test_user: User, db_session: AsyncSession): privilege = "Project.Audit" path = "/projects/44929147-47bb-460a-90ae-c782c4dbb6ef" authorized = await RbacRepository(db_session).check_user_has_privilege(test_user.user_id, path, privilege) assert authorized is False ace = await RbacRepository(db_session).get_ace_by_path("/projects") ace.propagate = True await db_session.commit() authorized = await RbacRepository(db_session).check_user_has_privilege(test_user.user_id, path, privilege) assert authorized is True async def test_allowed(self, test_user: User, db_session: AsyncSession): ace = await RbacRepository(db_session).get_ace_by_path("/projects") ace.allowed = False ace.propagate = True await db_session.commit() privilege = "Project.Audit" path = "/projects/44929147-47bb-460a-90ae-c782c4dbb6ef" authorized = await RbacRepository(db_session).check_user_has_privilege(test_user.user_id, path, privilege) assert authorized is False # privileges on deeper levels replace those inherited from an upper level. group_id = (await UsersRepository(db_session).get_user_group_by_name("Users")).user_group_id role_id = (await RbacRepository(db_session).get_role_by_name("User")).role_id ace = ACECreate( path=path, ace_type="group", propagate=False, group_id=str(group_id), role_id=str(role_id) ) await RbacRepository(db_session).create_ace(ace) authorized = await RbacRepository(db_session).check_user_has_privilege(test_user.user_id, path, privilege) assert authorized is True # class TestProjectsWithRbac: # # async def test_admin_create_project(self, app: FastAPI, client: AsyncClient): # # params = {"name": "Admin project"} # response = await client.post(app.url_path_for("create_project"), json=params) # assert response.status_code == status.HTTP_201_CREATED # # async def test_user_only_access_own_projects( # self, # app: FastAPI, # authorized_client: AsyncClient, # project_ace, # test_user: User, # db_session: AsyncSession # ) -> None: # # params = {"name": "User project"} # response = await authorized_client.post(app.url_path_for("create_project"), json=params) # assert response.status_code == status.HTTP_201_CREATED # project_id = response.json()["project_id"] # # rbac_repo = RbacRepository(db_session) # permissions_in_db = await rbac_repo.get_user_permissions(test_user.user_id) # assert len(permissions_in_db) == 1 # assert permissions_in_db[0].path == f"/projects/{project_id}/*" # # response = await authorized_client.get(app.url_path_for("get_projects")) # assert response.status_code == status.HTTP_200_OK # projects = response.json() # assert len(projects) == 1 # async def test_admin_access_all_projects(self, app: FastAPI, client: AsyncClient): # # response = await client.get(app.url_path_for("get_projects")) # assert response.status_code == status.HTTP_200_OK # projects = response.json() # assert len(projects) == 2 # # async def test_admin_user_give_permission_on_project( # self, # app: FastAPI, # client: AsyncClient, # test_user: User # ): # # response = await client.get(app.url_path_for("get_projects")) # assert response.status_code == status.HTTP_200_OK # projects = response.json() # project_id = None # for project in projects: # if project["name"] == "Admin project": # project_id = project["project_id"] # break # # new_permission = { # "methods": ["GET"], # "path": f"/projects/{project_id}", # "action": "ALLOW" # } # response = await client.post(app.url_path_for("create_permission"), json=new_permission) # assert response.status_code == status.HTTP_201_CREATED # permission_id = response.json()["permission_id"] # # response = await client.put( # app.url_path_for( # "add_permission_to_user", # user_id=test_user.user_id, # permission_id=permission_id # ) # ) # assert response.status_code == status.HTTP_204_NO_CONTENT # # async def test_user_access_admin_project( # self, # app: FastAPI, # authorized_client: AsyncClient, # test_user: User, # db_session: AsyncSession # ) -> None: # # response = await authorized_client.get(app.url_path_for("get_projects")) # assert response.status_code == status.HTTP_200_OK # projects = response.json() # assert len(projects) == 2 # # class TestTemplatesWithRbac: # # async def test_admin_create_template(self, app: FastAPI, client: AsyncClient): # # new_template = {"base_script_file": "vpcs_base_config.txt", # "category": "guest", # "console_auto_start": False, # "console_type": "telnet", # "default_name_format": "PC{0}", # "name": "ADMIN_VPCS_TEMPLATE", # "compute_id": "local", # "symbol": ":/symbols/vpcs_guest.svg", # "template_type": "vpcs"} # # response = await client.post(app.url_path_for("create_template"), json=new_template) # assert response.status_code == status.HTTP_201_CREATED # # async def test_user_only_access_own_templates( # self, app: FastAPI, # authorized_client: AsyncClient, # test_user: User, # db_session: AsyncSession # ) -> None: # # new_template = {"base_script_file": "vpcs_base_config.txt", # "category": "guest", # "console_auto_start": False, # "console_type": "telnet", # "default_name_format": "PC{0}", # "name": "USER_VPCS_TEMPLATE", # "compute_id": "local", # "symbol": ":/symbols/vpcs_guest.svg", # "template_type": "vpcs"} # # response = await authorized_client.post(app.url_path_for("create_template"), json=new_template) # assert response.status_code == status.HTTP_201_CREATED # template_id = response.json()["template_id"] # # rbac_repo = RbacRepository(db_session) # permissions_in_db = await rbac_repo.get_user_permissions(test_user.user_id) # assert len(permissions_in_db) == 1 # assert permissions_in_db[0].path == f"/templates/{template_id}/*" # # response = await authorized_client.get(app.url_path_for("get_templates")) # assert response.status_code == status.HTTP_200_OK # templates = [template for template in response.json() if template["builtin"] is False] # assert len(templates) == 1 # # async def test_admin_access_all_templates(self, app: FastAPI, client: AsyncClient): # # response = await client.get(app.url_path_for("get_templates")) # assert response.status_code == status.HTTP_200_OK # templates = [template for template in response.json() if template["builtin"] is False] # assert len(templates) == 2