1
0
mirror of https://github.com/GNS3/gns3-server synced 2025-01-13 17:40:54 +00:00

Add /permissions/prune to delete orphaned permissions

This commit is contained in:
grossmj 2021-08-17 21:55:59 +09:30
parent 9df586d5d5
commit 4c6135fe88
3 changed files with 52 additions and 13 deletions

View File

@ -65,9 +65,10 @@ async def create_permission(
Create a new permission. Create a new permission.
""" """
if await rbac_repo.check_permission_exists(permission_create): # TODO: should we prevent having multiple permissions with same methods/path?
raise ControllerBadRequestError(f"Permission '{permission_create.methods} {permission_create.path} " #if await rbac_repo.check_permission_exists(permission_create):
f"{permission_create.action}' already exists") # raise ControllerBadRequestError(f"Permission '{permission_create.methods} {permission_create.path} "
# f"{permission_create.action}' already exists")
for route in request.app.routes: for route in request.app.routes:
if isinstance(route, APIRoute): if isinstance(route, APIRoute):
@ -142,3 +143,15 @@ async def delete_permission(
raise ControllerNotFoundError(f"Permission '{permission_id}' could not be deleted") raise ControllerNotFoundError(f"Permission '{permission_id}' could not be deleted")
return Response(status_code=status.HTTP_204_NO_CONTENT) return Response(status_code=status.HTTP_204_NO_CONTENT)
@router.post("/prune", status_code=status.HTTP_204_NO_CONTENT)
async def prune_permissions(
rbac_repo: RbacRepository = Depends(get_repository(RbacRepository))
) -> Response:
"""
Prune orphaned permissions.
"""
await rbac_repo.prune_permissions()
return Response(status_code=status.HTTP_204_NO_CONTENT)

View File

@ -17,7 +17,7 @@
from uuid import UUID from uuid import UUID
from typing import Optional, List, Union from typing import Optional, List, Union
from sqlalchemy import select, update, delete from sqlalchemy import select, update, delete, null
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload from sqlalchemy.orm import selectinload
@ -194,7 +194,8 @@ class RbacRepository(BaseRepository):
Get all permissions. Get all permissions.
""" """
query = select(models.Permission) query = select(models.Permission).\
order_by(models.Permission.path.desc())
result = await self._db_session.execute(query) result = await self._db_session.execute(query)
return result.scalars().all() return result.scalars().all()
@ -257,6 +258,22 @@ class RbacRepository(BaseRepository):
await self._db_session.commit() await self._db_session.commit()
return result.rowcount > 0 return result.rowcount > 0
async def prune_permissions(self) -> int:
"""
Prune orphaned permissions.
"""
query = select(models.Permission).\
filter((~models.Permission.roles.any()) & (models.Permission.user_id == null()))
result = await self._db_session.execute(query)
permissions = result.scalars().all()
permissions_deleted = 0
for permission in permissions:
if await self.delete_permission(permission.permission_id):
permissions_deleted += 1
log.info(f"{permissions_deleted} orphaned permissions have been deleted")
return permissions_deleted
def _match_permission( def _match_permission(
self, self,
permissions: List[models.Permission], permissions: List[models.Permission],
@ -284,7 +301,7 @@ class RbacRepository(BaseRepository):
query = select(models.Permission).\ query = select(models.Permission).\
join(models.User.permissions).\ join(models.User.permissions).\
filter(models.User.user_id == user_id).\ filter(models.User.user_id == user_id).\
order_by(models.Permission.path) order_by(models.Permission.path.desc())
result = await self._db_session.execute(query) result = await self._db_session.execute(query)
return result.scalars().all() return result.scalars().all()
@ -383,7 +400,7 @@ class RbacRepository(BaseRepository):
join(models.Role.groups).\ join(models.Role.groups).\
join(models.UserGroup.users).\ join(models.UserGroup.users).\
filter(models.User.user_id == user_id).\ filter(models.User.user_id == user_id).\
order_by(models.Permission.path) order_by(models.Permission.path.desc())
result = await self._db_session.execute(query) result = await self._db_session.execute(query)
permissions = result.scalars().all() permissions = result.scalars().all()

View File

@ -32,7 +32,7 @@ class TestPermissionRoutes:
new_permission = { new_permission = {
"methods": ["GET"], "methods": ["GET"],
"path": "/templates", "path": "/templates/f6113095-a703-4967-b039-ab95ac3eb4f5",
"action": "ALLOW" "action": "ALLOW"
} }
response = await client.post(app.url_path_for("create_permission"), json=new_permission) response = await client.post(app.url_path_for("create_permission"), json=new_permission)
@ -75,7 +75,7 @@ class TestPermissionRoutes:
async def test_update_permission(self, app: FastAPI, client: AsyncClient, db_session: AsyncSession) -> None: async def test_update_permission(self, app: FastAPI, client: AsyncClient, db_session: AsyncSession) -> None:
rbac_repo = RbacRepository(db_session) rbac_repo = RbacRepository(db_session)
permission_in_db = await rbac_repo.get_permission_by_path("/templates") permission_in_db = await rbac_repo.get_permission_by_path("/templates/*")
update_permission = { update_permission = {
"methods": ["GET"], "methods": ["GET"],
@ -101,3 +101,12 @@ class TestPermissionRoutes:
permission_in_db = await rbac_repo.get_permission_by_path("/appliances") permission_in_db = await rbac_repo.get_permission_by_path("/appliances")
response = await client.delete(app.url_path_for("delete_permission", permission_id=permission_in_db.permission_id)) response = await client.delete(app.url_path_for("delete_permission", permission_id=permission_in_db.permission_id))
assert response.status_code == status.HTTP_204_NO_CONTENT assert response.status_code == status.HTTP_204_NO_CONTENT
async def test_prune_permissions(self, app: FastAPI, client: AsyncClient, db_session: AsyncSession) -> None:
response = await client.post(app.url_path_for("prune_permissions"))
assert response.status_code == status.HTTP_204_NO_CONTENT
rbac_repo = RbacRepository(db_session)
permissions_in_db = await rbac_repo.get_permissions()
assert len(permissions_in_db) == 5 # 5 default permissions