diff --git a/gns3server/api/routes/controller/dependencies/authentication.py b/gns3server/api/routes/controller/dependencies/authentication.py
index 36e35d47..0af058d7 100644
--- a/gns3server/api/routes/controller/dependencies/authentication.py
+++ b/gns3server/api/routes/controller/dependencies/authentication.py
@@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
+import re
from fastapi import Request, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
@@ -60,11 +61,16 @@ async def get_current_active_user(
)
# remove the prefix (e.g. "/v3") from URL path
- if request.url.path.startswith("/v3"):
- path = request.url.path[len("/v3"):]
+ match = re.search(r"^(/v[0-9]+).*", request.url.path)
+ if match:
+ path = request.url.path[len(match.group(1)):]
else:
path = request.url.path
+ # special case: always authorize access to the "/users/me" endpoint
+ if path == "/users/me":
+ return current_user
+
authorized = await rbac_repo.check_user_is_authorized(current_user.user_id, request.method, path)
if not authorized:
raise HTTPException(
diff --git a/gns3server/api/routes/controller/groups.py b/gns3server/api/routes/controller/groups.py
index 84c980a6..20b17ae4 100644
--- a/gns3server/api/routes/controller/groups.py
+++ b/gns3server/api/routes/controller/groups.py
@@ -100,7 +100,7 @@ async def update_user_group(
raise ControllerNotFoundError(f"User group '{user_group_id}' not found")
if user_group.builtin:
- raise ControllerForbiddenError(f"User group '{user_group_id}' cannot be updated")
+ raise ControllerForbiddenError(f"Built-in user group '{user_group_id}' cannot be updated")
return await users_repo.update_user_group(user_group_id, user_group_update)
@@ -122,7 +122,7 @@ async def delete_user_group(
raise ControllerNotFoundError(f"User group '{user_group_id}' not found")
if user_group.builtin:
- raise ControllerForbiddenError(f"User group '{user_group_id}' cannot be deleted")
+ raise ControllerForbiddenError(f"Built-in user group '{user_group_id}' cannot be deleted")
success = await users_repo.delete_user_group(user_group_id)
if not success:
diff --git a/gns3server/api/routes/controller/projects.py b/gns3server/api/routes/controller/projects.py
index 1d7aa2a5..55252c41 100644
--- a/gns3server/api/routes/controller/projects.py
+++ b/gns3server/api/routes/controller/projects.py
@@ -70,13 +70,25 @@ CHUNK_SIZE = 1024 * 8 # 8KB
@router.get("", response_model=List[schemas.Project], response_model_exclude_unset=True)
-def get_projects() -> List[schemas.Project]:
+async def get_projects(
+ current_user: schemas.User = Depends(get_current_active_user),
+ rbac_repo: RbacRepository = Depends(get_repository(RbacRepository))
+) -> List[schemas.Project]:
"""
Return all projects.
"""
controller = Controller.instance()
- return [p.asdict() for p in controller.projects.values()]
+ if current_user.is_superadmin:
+ return [p.asdict() for p in controller.projects.values()]
+ else:
+ user_projects = []
+ for project in controller.projects.values():
+ authorized = await rbac_repo.check_user_is_authorized(
+ current_user.user_id, "GET", f"/projects/{project.id}")
+ if authorized:
+ user_projects.append(project.asdict())
+ return user_projects
@router.post(
@@ -97,7 +109,7 @@ async def create_project(
controller = Controller.instance()
project = await controller.add_project(**jsonable_encoder(project_data, exclude_unset=True))
- await rbac_repo.add_permission_to_user(current_user.user_id, f"/projects/{project.id}/*")
+ await rbac_repo.add_permission_to_user_with_path(current_user.user_id, f"/projects/{project.id}/*")
return project.asdict()
@@ -135,7 +147,7 @@ async def delete_project(
controller = Controller.instance()
await project.delete()
controller.remove_project(project)
- await rbac_repo.delete_all_permissions_matching_path(f"/projects/{project.id}")
+ await rbac_repo.delete_all_permissions_with_path(f"/projects/{project.id}")
@router.get("/{project_id}/stats")
@@ -357,7 +369,9 @@ async def import_project(
)
async def duplicate_project(
project_data: schemas.ProjectDuplicate,
- project: Project = Depends(dep_project)
+ project: Project = Depends(dep_project),
+ current_user: schemas.User = Depends(get_current_active_user),
+ rbac_repo: RbacRepository = Depends(get_repository(RbacRepository))
) -> schemas.Project:
"""
Duplicate a project.
@@ -374,6 +388,7 @@ async def duplicate_project(
new_project = await project.duplicate(
name=project_data.name, location=location, reset_mac_addresses=reset_mac_addresses
)
+ await rbac_repo.add_permission_to_user_with_path(current_user.user_id, f"/projects/{new_project.id}/*")
return new_project.asdict()
diff --git a/gns3server/api/routes/controller/roles.py b/gns3server/api/routes/controller/roles.py
index 1d013f97..c96feb64 100644
--- a/gns3server/api/routes/controller/roles.py
+++ b/gns3server/api/routes/controller/roles.py
@@ -96,7 +96,7 @@ async def update_role(
raise ControllerNotFoundError(f"Role '{role_id}' not found")
if role.builtin:
- raise ControllerForbiddenError(f"Role '{role_id}' cannot be updated")
+ raise ControllerForbiddenError(f"Built-in role '{role_id}' cannot be updated")
return await rbac_repo.update_role(role_id, role_update)
@@ -115,7 +115,7 @@ async def delete_role(
raise ControllerNotFoundError(f"Role '{role_id}' not found")
if role.builtin:
- raise ControllerForbiddenError(f"Role '{role_id}' cannot be deleted")
+ raise ControllerForbiddenError(f"Built-in role '{role_id}' cannot be deleted")
success = await rbac_repo.delete_role(role_id)
if not success:
diff --git a/gns3server/api/routes/controller/templates.py b/gns3server/api/routes/controller/templates.py
index ee54fdc7..a346545a 100644
--- a/gns3server/api/routes/controller/templates.py
+++ b/gns3server/api/routes/controller/templates.py
@@ -33,6 +33,9 @@ from gns3server import schemas
from gns3server.controller import Controller
from gns3server.db.repositories.templates import TemplatesRepository
from gns3server.services.templates import TemplatesService
+from gns3server.db.repositories.rbac import RbacRepository
+
+from .dependencies.authentication import get_current_active_user
from .dependencies.database import get_repository
responses = {404: {"model": schemas.ErrorMessage, "description": "Could not find template"}}
@@ -44,12 +47,17 @@ router = APIRouter(responses=responses)
async def create_template(
template_create: schemas.TemplateCreate,
templates_repo: TemplatesRepository = Depends(get_repository(TemplatesRepository)),
+ current_user: schemas.User = Depends(get_current_active_user),
+ rbac_repo: RbacRepository = Depends(get_repository(RbacRepository))
) -> schemas.Template:
"""
Create a new template.
"""
- return await TemplatesService(templates_repo).create_template(template_create)
+ template = await TemplatesService(templates_repo).create_template(template_create)
+ template_id = template.get("template_id")
+ await rbac_repo.add_permission_to_user_with_path(current_user.user_id, f"/templates/{template_id}/*")
+ return template
@router.get("/templates/{template_id}", response_model=schemas.Template, response_model_exclude_unset=True)
@@ -92,35 +100,58 @@ async def update_template(
status_code=status.HTTP_204_NO_CONTENT,
)
async def delete_template(
- template_id: UUID, templates_repo: TemplatesRepository = Depends(get_repository(TemplatesRepository))
+ template_id: UUID,
+ templates_repo: TemplatesRepository = Depends(get_repository(TemplatesRepository)),
+ rbac_repo: RbacRepository = Depends(get_repository(RbacRepository))
) -> None:
"""
Delete a template.
"""
await TemplatesService(templates_repo).delete_template(template_id)
+ await rbac_repo.delete_all_permissions_with_path(f"/templates/{template_id}")
@router.get("/templates", response_model=List[schemas.Template], response_model_exclude_unset=True)
async def get_templates(
- templates_repo: TemplatesRepository = Depends(get_repository(TemplatesRepository)),
+ templates_repo: TemplatesRepository = Depends(get_repository(TemplatesRepository)),
+ current_user: schemas.User = Depends(get_current_active_user),
+ rbac_repo: RbacRepository = Depends(get_repository(RbacRepository))
) -> List[schemas.Template]:
"""
Return all templates.
"""
- return await TemplatesService(templates_repo).get_templates()
+ templates = await TemplatesService(templates_repo).get_templates()
+ if current_user.is_superadmin:
+ return templates
+ else:
+ user_templates = []
+ for template in templates:
+ if template.get("builtin") is True:
+ user_templates.append(template)
+ continue
+ template_id = template.get("template_id")
+ authorized = await rbac_repo.check_user_is_authorized(
+ current_user.user_id, "GET", f"/templates/{template_id}")
+ if authorized:
+ user_templates.append(template)
+ return user_templates
@router.post("/templates/{template_id}/duplicate", response_model=schemas.Template, status_code=status.HTTP_201_CREATED)
async def duplicate_template(
- template_id: UUID, templates_repo: TemplatesRepository = Depends(get_repository(TemplatesRepository))
+ template_id: UUID, templates_repo: TemplatesRepository = Depends(get_repository(TemplatesRepository)),
+ current_user: schemas.User = Depends(get_current_active_user),
+ rbac_repo: RbacRepository = Depends(get_repository(RbacRepository))
) -> schemas.Template:
"""
Duplicate a template.
"""
- return await TemplatesService(templates_repo).duplicate_template(template_id)
+ template = await TemplatesService(templates_repo).duplicate_template(template_id)
+ await rbac_repo.add_permission_to_user_with_path(current_user.user_id, f"/templates/{template_id}/*")
+ return template
@router.post(
diff --git a/gns3server/api/routes/controller/users.py b/gns3server/api/routes/controller/users.py
index df279e20..edf53b9a 100644
--- a/gns3server/api/routes/controller/users.py
+++ b/gns3server/api/routes/controller/users.py
@@ -32,6 +32,7 @@ from gns3server.controller.controller_error import (
)
from gns3server.db.repositories.users import UsersRepository
+from gns3server.db.repositories.rbac import RbacRepository
from gns3server.services import auth_service
from .dependencies.authentication import get_current_active_user
@@ -210,3 +211,65 @@ async def get_user_memberships(
"""
return await users_repo.get_user_memberships(user_id)
+
+
+@router.get(
+ "/{user_id}/permissions",
+ dependencies=[Depends(get_current_active_user)],
+ response_model=List[schemas.Permission]
+)
+async def get_user_permissions(
+ user_id: UUID,
+ rbac_repo: RbacRepository = Depends(get_repository(RbacRepository))
+) -> List[schemas.Permission]:
+ """
+ Get user permissions.
+ """
+
+ return await rbac_repo.get_user_permissions(user_id)
+
+
+@router.put(
+ "/{user_id}/permissions/{permission_id}",
+ dependencies=[Depends(get_current_active_user)],
+ status_code=status.HTTP_204_NO_CONTENT
+)
+async def add_permission_to_user(
+ user_id: UUID,
+ permission_id: UUID,
+ rbac_repo: RbacRepository = Depends(get_repository(RbacRepository))
+) -> None:
+ """
+ Add a permission to an user.
+ """
+
+ permission = await rbac_repo.get_permission(permission_id)
+ if not permission:
+ raise ControllerNotFoundError(f"Permission '{permission_id}' not found")
+
+ user = await rbac_repo.add_permission_to_user(user_id, permission)
+ if not user:
+ raise ControllerNotFoundError(f"User '{user_id}' not found")
+
+
+@router.delete(
+ "/{user_id}/permissions/{permission_id}",
+ dependencies=[Depends(get_current_active_user)],
+ status_code=status.HTTP_204_NO_CONTENT
+)
+async def remove_permission_from_user(
+ user_id: UUID,
+ permission_id: UUID,
+ rbac_repo: RbacRepository = Depends(get_repository(RbacRepository)),
+) -> None:
+ """
+ Remove permission from an user.
+ """
+
+ permission = await rbac_repo.get_permission(permission_id)
+ if not permission:
+ raise ControllerNotFoundError(f"Permission '{permission_id}' not found")
+
+ user = await rbac_repo.remove_permission_from_user(user_id, permission)
+ if not user:
+ raise ControllerNotFoundError(f"User '{user_id}' not found")
diff --git a/gns3server/db/models/permissions.py b/gns3server/db/models/permissions.py
index 534f2274..4779b6af 100644
--- a/gns3server/db/models/permissions.py
+++ b/gns3server/db/models/permissions.py
@@ -58,21 +58,27 @@ def create_default_roles(target, connection, **kw):
"action": "ALLOW"
},
{
- "description": "Allow access to the logged in user",
- "methods": ["GET"],
- "path": "/users/me",
- "action": "ALLOW"
- },
- {
- "description": "Allow to create a project or list projects",
- "methods": ["GET", "POST"],
+ "description": "Allow to create and list projects",
+ "methods": ["GET", "HEAD", "POST"],
"path": "/projects",
"action": "ALLOW"
},
{
- "description": "Allow to access to all symbol endpoints",
- "methods": ["GET", "HEAD", "POST", "PUT", "DELETE", "PATCH"],
- "path": "/symbols",
+ "description": "Allow to create and list templates",
+ "methods": ["GET", "HEAD", "POST"],
+ "path": "/templates",
+ "action": "ALLOW"
+ },
+ {
+ "description": "Allow to list computes",
+ "methods": ["GET"],
+ "path": "/computes/*",
+ "action": "ALLOW"
+ },
+ {
+ "description": "Allow access to all symbol endpoints",
+ "methods": ["GET", "HEAD", "POST"],
+ "path": "/symbols/*",
"action": "ALLOW"
},
]
@@ -106,7 +112,7 @@ def add_permissions_to_role(target, connection, **kw):
role_id = result.first().role_id
# add minimum required paths to the "User" role
- for path in ("/projects", "/symbols", "/users/me"):
+ for path in ("/projects", "/templates", "/computes/*", "/symbols/*"):
stmt = permissions_table.select().where(permissions_table.c.path == path)
result = connection.execute(stmt)
permission_id = result.first().permission_id
diff --git a/gns3server/db/repositories/rbac.py b/gns3server/db/repositories/rbac.py
index 39d52320..6e1096c9 100644
--- a/gns3server/db/repositories/rbac.py
+++ b/gns3server/db/repositories/rbac.py
@@ -216,6 +216,7 @@ class RbacRepository(BaseRepository):
"""
db_permission = models.Permission(
+ description=permission_create.description,
methods=permission_create.methods,
path=permission_create.path,
action=permission_create.action,
@@ -270,60 +271,76 @@ class RbacRepository(BaseRepository):
log.debug(f"RBAC: checking permission {permission.methods} {permission.path} {permission.action}")
if method not in permission.methods:
continue
- if permission.path.endswith("*") and path.startswith(permission.path[:-1]):
+ if permission.path.endswith("/*") and path.startswith(permission.path[:-2]):
return permission
elif permission.path == path:
return permission
- async def check_user_is_authorized(self, user_id: UUID, method: str, path: str) -> bool:
+ async def get_user_permissions(self, user_id: UUID):
"""
- Check if an user is authorized to access a resource.
+ Get all permissions from an user.
"""
- query = select(models.Permission).\
- join(models.Permission.roles). \
- join(models.Role.groups). \
- join(models.UserGroup.users). \
- filter(models.User.user_id == user_id).\
- order_by(models.Permission.path)
-
- result = await self._db_session.execute(query)
- permissions = result.scalars().all()
- log.debug(f"RBAC: checking authorization for '{user_id}' on {method} '{path}'")
- matched_permission = self._match_permission(permissions, method, path)
- if matched_permission:
- log.debug(f"RBAC: matched role permission {matched_permission.methods} "
- f"{matched_permission.path} {matched_permission.action}")
- if matched_permission.action == "DENY":
- return False
- return True
-
- log.debug(f"RBAC: could not find a role permission, checking user permissions...")
query = select(models.Permission).\
join(models.User.permissions). \
filter(models.User.user_id == user_id).\
order_by(models.Permission.path)
result = await self._db_session.execute(query)
- permissions = result.scalars().all()
- matched_permission = self._match_permission(permissions, method, path)
- if matched_permission:
- log.debug(f"RBAC: matched user permission {matched_permission.methods} "
- f"{matched_permission.path} {matched_permission.action}")
- if matched_permission.action == "DENY":
- return False
- return True
+ return result.scalars().all()
- return False
-
- async def add_permission_to_user(self, user_id: UUID, path: str) -> Union[None, models.User]:
+ async def add_permission_to_user(
+ self,
+ user_id: UUID,
+ permission: models.Permission
+ ) -> Union[None, models.User]:
"""
Add a permission to an user.
"""
- # Create a new permission with full rights
+ query = select(models.User).\
+ options(selectinload(models.User.permissions)).\
+ where(models.User.user_id == user_id)
+ result = await self._db_session.execute(query)
+ user_db = result.scalars().first()
+ if not user_db:
+ return None
+
+ user_db.permissions.append(permission)
+ await self._db_session.commit()
+ await self._db_session.refresh(user_db)
+ return user_db
+
+ async def remove_permission_from_user(
+ self,
+ user_id: UUID,
+ permission: models.Permission
+ ) -> Union[None, models.User]:
+ """
+ Remove a permission from a role.
+ """
+
+ query = select(models.User).\
+ options(selectinload(models.User.permissions)).\
+ where(models.User.user_id == user_id)
+ result = await self._db_session.execute(query)
+ user_db = result.scalars().first()
+ if not user_db:
+ return None
+
+ user_db.permissions.remove(permission)
+ await self._db_session.commit()
+ await self._db_session.refresh(user_db)
+ return user_db
+
+ async def add_permission_to_user_with_path(self, user_id: UUID, path: str) -> Union[None, models.User]:
+ """
+ Add a permission to an user.
+ """
+
+ # Create a new permission with full rights on path
new_permission = schemas.PermissionCreate(
- description=f"Allow access to project {path}",
+ description=f"Allow access to {path}",
methods=[HTTPMethods.get, HTTPMethods.head, HTTPMethods.post, HTTPMethods.put, HTTPMethods.delete],
path=path,
action=PermissionAction.allow
@@ -345,9 +362,9 @@ class RbacRepository(BaseRepository):
await self._db_session.refresh(user_db)
return user_db
- async def delete_all_permissions_matching_path(self, path: str) -> None:
+ async def delete_all_permissions_with_path(self, path: str) -> None:
"""
- Delete all permissions matching with path.
+ Delete all permissions with path.
"""
query = delete(models.Permission).\
@@ -355,3 +372,38 @@ class RbacRepository(BaseRepository):
execution_options(synchronize_session=False)
result = await self._db_session.execute(query)
log.debug(f"{result.rowcount} permission(s) have been deleted")
+
+ async def check_user_is_authorized(self, user_id: UUID, method: str, path: str) -> bool:
+ """
+ Check if an user is authorized to access a resource.
+ """
+
+ query = select(models.Permission).\
+ join(models.Permission.roles). \
+ join(models.Role.groups). \
+ join(models.UserGroup.users). \
+ filter(models.User.user_id == user_id).\
+ order_by(models.Permission.path)
+
+ result = await self._db_session.execute(query)
+ permissions = result.scalars().all()
+ log.debug(f"RBAC: checking authorization for user '{user_id}' on {method} '{path}'")
+ matched_permission = self._match_permission(permissions, method, path)
+ if matched_permission:
+ log.debug(f"RBAC: matched role permission {matched_permission.methods} "
+ f"{matched_permission.path} {matched_permission.action}")
+ if matched_permission.action == "DENY":
+ return False
+ return True
+
+ log.debug(f"RBAC: could not find a role permission, checking user permissions...")
+ permissions = await self.get_user_permissions(user_id)
+ matched_permission = self._match_permission(permissions, method, path)
+ if matched_permission:
+ log.debug(f"RBAC: matched user permission {matched_permission.methods} "
+ f"{matched_permission.path} {matched_permission.action}")
+ if matched_permission.action == "DENY":
+ return False
+ return True
+
+ return False
diff --git a/tests/api/routes/controller/test_permissions.py b/tests/api/routes/controller/test_permissions.py
index fc15087f..1bd4e0ff 100644
--- a/tests/api/routes/controller/test_permissions.py
+++ b/tests/api/routes/controller/test_permissions.py
@@ -50,7 +50,7 @@ class TestPermissionRoutes:
response = await client.get(app.url_path_for("get_permissions"))
assert response.status_code == status.HTTP_200_OK
- assert len(response.json()) == 5 # 4 default permissions + 1 custom permission
+ assert len(response.json()) == 6 # 5 default permissions + 1 custom permission
async def test_update_permission(self, app: FastAPI, client: AsyncClient, db_session: AsyncSession) -> None:
diff --git a/tests/api/routes/controller/test_roles.py b/tests/api/routes/controller/test_roles.py
index 93b21004..20646d92 100644
--- a/tests/api/routes/controller/test_roles.py
+++ b/tests/api/routes/controller/test_roles.py
@@ -64,21 +64,21 @@ class TestRolesRoutes:
updated_role_in_db = await rbac_repo.get_role(role_in_db.role_id)
assert updated_role_in_db.name == "role42"
- # async def test_cannot_update_admin_group(
- # self,
- # app: FastAPI,
- # client: AsyncClient,
- # db_session: AsyncSession
- # ) -> None:
- #
- # user_repo = UsersRepository(db_session)
- # group_in_db = await user_repo.get_user_group_by_name("Administrators")
- # update_group = {"name": "Hackers"}
- # response = await client.put(
- # app.url_path_for("update_user_group", user_group_id=group_in_db.user_group_id),
- # json=update_group
- # )
- # assert response.status_code == status.HTTP_403_FORBIDDEN
+ async def test_cannot_update_builtin_user_role(
+ self,
+ app: FastAPI,
+ client: AsyncClient,
+ db_session: AsyncSession
+ ) -> None:
+
+ rbac_repo = RbacRepository(db_session)
+ role_in_db = await rbac_repo.get_role_by_name("User")
+ update_role = {"name": "Hackers"}
+ response = await client.put(
+ app.url_path_for("update_role", role_id=role_in_db.role_id),
+ json=update_role
+ )
+ assert response.status_code == status.HTTP_403_FORBIDDEN
async def test_delete_role(
self,
@@ -92,29 +92,29 @@ class TestRolesRoutes:
response = await client.delete(app.url_path_for("delete_role", role_id=role_in_db.role_id))
assert response.status_code == status.HTTP_204_NO_CONTENT
- # async def test_cannot_delete_admin_group(
- # self,
- # app: FastAPI,
- # client: AsyncClient,
- # db_session: AsyncSession
- # ) -> None:
- #
- # user_repo = UsersRepository(db_session)
- # group_in_db = await user_repo.get_user_group_by_name("Administrators")
- # response = await client.delete(app.url_path_for("delete_user_group", user_group_id=group_in_db.user_group_id))
- # assert response.status_code == status.HTTP_403_FORBIDDEN
+ async def test_cannot_delete_builtin_administrator_role(
+ self,
+ app: FastAPI,
+ client: AsyncClient,
+ db_session: AsyncSession
+ ) -> None:
+
+ rbac_repo = RbacRepository(db_session)
+ role_in_db = await rbac_repo.get_role_by_name("Administrator")
+ response = await client.delete(app.url_path_for("delete_role", role_id=role_in_db.role_id))
+ assert response.status_code == status.HTTP_403_FORBIDDEN
@pytest.fixture
async def test_permission(db_session: AsyncSession) -> Permission:
new_permission = schemas.PermissionCreate(
- methods=[HTTPMethods.get, HTTPMethods.post],
- path="/templates",
+ methods=[HTTPMethods.get],
+ path="/statistics",
action=PermissionAction.allow
)
rbac_repo = RbacRepository(db_session)
- existing_permission = await rbac_repo.get_permission_by_path("/templates")
+ existing_permission = await rbac_repo.get_permission_by_path("/statistics")
if existing_permission:
return existing_permission
return await rbac_repo.create_permission(new_permission)
@@ -142,7 +142,7 @@ class TestRolesPermissionsRoutes:
)
assert response.status_code == status.HTTP_204_NO_CONTENT
permissions = await rbac_repo.get_role_permissions(role_in_db.role_id)
- assert len(permissions) == 4 # 3 default + 1 custom permissions
+ assert len(permissions) == 5 # 4 default permissions + 1 custom permission
async def test_get_role_permissions(
self,
@@ -160,7 +160,7 @@ class TestRolesPermissionsRoutes:
role_id=role_in_db.role_id)
)
assert response.status_code == status.HTTP_200_OK
- assert len(response.json()) == 4 # 3 default + 1 custom permissions
+ assert len(response.json()) == 5 # 4 default permissions + 1 custom permission
async def test_remove_role_from_group(
self,
@@ -182,4 +182,4 @@ class TestRolesPermissionsRoutes:
)
assert response.status_code == status.HTTP_204_NO_CONTENT
permissions = await rbac_repo.get_role_permissions(role_in_db.role_id)
- assert len(permissions) == 3 # 3 default permissions
+ assert len(permissions) == 4 # 4 default permissions
diff --git a/tests/api/routes/controller/test_users.py b/tests/api/routes/controller/test_users.py
index 36b16c10..ce0d8801 100644
--- a/tests/api/routes/controller/test_users.py
+++ b/tests/api/routes/controller/test_users.py
@@ -25,9 +25,12 @@ from jose import jwt
from sqlalchemy.ext.asyncio import AsyncSession
from gns3server.db.repositories.users import UsersRepository
+from gns3server.db.repositories.rbac import RbacRepository
+from gns3server.schemas.controller.rbac import Permission, HTTPMethods, PermissionAction
from gns3server.services import auth_service
from gns3server.config import Config
from gns3server.schemas.controller.users import User
+from gns3server import schemas
import gns3server.db.models as models
pytestmark = pytest.mark.asyncio
@@ -341,3 +344,79 @@ class TestSuperAdmin:
# response = await client.get(app.url_path_for("get_user_memberships", user_id=admin_in_db.user_id))
# assert response.status_code == status.HTTP_200_OK
# assert len(response.json()) == 1
+
+@pytest.fixture
+async def test_permission(db_session: AsyncSession) -> Permission:
+
+ new_permission = schemas.PermissionCreate(
+ methods=[HTTPMethods.get],
+ path="/statistics",
+ action=PermissionAction.allow
+ )
+ rbac_repo = RbacRepository(db_session)
+ existing_permission = await rbac_repo.get_permission_by_path("/statistics")
+ if existing_permission:
+ return existing_permission
+ return await rbac_repo.create_permission(new_permission)
+
+
+class TestUserPermissionsRoutes:
+
+ async def test_add_permission_to_user(
+ self,
+ app: FastAPI,
+ client: AsyncClient,
+ test_user: User,
+ test_permission: Permission,
+ db_session: AsyncSession
+ ) -> None:
+
+ response = await client.put(
+ app.url_path_for(
+ "add_permission_to_user",
+ user_id=str(test_user.user_id),
+ permission_id=str(test_permission.permission_id)
+ )
+ )
+ assert response.status_code == status.HTTP_204_NO_CONTENT
+ rbac_repo = RbacRepository(db_session)
+ permissions = await rbac_repo.get_user_permissions(test_user.user_id)
+ assert len(permissions) == 1
+ assert permissions[0].permission_id == test_permission.permission_id
+
+ async def test_get_user_permissions(
+ self,
+ app: FastAPI,
+ client: AsyncClient,
+ test_user: User,
+ db_session: AsyncSession
+ ) -> None:
+
+ response = await client.get(
+ app.url_path_for(
+ "get_user_permissions",
+ user_id=str(test_user.user_id))
+ )
+ assert response.status_code == status.HTTP_200_OK
+ assert len(response.json()) == 1
+
+ async def test_remove_permission_from_user(
+ self,
+ app: FastAPI,
+ client: AsyncClient,
+ test_user: User,
+ test_permission: Permission,
+ db_session: AsyncSession
+ ) -> None:
+
+ response = await client.delete(
+ app.url_path_for(
+ "remove_permission_from_user",
+ user_id=str(test_user.user_id),
+ permission_id=str(test_permission.permission_id)
+ ),
+ )
+ assert response.status_code == status.HTTP_204_NO_CONTENT
+ rbac_repo = RbacRepository(db_session)
+ permissions = await rbac_repo.get_user_permissions(test_user.user_id)
+ assert len(permissions) == 0
diff --git a/tests/controller/test_rbac.py b/tests/controller/test_rbac.py
new file mode 100644
index 00000000..faa4e6df
--- /dev/null
+++ b/tests/controller/test_rbac.py
@@ -0,0 +1,203 @@
+#!/usr/bin/env python
+#
+# Copyright (C) 2021 GNS3 Technologies Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+import pytest
+
+from fastapi import FastAPI, status
+from httpx import AsyncClient
+
+from sqlalchemy.ext.asyncio import AsyncSession
+from gns3server.db.repositories.rbac import RbacRepository
+from gns3server.db.models import User
+
+pytestmark = pytest.mark.asyncio
+
+
+class TestPermissions:
+
+ @pytest.mark.parametrize(
+ "method, path, result",
+ (
+ ("GET", "/users", False),
+ ("GET", "/projects", True),
+ ("GET", "/projects/e451ad73-2519-4f83-87fe-a8e821792d44", False),
+ ("POST", "/projects", True),
+ ("GET", "/templates", True),
+ ("GET", "/templates/62e92cf1-244a-4486-8dae-b95439b54da9", False),
+ ("POST", "/templates", True),
+ ("GET", "/computes", True),
+ ("GET", "/computes/local", True),
+ ("GET", "/symbols", True),
+ ("GET", "/symbols/default_symbols", True),
+ ),
+ )
+ async def test_default_permissions_user_group(
+ self,
+ app: FastAPI,
+ authorized_client: AsyncClient,
+ test_user: User,
+ db_session: AsyncSession,
+ method: str,
+ path: str,
+ result: bool
+ ) -> None:
+
+ rbac_repo = RbacRepository(db_session)
+ authorized = await rbac_repo.check_user_is_authorized(test_user.user_id, method, path)
+ assert authorized == result
+
+
+class TestProjectsWithRbac:
+
+ async def test_admin_create_project(self, app: FastAPI, client: AsyncClient):
+
+ params = {"name": "Admin project"}
+ response = await client.post(app.url_path_for("create_project"), json=params)
+ assert response.status_code == status.HTTP_201_CREATED
+
+ async def test_user_only_access_own_projects(
+ self,
+ app: FastAPI,
+ authorized_client: AsyncClient,
+ test_user: User,
+ db_session: AsyncSession
+ ) -> None:
+
+ params = {"name": "User project"}
+ response = await authorized_client.post(app.url_path_for("create_project"), json=params)
+ assert response.status_code == status.HTTP_201_CREATED
+ project_id = response.json()["project_id"]
+
+ rbac_repo = RbacRepository(db_session)
+ permissions_in_db = await rbac_repo.get_user_permissions(test_user.user_id)
+ assert len(permissions_in_db) == 1
+ assert permissions_in_db[0].path == f"/projects/{project_id}/*"
+
+ response = await authorized_client.get(app.url_path_for("get_projects"))
+ assert response.status_code == status.HTTP_200_OK
+ projects = response.json()
+ assert len(projects) == 1
+
+ async def test_admin_access_all_projects(self, app: FastAPI, client: AsyncClient):
+
+ response = await client.get(app.url_path_for("get_projects"))
+ assert response.status_code == status.HTTP_200_OK
+ projects = response.json()
+ assert len(projects) == 2
+
+ async def test_admin_user_give_permission_on_project(
+ self,
+ app: FastAPI,
+ client: AsyncClient,
+ test_user: User
+ ):
+
+ response = await client.get(app.url_path_for("get_projects"))
+ assert response.status_code == status.HTTP_200_OK
+ projects = response.json()
+ project_id = None
+ for project in projects:
+ if project["name"] == "Admin project":
+ project_id = project["project_id"]
+ break
+
+ new_permission = {
+ "methods": ["GET"],
+ "path": f"/projects/{project_id}",
+ "action": "ALLOW"
+ }
+ response = await client.post(app.url_path_for("create_permission"), json=new_permission)
+ assert response.status_code == status.HTTP_201_CREATED
+ permission_id = response.json()["permission_id"]
+
+ response = await client.put(
+ app.url_path_for(
+ "add_permission_to_user",
+ user_id=test_user.user_id,
+ permission_id=permission_id
+ )
+ )
+ assert response.status_code == status.HTTP_204_NO_CONTENT
+
+ async def test_user_access_admin_project(
+ self,
+ app: FastAPI,
+ authorized_client: AsyncClient,
+ test_user: User,
+ db_session: AsyncSession
+ ) -> None:
+
+ response = await authorized_client.get(app.url_path_for("get_projects"))
+ assert response.status_code == status.HTTP_200_OK
+ projects = response.json()
+ assert len(projects) == 2
+
+
+class TestTemplatesWithRbac:
+
+ async def test_admin_create_template(self, app: FastAPI, client: AsyncClient):
+
+ new_template = {"base_script_file": "vpcs_base_config.txt",
+ "category": "guest",
+ "console_auto_start": False,
+ "console_type": "telnet",
+ "default_name_format": "PC{0}",
+ "name": "ADMIN_VPCS_TEMPLATE",
+ "compute_id": "local",
+ "symbol": ":/symbols/vpcs_guest.svg",
+ "template_type": "vpcs"}
+
+ response = await client.post(app.url_path_for("create_template"), json=new_template)
+ assert response.status_code == status.HTTP_201_CREATED
+
+ async def test_user_only_access_own_templates(
+ self, app: FastAPI,
+ authorized_client: AsyncClient,
+ test_user: User,
+ db_session: AsyncSession
+ ) -> None:
+
+ new_template = {"base_script_file": "vpcs_base_config.txt",
+ "category": "guest",
+ "console_auto_start": False,
+ "console_type": "telnet",
+ "default_name_format": "PC{0}",
+ "name": "USER_VPCS_TEMPLATE",
+ "compute_id": "local",
+ "symbol": ":/symbols/vpcs_guest.svg",
+ "template_type": "vpcs"}
+
+ response = await authorized_client.post(app.url_path_for("create_template"), json=new_template)
+ assert response.status_code == status.HTTP_201_CREATED
+ template_id = response.json()["template_id"]
+
+ rbac_repo = RbacRepository(db_session)
+ permissions_in_db = await rbac_repo.get_user_permissions(test_user.user_id)
+ assert len(permissions_in_db) == 1
+ assert permissions_in_db[0].path == f"/templates/{template_id}/*"
+
+ response = await authorized_client.get(app.url_path_for("get_templates"))
+ assert response.status_code == status.HTTP_200_OK
+ templates = [template for template in response.json() if template["builtin"] is False]
+ assert len(templates) == 1
+
+ async def test_admin_access_all_templates(self, app: FastAPI, client: AsyncClient):
+
+ response = await client.get(app.url_path_for("get_templates"))
+ assert response.status_code == status.HTTP_200_OK
+ templates = [template for template in response.json() if template["builtin"] is False]
+ assert len(templates) == 2