Check a permission matches an existing route before it is allowed to be created.

pull/1970/head
grossmj 3 years ago
parent 183033dad8
commit 9df586d5d5

@ -61,11 +61,7 @@ async def get_current_active_user(
)
# remove the prefix (e.g. "/v3") from URL path
match = re.search(r"^(/v[0-9]+).*", request.url.path)
if match:
path = request.url.path[len(match.group(1)):]
else:
path = request.url.path
path = re.sub(r"^/v[0-9]", "", request.url.path)
# special case: always authorize access to the "/users/me" endpoint
if path == "/users/me":

@ -19,10 +19,14 @@
API routes for permissions.
"""
from fastapi import APIRouter, Depends, Response, status
import re
from fastapi import APIRouter, Depends, Response, Request, status
from fastapi.routing import APIRoute
from uuid import UUID
from typing import List
from gns3server import schemas
from gns3server.controller.controller_error import (
ControllerBadRequestError,
@ -53,6 +57,7 @@ async def get_permissions(
@router.post("", response_model=schemas.Permission, status_code=status.HTTP_201_CREATED)
async def create_permission(
request: Request,
permission_create: schemas.PermissionCreate,
rbac_repo: RbacRepository = Depends(get_repository(RbacRepository))
) -> schemas.Permission:
@ -64,7 +69,27 @@ async def create_permission(
raise ControllerBadRequestError(f"Permission '{permission_create.methods} {permission_create.path} "
f"{permission_create.action}' already exists")
return await rbac_repo.create_permission(permission_create)
for route in request.app.routes:
if isinstance(route, APIRoute):
# remove the prefix (e.g. "/v3") from the route path
route_path = re.sub(r"^/v[0-9]", "", route.path)
# replace route path ID parameters by an UUID regex
route_path = re.sub(r"{\w+_id}", "[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}", route_path)
# replace remaining route path parameters by an word matching regex
route_path = re.sub(r"/{[\w:]+}", r"/\\w+", route_path)
# the permission can match multiple routes
if permission_create.path.endswith("/*"):
route_path += r"/\*"
if re.fullmatch(route_path, permission_create.path):
for method in permission_create.methods:
if method in list(route.methods):
return await rbac_repo.create_permission(permission_create)
raise ControllerBadRequestError(f"Permission '{permission_create.methods} {permission_create.path}' "
f"doesn't match any existing endpoint")
@router.get("/{permission_id}", response_model=schemas.Permission)

@ -38,6 +38,26 @@ class TestPermissionRoutes:
response = await client.post(app.url_path_for("create_permission"), json=new_permission)
assert response.status_code == status.HTTP_201_CREATED
async def test_create_wildcard_permission(self, app: FastAPI, client: AsyncClient) -> None:
new_permission = {
"methods": ["GET"],
"path": "/templates/*",
"action": "ALLOW"
}
response = await client.post(app.url_path_for("create_permission"), json=new_permission)
assert response.status_code == status.HTTP_201_CREATED
async def test_create_invalid_permission(self, app: FastAPI, client: AsyncClient) -> None:
new_permission = {
"methods": ["GET"],
"path": "/templates/invalid",
"action": "ALLOW"
}
response = await client.post(app.url_path_for("create_permission"), json=new_permission)
assert response.status_code == status.HTTP_400_BAD_REQUEST
async def test_get_permission(self, app: FastAPI, client: AsyncClient, db_session: AsyncSession) -> None:
rbac_repo = RbacRepository(db_session)
@ -50,7 +70,7 @@ class TestPermissionRoutes:
response = await client.get(app.url_path_for("get_permissions"))
assert response.status_code == status.HTTP_200_OK
assert len(response.json()) == 6 # 5 default permissions + 1 custom permission
assert len(response.json()) == 7 # 5 default permissions + 2 custom permissions
async def test_update_permission(self, app: FastAPI, client: AsyncClient, db_session: AsyncSession) -> None:

Loading…
Cancel
Save