mirror of https://github.com/GNS3/gns3-server
commit
b0c4fc17ad
@ -0,0 +1,250 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# Copyright (C) 2023 GNS3 Technologies Inc.
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
"""
|
||||
API routes for ACL.
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
from fastapi import APIRouter, Depends, Request, status
|
||||
from fastapi.routing import APIRoute
|
||||
from uuid import UUID
|
||||
from typing import List
|
||||
|
||||
|
||||
from gns3server import schemas
|
||||
from gns3server.controller.controller_error import (
|
||||
ControllerBadRequestError,
|
||||
ControllerNotFoundError
|
||||
)
|
||||
|
||||
from gns3server.controller import Controller
|
||||
from gns3server.db.repositories.users import UsersRepository
|
||||
from gns3server.db.repositories.rbac import RbacRepository
|
||||
from gns3server.db.repositories.images import ImagesRepository
|
||||
from gns3server.db.repositories.templates import TemplatesRepository
|
||||
from .dependencies.database import get_repository
|
||||
from .dependencies.rbac import has_privilege
|
||||
|
||||
import logging
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@router.get(
|
||||
"/endpoints",
|
||||
status_code=status.HTTP_201_CREATED,
|
||||
dependencies=[Depends(has_privilege("ACE.Audit"))]
|
||||
)
|
||||
async def endpoints(
|
||||
users_repo: UsersRepository = Depends(get_repository(UsersRepository)),
|
||||
rbac_repo: RbacRepository = Depends(get_repository(RbacRepository)),
|
||||
images_repo: ImagesRepository = Depends(get_repository(ImagesRepository)),
|
||||
templates_repo: TemplatesRepository = Depends(get_repository(TemplatesRepository))
|
||||
) -> List[dict]:
|
||||
"""
|
||||
List all endpoints to be used in ACL entries.
|
||||
"""
|
||||
|
||||
controller = Controller.instance()
|
||||
endpoints = [{"endpoint": "/", "name": "All endpoints", "endpoint_type": "root"}]
|
||||
|
||||
def add_to_endpoints(endpoint: str, name: str, endpoint_type: str) -> None:
|
||||
if endpoint not in endpoints:
|
||||
endpoints.append({"endpoint": endpoint, "name": name, "endpoint_type": endpoint_type})
|
||||
|
||||
# projects
|
||||
add_to_endpoints("/projects", "All projects", "project")
|
||||
projects = [p for p in controller.projects.values()]
|
||||
for project in projects:
|
||||
add_to_endpoints(f"/projects/{project.id}", f'Project "{project.name}"', "project")
|
||||
|
||||
# nodes
|
||||
add_to_endpoints(f"/projects/{project.id}/nodes", f'All nodes in project "{project.name}"', "node")
|
||||
for node in project.nodes.values():
|
||||
add_to_endpoints(
|
||||
f"/projects/{project.id}/nodes/{node['node_id']}",
|
||||
f'Node "{node["name"]}" in project "{project.name}"',
|
||||
endpoint_type="node"
|
||||
)
|
||||
|
||||
# links
|
||||
add_to_endpoints(f"/projects/{project.id}/links", f'All links in project "{project.name}"', "link")
|
||||
for link in project.links.values():
|
||||
node_id_1 = link["nodes"][0]["node_id"]
|
||||
node_id_2 = link["nodes"][1]["node_id"]
|
||||
node_name_1 = project.nodes[node_id_1]["name"]
|
||||
node_name_2 = project.nodes[node_id_2]["name"]
|
||||
add_to_endpoints(
|
||||
f"/projects/{project.id}/links/{link['link_id']}",
|
||||
f'Link from "{node_name_1}" to "{node_name_2}" in project "{project.name}"',
|
||||
endpoint_type="link"
|
||||
)
|
||||
|
||||
# users
|
||||
add_to_endpoints("/users", "All users", "user")
|
||||
users = await users_repo.get_users()
|
||||
for user in users:
|
||||
add_to_endpoints(f"/users/{user.user_id}", f'User "{user.username}"', "user")
|
||||
|
||||
# groups
|
||||
add_to_endpoints("/groups", "All groups", "group")
|
||||
groups = await users_repo.get_user_groups()
|
||||
for group in groups:
|
||||
add_to_endpoints(f"/groups/{group.user_group_id}", f'Group "{group.name}"', "group")
|
||||
|
||||
# roles
|
||||
add_to_endpoints("/roles", "All roles", "role")
|
||||
roles = await rbac_repo.get_roles()
|
||||
for role in roles:
|
||||
add_to_endpoints(f"/roles/{role.role_id}", f'Role "{role.name}"', "role")
|
||||
|
||||
# images
|
||||
add_to_endpoints("/images", "All images", "image")
|
||||
images = await images_repo.get_images()
|
||||
for image in images:
|
||||
add_to_endpoints(f"/images/{image.filename}", f'Image "{image.filename}"', "image")
|
||||
|
||||
# templates
|
||||
add_to_endpoints("/templates", "All templates", "template")
|
||||
templates = await templates_repo.get_templates()
|
||||
for template in templates:
|
||||
add_to_endpoints(f"/templates/{template.template_id}", f'Template "{template.name}"', "template")
|
||||
|
||||
return endpoints
|
||||
|
||||
|
||||
@router.get(
|
||||
"",
|
||||
response_model=List[schemas.ACE],
|
||||
dependencies=[Depends(has_privilege("ACE.Audit"))]
|
||||
)
|
||||
async def get_aces(
|
||||
rbac_repo: RbacRepository = Depends(get_repository(RbacRepository))
|
||||
) -> List[schemas.ACE]:
|
||||
"""
|
||||
Get all ACL entries.
|
||||
|
||||
Required privilege: ACE.Audit
|
||||
"""
|
||||
|
||||
return await rbac_repo.get_aces()
|
||||
|
||||
|
||||
@router.post(
|
||||
"",
|
||||
response_model=schemas.ACE,
|
||||
status_code=status.HTTP_201_CREATED,
|
||||
dependencies=[Depends(has_privilege("ACE.Allocate"))]
|
||||
)
|
||||
async def create_ace(
|
||||
request: Request,
|
||||
ace_create: schemas.ACECreate,
|
||||
rbac_repo: RbacRepository = Depends(get_repository(RbacRepository))
|
||||
) -> schemas.ACE:
|
||||
"""
|
||||
Create a new ACL entry.
|
||||
|
||||
Required privilege: ACE.Allocate
|
||||
"""
|
||||
|
||||
for route in request.app.routes:
|
||||
if isinstance(route, APIRoute):
|
||||
|
||||
# remove the prefix (e.g. "/v3") from the route path
|
||||
route_path = re.sub(r"^/v[0-9]", "", route.path)
|
||||
# replace route path ID parameters by a UUID regex
|
||||
route_path = re.sub(r"{\w+_id}", "[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}", route_path)
|
||||
# replace remaining route path parameters by a word matching regex
|
||||
route_path = re.sub(r"/{[\w:]+}", r"/\\w+", route_path)
|
||||
|
||||
if re.fullmatch(route_path, ace_create.path):
|
||||
log.info("Creating ACE for route path", ace_create.path, route_path)
|
||||
return await rbac_repo.create_ace(ace_create)
|
||||
|
||||
raise ControllerBadRequestError(f"Path '{ace_create.path}' doesn't match any existing endpoint")
|
||||
|
||||
|
||||
@router.get(
|
||||
"/{ace_id}",
|
||||
response_model=schemas.ACE,
|
||||
dependencies=[Depends(has_privilege("ACE.Audit"))]
|
||||
)
|
||||
async def get_ace(
|
||||
ace_id: UUID,
|
||||
rbac_repo: RbacRepository = Depends(get_repository(RbacRepository)),
|
||||
) -> schemas.ACE:
|
||||
"""
|
||||
Get an ACL entry.
|
||||
|
||||
Required privilege: ACE.Audit
|
||||
"""
|
||||
|
||||
ace = await rbac_repo.get_ace(ace_id)
|
||||
if not ace:
|
||||
raise ControllerNotFoundError(f"ACL entry '{ace_id}' not found")
|
||||
return ace
|
||||
|
||||
|
||||
@router.put(
|
||||
"/{ace_id}",
|
||||
response_model=schemas.ACE,
|
||||
dependencies=[Depends(has_privilege("ACE.Modify"))]
|
||||
)
|
||||
async def update_ace(
|
||||
ace_id: UUID,
|
||||
ace_update: schemas.ACEUpdate,
|
||||
rbac_repo: RbacRepository = Depends(get_repository(RbacRepository))
|
||||
) -> schemas.ACE:
|
||||
"""
|
||||
Update an ACL entry.
|
||||
|
||||
Required privilege: ACE.Modify
|
||||
"""
|
||||
|
||||
ace = await rbac_repo.get_ace(ace_id)
|
||||
if not ace:
|
||||
raise ControllerNotFoundError(f"ACL entry '{ace_id}' not found")
|
||||
|
||||
return await rbac_repo.update_ace(ace_id, ace_update)
|
||||
|
||||
|
||||
@router.delete(
|
||||
"/{ace_id}",
|
||||
status_code=status.HTTP_204_NO_CONTENT,
|
||||
dependencies=[Depends(has_privilege("ACE.Allocate"))]
|
||||
)
|
||||
async def delete_ace(
|
||||
ace_id: UUID,
|
||||
rbac_repo: RbacRepository = Depends(get_repository(RbacRepository)),
|
||||
) -> None:
|
||||
"""
|
||||
Delete an ACL entry.
|
||||
|
||||
Required privilege: ACE.Allocate
|
||||
"""
|
||||
|
||||
ace = await rbac_repo.get_ace(ace_id)
|
||||
if not ace:
|
||||
raise ControllerNotFoundError(f"ACL entry '{ace_id}' not found")
|
||||
|
||||
success = await rbac_repo.delete_ace(ace_id)
|
||||
if not success:
|
||||
raise ControllerNotFoundError(f"ACL entry '{ace_id}' could not be deleted")
|
@ -0,0 +1,78 @@
|
||||
#
|
||||
# Copyright (C) 2023 GNS3 Technologies Inc.
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import re
|
||||
|
||||
from fastapi import Request, WebSocket, Depends, HTTPException
|
||||
from gns3server import schemas
|
||||
from gns3server.db.repositories.rbac import RbacRepository
|
||||
from .authentication import get_current_active_user, get_current_active_user_from_websocket
|
||||
from .database import get_repository
|
||||
|
||||
import logging
|
||||
|
||||
log = logging.getLogger()
|
||||
|
||||
|
||||
def has_privilege(
|
||||
privilege_name: str
|
||||
):
|
||||
async def get_user_and_check_privilege(
|
||||
request: Request,
|
||||
current_user: schemas.User = Depends(get_current_active_user),
|
||||
rbac_repo: RbacRepository = Depends(get_repository(RbacRepository))
|
||||
):
|
||||
if not current_user.is_superadmin:
|
||||
path = re.sub(r"^/v[0-9]", "", request.url.path) # remove the prefix (e.g. "/v3") from URL path
|
||||
log.debug(f"Checking user {current_user.username} has privilege {privilege_name} on '{path}'")
|
||||
if not await rbac_repo.check_user_has_privilege(current_user.user_id, path, privilege_name):
|
||||
raise HTTPException(status_code=403, detail=f"Permission denied (privilege {privilege_name} is required)")
|
||||
return current_user
|
||||
return get_user_and_check_privilege
|
||||
|
||||
|
||||
def has_privilege_on_websocket(
|
||||
privilege_name: str
|
||||
):
|
||||
async def get_user_and_check_privilege(
|
||||
websocket: WebSocket,
|
||||
current_user: schemas.User = Depends(get_current_active_user_from_websocket),
|
||||
rbac_repo: RbacRepository = Depends(get_repository(RbacRepository))
|
||||
):
|
||||
if not current_user.is_superadmin:
|
||||
path = re.sub(r"^/v[0-9]", "", websocket.url.path) # remove the prefix (e.g. "/v3") from URL path
|
||||
log.debug(f"Checking user {current_user.username} has privilege {privilege_name} on '{path}'")
|
||||
if not await rbac_repo.check_user_has_privilege(current_user.user_id, path, privilege_name):
|
||||
raise HTTPException(status_code=403, detail=f"Permission denied (privilege {privilege_name} is required)")
|
||||
return current_user
|
||||
return get_user_and_check_privilege
|
||||
|
||||
# class PrivilegeChecker:
|
||||
#
|
||||
# def __init__(self, required_privilege: str) -> None:
|
||||
# self._required_privilege = required_privilege
|
||||
#
|
||||
# async def __call__(
|
||||
# self,
|
||||
# current_user: schemas.User = Depends(get_current_active_user),
|
||||
# rbac_repo: RbacRepository = Depends(get_repository(RbacRepository))
|
||||
# ) -> bool:
|
||||
#
|
||||
# if not await rbac_repo.check_user_has_privilege(current_user.user_id, "/projects", self._required_privilege):
|
||||
# raise HTTPException(status_code=403, detail=f"Permission denied (privilege {self._required_privilege} is required)")
|
||||
# return True
|
||||
|
||||
# Depends(PrivilegeChecker("Project.Audit"))
|
@ -1,161 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# Copyright (C) 2021 GNS3 Technologies Inc.
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
"""
|
||||
API routes for permissions.
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
from fastapi import APIRouter, Depends, Response, Request, status
|
||||
from fastapi.routing import APIRoute
|
||||
from uuid import UUID
|
||||
from typing import List
|
||||
|
||||
|
||||
from gns3server import schemas
|
||||
from gns3server.controller.controller_error import (
|
||||
ControllerBadRequestError,
|
||||
ControllerNotFoundError,
|
||||
ControllerForbiddenError,
|
||||
)
|
||||
|
||||
from gns3server.db.repositories.rbac import RbacRepository
|
||||
from .dependencies.database import get_repository
|
||||
from .dependencies.authentication import get_current_active_user
|
||||
|
||||
import logging
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@router.get("", response_model=List[schemas.Permission])
|
||||
async def get_permissions(
|
||||
rbac_repo: RbacRepository = Depends(get_repository(RbacRepository))
|
||||
) -> List[schemas.Permission]:
|
||||
"""
|
||||
Get all permissions.
|
||||
"""
|
||||
|
||||
return await rbac_repo.get_permissions()
|
||||
|
||||
|
||||
@router.post("", response_model=schemas.Permission, status_code=status.HTTP_201_CREATED)
|
||||
async def create_permission(
|
||||
request: Request,
|
||||
permission_create: schemas.PermissionCreate,
|
||||
current_user: schemas.User = Depends(get_current_active_user),
|
||||
rbac_repo: RbacRepository = Depends(get_repository(RbacRepository))
|
||||
) -> schemas.Permission:
|
||||
"""
|
||||
Create a new permission.
|
||||
"""
|
||||
|
||||
# TODO: should we prevent having multiple permissions with same methods/path?
|
||||
#if await rbac_repo.check_permission_exists(permission_create):
|
||||
# raise ControllerBadRequestError(f"Permission '{permission_create.methods} {permission_create.path} "
|
||||
# f"{permission_create.action}' already exists")
|
||||
|
||||
for route in request.app.routes:
|
||||
if isinstance(route, APIRoute):
|
||||
|
||||
# remove the prefix (e.g. "/v3") from the route path
|
||||
route_path = re.sub(r"^/v[0-9]", "", route.path)
|
||||
# replace route path ID parameters by an UUID regex
|
||||
route_path = re.sub(r"{\w+_id}", "[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}", route_path)
|
||||
# replace remaining route path parameters by an word matching regex
|
||||
route_path = re.sub(r"/{[\w:]+}", r"/\\w+", route_path)
|
||||
|
||||
# the permission can match multiple routes
|
||||
if permission_create.path.endswith("/*"):
|
||||
route_path += r"/.*"
|
||||
|
||||
if re.fullmatch(route_path, permission_create.path):
|
||||
for method in permission_create.methods:
|
||||
if method in list(route.methods):
|
||||
# check user has the right to add the permission (i.e has already to right on the path)
|
||||
if not await rbac_repo.check_user_is_authorized(current_user.user_id, method, permission_create.path):
|
||||
raise ControllerForbiddenError(f"User '{current_user.username}' doesn't have the rights to "
|
||||
f"add a permission on {method} {permission_create.path} or "
|
||||
f"the endpoint doesn't exist")
|
||||
return await rbac_repo.create_permission(permission_create)
|
||||
|
||||
raise ControllerBadRequestError(f"Permission '{permission_create.methods} {permission_create.path}' "
|
||||
f"doesn't match any existing endpoint")
|
||||
|
||||
|
||||
@router.get("/{permission_id}", response_model=schemas.Permission)
|
||||
async def get_permission(
|
||||
permission_id: UUID,
|
||||
rbac_repo: RbacRepository = Depends(get_repository(RbacRepository)),
|
||||
) -> schemas.Permission:
|
||||
"""
|
||||
Get a permission.
|
||||
"""
|
||||
|
||||
permission = await rbac_repo.get_permission(permission_id)
|
||||
if not permission:
|
||||
raise ControllerNotFoundError(f"Permission '{permission_id}' not found")
|
||||
return permission
|
||||
|
||||
|
||||
@router.put("/{permission_id}", response_model=schemas.Permission)
|
||||
async def update_permission(
|
||||
permission_id: UUID,
|
||||
permission_update: schemas.PermissionUpdate,
|
||||
rbac_repo: RbacRepository = Depends(get_repository(RbacRepository))
|
||||
) -> schemas.Permission:
|
||||
"""
|
||||
Update a permission.
|
||||
"""
|
||||
|
||||
permission = await rbac_repo.get_permission(permission_id)
|
||||
if not permission:
|
||||
raise ControllerNotFoundError(f"Permission '{permission_id}' not found")
|
||||
|
||||
return await rbac_repo.update_permission(permission_id, permission_update)
|
||||
|
||||
|
||||
@router.delete("/{permission_id}", status_code=status.HTTP_204_NO_CONTENT)
|
||||
async def delete_permission(
|
||||
permission_id: UUID,
|
||||
rbac_repo: RbacRepository = Depends(get_repository(RbacRepository)),
|
||||
) -> None:
|
||||
"""
|
||||
Delete a permission.
|
||||
"""
|
||||
|
||||
permission = await rbac_repo.get_permission(permission_id)
|
||||
if not permission:
|
||||
raise ControllerNotFoundError(f"Permission '{permission_id}' not found")
|
||||
|
||||
success = await rbac_repo.delete_permission(permission_id)
|
||||
if not success:
|
||||
raise ControllerNotFoundError(f"Permission '{permission_id}' could not be deleted")
|
||||
|
||||
|
||||
@router.post("/prune", status_code=status.HTTP_204_NO_CONTENT)
|
||||
async def prune_permissions(
|
||||
rbac_repo: RbacRepository = Depends(get_repository(RbacRepository))
|
||||
) -> None:
|
||||
"""
|
||||
Prune orphaned permissions.
|
||||
"""
|
||||
|
||||
await rbac_repo.prune_permissions()
|
@ -0,0 +1,46 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# Copyright (C) 2023 GNS3 Technologies Inc.
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from sqlalchemy import Column, String, Boolean, ForeignKey, CheckConstraint
|
||||
from sqlalchemy.orm import relationship
|
||||
|
||||
from .base import BaseTable, generate_uuid, GUID
|
||||
|
||||
import logging
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ACE(BaseTable):
|
||||
|
||||
__tablename__ = "acl"
|
||||
|
||||
ace_id = Column(GUID, primary_key=True, default=generate_uuid)
|
||||
ace_type: str = Column(String)
|
||||
path = Column(String)
|
||||
propagate = Column(Boolean, default=True)
|
||||
allowed = Column(Boolean, default=True)
|
||||
user_id = Column(GUID, ForeignKey('users.user_id', ondelete="CASCADE"))
|
||||
user = relationship("User", back_populates="acl_entries")
|
||||
group_id = Column(GUID, ForeignKey('user_groups.user_group_id', ondelete="CASCADE"))
|
||||
group = relationship("UserGroup", back_populates="acl_entries")
|
||||
role_id = Column(GUID, ForeignKey('roles.role_id', ondelete="CASCADE"))
|
||||
role = relationship("Role", back_populates="acl_entries")
|
||||
|
||||
__table_args__ = (
|
||||
CheckConstraint("(user_id IS NOT NULL AND ace_type = 'user') OR (group_id IS NOT NULL AND ace_type = 'group')"),
|
||||
)
|
@ -1,128 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# Copyright (C) 2021 GNS3 Technologies Inc.
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from sqlalchemy import Table, Column, String, ForeignKey, event
|
||||
from sqlalchemy.orm import relationship
|
||||
|
||||
from .base import Base, BaseTable, generate_uuid, GUID, ListType
|
||||
|
||||
import logging
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
permission_role_link = Table(
|
||||
"permissions_roles_link",
|
||||
Base.metadata,
|
||||
Column("permission_id", GUID, ForeignKey("permissions.permission_id", ondelete="CASCADE")),
|
||||
Column("role_id", GUID, ForeignKey("roles.role_id", ondelete="CASCADE"))
|
||||
|
||||
)
|
||||
|
||||
|
||||
class Permission(BaseTable):
|
||||
|
||||
__tablename__ = "permissions"
|
||||
|
||||
permission_id = Column(GUID, primary_key=True, default=generate_uuid)
|
||||
description = Column(String)
|
||||
methods = Column(ListType)
|
||||
path = Column(String)
|
||||
action = Column(String)
|
||||
user_id = Column(GUID, ForeignKey('users.user_id', ondelete="CASCADE"))
|
||||
roles = relationship("Role", secondary=permission_role_link, back_populates="permissions")
|
||||
|
||||
|
||||
@event.listens_for(Permission.__table__, 'after_create')
|
||||
def create_default_roles(target, connection, **kw):
|
||||
|
||||
default_permissions = [
|
||||
{
|
||||
"description": "Allow access to all endpoints",
|
||||
"methods": ["GET", "POST", "PUT", "DELETE"],
|
||||
"path": "/",
|
||||
"action": "ALLOW"
|
||||
},
|
||||
{
|
||||
"description": "Allow to receive controller notifications",
|
||||
"methods": ["GET"],
|
||||
"path": "/notifications",
|
||||
"action": "ALLOW"
|
||||
},
|
||||
{
|
||||
"description": "Allow to create and list projects",
|
||||
"methods": ["GET", "POST"],
|
||||
"path": "/projects",
|
||||
"action": "ALLOW"
|
||||
},
|
||||
{
|
||||
"description": "Allow to create and list templates",
|
||||
"methods": ["GET", "POST"],
|
||||
"path": "/templates",
|
||||
"action": "ALLOW"
|
||||
},
|
||||
{
|
||||
"description": "Allow to list computes",
|
||||
"methods": ["GET"],
|
||||
"path": "/computes/*",
|
||||
"action": "ALLOW"
|
||||
},
|
||||
{
|
||||
"description": "Allow access to all symbol endpoints",
|
||||
"methods": ["GET", "POST"],
|
||||
"path": "/symbols/*",
|
||||
"action": "ALLOW"
|
||||
},
|
||||
]
|
||||
|
||||
stmt = target.insert().values(default_permissions)
|
||||
connection.execute(stmt)
|
||||
connection.commit()
|
||||
log.debug("The default permissions have been created in the database")
|
||||
|
||||
|
||||
@event.listens_for(permission_role_link, 'after_create')
|
||||
def add_permissions_to_role(target, connection, **kw):
|
||||
|
||||
from .roles import Role
|
||||
roles_table = Role.__table__
|
||||
stmt = roles_table.select().where(roles_table.c.name == "Administrator")
|
||||
result = connection.execute(stmt)
|
||||
role_id = result.first().role_id
|
||||
|
||||
permissions_table = Permission.__table__
|
||||
stmt = permissions_table.select().where(permissions_table.c.path == "/")
|
||||
result = connection.execute(stmt)
|
||||
permission_id = result.first().permission_id
|
||||
|
||||
# add root path to the "Administrator" role
|
||||
stmt = target.insert().values(permission_id=permission_id, role_id=role_id)
|
||||
connection.execute(stmt)
|
||||
|
||||
stmt = roles_table.select().where(roles_table.c.name == "User")
|
||||
result = connection.execute(stmt)
|
||||
role_id = result.first().role_id
|
||||
|
||||
# add minimum required paths to the "User" role
|
||||
for path in ("/notifications", "/projects", "/templates", "/computes/*", "/symbols/*"):
|
||||
stmt = permissions_table.select().where(permissions_table.c.path == path)
|
||||
result = connection.execute(stmt)
|
||||
permission_id = result.first().permission_id
|
||||
stmt = target.insert().values(permission_id=permission_id, role_id=role_id)
|
||||
connection.execute(stmt)
|
||||
|
||||
connection.commit()
|
@ -0,0 +1,347 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# Copyright (C) 2023 GNS3 Technologies Inc.
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from sqlalchemy import Table, Column, String, ForeignKey, event
|
||||
from sqlalchemy.orm import relationship
|
||||
|
||||
from .base import Base, BaseTable, generate_uuid, GUID
|
||||
|
||||
import logging
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
privilege_role_map = Table(
|
||||
"privilege_role_map",
|
||||
Base.metadata,
|
||||
Column("privilege_id", GUID, ForeignKey("privileges.privilege_id", ondelete="CASCADE")),
|
||||
Column("role_id", GUID, ForeignKey("roles.role_id", ondelete="CASCADE"))
|
||||
)
|
||||
|
||||
|
||||
class Privilege(BaseTable):
|
||||
|
||||
__tablename__ = "privileges"
|
||||
|
||||
privilege_id = Column(GUID, primary_key=True, default=generate_uuid)
|
||||
name = Column(String)
|
||||
description = Column(String)
|
||||
roles = relationship("Role", secondary=privilege_role_map, back_populates="privileges")
|
||||
|
||||
|
||||
@event.listens_for(Privilege.__table__, 'after_create')
|
||||
def create_default_roles(target, connection, **kw):
|
||||
|
||||
default_privileges = [
|
||||
{
|
||||
"description": "Create or delete a user",
|
||||
"name": "User.Allocate"
|
||||
},
|
||||
{
|
||||
"description": "View a user",
|
||||
"name": "User.Audit"
|
||||
},
|
||||
{
|
||||
"description": "Update a user",
|
||||
"name": "User.Modify"
|
||||
},
|
||||
{
|
||||
"description": "Create or delete a group",
|
||||
"name": "Group.Allocate"
|
||||
},
|
||||
{
|
||||
"description": "View a group",
|
||||
"name": "Group.Audit"
|
||||
},
|
||||
{
|
||||
"description": "Update a group",
|
||||
"name": "Group.Modify"
|
||||
},
|
||||
{
|
||||
"description": "Create or delete a role",
|
||||
"name": "Role.Allocate"
|
||||
},
|
||||
{
|
||||
"description": "View a role",
|
||||
"name": "Role.Audit"
|
||||
},
|
||||
{
|
||||
"description": "Update a role",
|
||||
"name": "Role.Modify"
|
||||
},
|
||||
{
|
||||
"description": "Create or delete an ACE",
|
||||
"name": "ACE.Allocate"
|
||||
},
|
||||
{
|
||||
"description": "View an ACE",
|
||||
"name": "ACE.Audit"
|
||||
},
|
||||
{
|
||||
"description": "Update an ACE",
|
||||
"name": "ACE.Modify"
|
||||
},
|
||||
{
|
||||
"description": "Create or delete a template",
|
||||
"name": "Template.Allocate"
|
||||
},
|
||||
{
|
||||
"description": "View a template",
|
||||
"name": "Template.Audit"
|
||||
},
|
||||
{
|
||||
"description": "Update a template",
|
||||
"name": "Template.Modify"
|
||||
},
|
||||
{
|
||||
"description": "Create or delete a project",
|
||||
"name": "Project.Allocate"
|
||||
},
|
||||
{
|
||||
"description": "View a project",
|
||||
"name": "Project.Audit"
|
||||
},
|
||||
{
|
||||
"description": "Update a project",
|
||||
"name": "Project.Modify"
|
||||
},
|
||||
{
|
||||
"description": "Create or delete project snapshots",
|
||||
"name": "Snapshot.Allocate"
|
||||
},
|
||||
{
|
||||
"description": "Restore a snapshot",
|
||||
"name": "Snapshot.Restore"
|
||||
},
|
||||
{
|
||||
"description": "View a snapshot",
|
||||
"name": "Snapshot.Audit"
|
||||
},
|
||||
{
|
||||
"description": "Create or delete a node",
|
||||
"name": "Node.Allocate"
|
||||
},
|
||||
{
|
||||
"description": "View a node",
|
||||
"name": "Node.Audit"
|
||||
},
|
||||
{
|
||||
"description": "Update a node",
|
||||
"name": "Node.Modify"
|
||||
},
|
||||
{
|
||||
"description": "Console access to a node",
|
||||
"name": "Node.Console"
|
||||
},
|
||||
{
|
||||
"description": "Power management for a node",
|
||||
"name": "Node.PowerMgmt"
|
||||
},
|
||||
{
|
||||
"description": "Create or delete a link",
|
||||
"name": "Link.Allocate"
|
||||
},
|
||||
{
|
||||
"description": "View a link",
|
||||
"name": "Link.Audit"
|
||||
},
|
||||
{
|
||||
"description": "Update a link",
|
||||
"name": "Link.Modify"
|
||||
},
|
||||
{
|
||||
"description": "Capture packets on a link",
|
||||
"name": "Link.Capture"
|
||||
},
|
||||
{
|
||||
"description": "Create or delete a drawing",
|
||||
"name": "Drawing.Allocate"
|
||||
},
|
||||
{
|
||||
"description": "View a drawing",
|
||||
"name": "Drawing.Audit"
|
||||
},
|
||||
{
|
||||
"description": "Update a drawing",
|
||||
"name": "Drawing.Modify"
|
||||
},
|
||||
{
|
||||
"description": "Create or delete a symbol",
|
||||
"name": "Symbol.Allocate"
|
||||
},
|
||||
{
|
||||
"description": "View a symbol",
|
||||
"name": "Symbol.Audit"
|
||||
},
|
||||
{
|
||||
"description": "Create or delete an image",
|
||||
"name": "Image.Allocate"
|
||||
},
|
||||
{
|
||||
"description": "View an image",
|
||||
"name": "Image.Audit"
|
||||
},
|
||||
{
|
||||
"description": "Create or delete a compute",
|
||||
"name": "Compute.Allocate"
|
||||
},
|
||||
{
|
||||
"description": "Update a compute",
|
||||
"name": "Compute.Modify"
|
||||
},
|
||||
{
|
||||
"description": "View a compute",
|
||||
"name": "Compute.Audit"
|
||||
},
|
||||
{
|
||||
"description": "Install an appliance",
|
||||
"name": "Appliance.Allocate"
|
||||
},
|
||||
{
|
||||
"description": "View an appliance",
|
||||
"name": "Appliance.Audit"
|
||||
}
|
||||
]
|
||||
|
||||
stmt = target.insert().values(default_privileges)
|
||||
connection.execute(stmt)
|
||||
connection.commit()
|
||||
log.debug("The default privileges have been created in the database")
|
||||
|
||||
|
||||
def add_privileges_to_role(target, connection, role, privileges):
|
||||
|
||||
from .roles import Role
|
||||
roles_table = Role.__table__
|
||||
privileges_table = Privilege.__table__
|
||||
|
||||
stmt = roles_table.select().where(roles_table.c.name == role)
|
||||
result = connection.execute(stmt)
|
||||
role_id = result.first().role_id
|
||||
for privilege_name in privileges:
|
||||
stmt = privileges_table.select().where(privileges_table.c.name == privilege_name)
|
||||
result = connection.execute(stmt)
|
||||
privilege_id = result.first().privilege_id
|
||||
stmt = target.insert().values(privilege_id=privilege_id, role_id=role_id)
|
||||
connection.execute(stmt)
|
||||
|
||||
|
||||
@event.listens_for(privilege_role_map, 'after_create')
|
||||
def add_privileges_to_default_roles(target, connection, **kw):
|
||||
|
||||
from .roles import Role
|
||||
roles_table = Role.__table__
|
||||
stmt = roles_table.select().where(roles_table.c.name == "Administrator")
|
||||
result = connection.execute(stmt)
|
||||
role_id = result.first().role_id
|
||||
|
||||
# add all privileges to the "Administrator" role
|
||||
privileges_table = Privilege.__table__
|
||||
stmt = privileges_table.select()
|
||||
result = connection.execute(stmt)
|
||||
for row in result:
|
||||
privilege_id = row.privilege_id
|
||||
stmt = target.insert().values(privilege_id=privilege_id, role_id=role_id)
|
||||
connection.execute(stmt)
|
||||
|
||||
# add required privileges to the "User" role
|
||||
user_privileges = (
|
||||
"Project.Allocate",
|
||||
"Project.Audit",
|
||||
"Project.Modify",
|
||||
"Snapshot.Allocate",
|
||||
"Snapshot.Audit",
|
||||
"Snapshot.Restore",
|
||||
"Node.Allocate",
|
||||
"Node.Audit",
|
||||
"Node.Modify",
|
||||
"Node.Console",
|
||||
"Node.PowerMgmt",
|
||||
"Link.Allocate",
|
||||
"Link.Audit",
|
||||
"Link.Modify",
|
||||
"Link.Capture",
|
||||
"Drawing.Allocate",
|
||||
"Drawing.Audit",
|
||||
"Drawing.Modify",
|
||||
"Template.Audit",
|
||||
"Symbol.Audit",
|
||||
"Image.Audit",
|
||||
"Compute.Audit",
|
||||
"Appliance.Allocate",
|
||||
"Appliance.Audit"
|
||||
)
|
||||
|
||||
add_privileges_to_role(target, connection, "User", user_privileges)
|
||||
|
||||
# add required privileges to the "Auditor" role
|
||||
auditor_privileges = (
|
||||
"Project.Audit",
|
||||
"Snapshot.Audit",
|
||||
"Node.Audit",
|
||||
"Link.Audit",
|
||||
"Drawing.Audit",
|
||||
"Template.Audit",
|
||||
"Symbol.Audit",
|
||||
"Image.Audit",
|
||||
"Compute.Audit",
|
||||
"Appliance.Audit"
|
||||
)
|
||||
|
||||
add_privileges_to_role(target, connection, "Auditor", auditor_privileges)
|
||||
|
||||
# add required privileges to the "Template manager" role
|
||||
template_manager_privileges = (
|
||||
"Template.Allocate",
|
||||
"Template.Audit",
|
||||
"Template.Modify",
|
||||
"Symbol.Allocate",
|
||||
"Symbol.Audit",
|
||||
"Image.Allocate",
|
||||
"Image.Audit",
|
||||
"Appliance.Allocate",
|
||||
"Appliance.Audit"
|
||||
)
|
||||
|
||||
add_privileges_to_role(target, connection, "Template manager", template_manager_privileges)
|
||||
|
||||
# add required privileges to the "User manager" role
|
||||
user_manager_privileges = (
|
||||
"User.Allocate",
|
||||
"User.Audit",
|
||||
"User.Modify",
|
||||
"Group.Allocate",
|
||||
"Group.Audit",
|
||||
"Group.Modify"
|
||||
)
|
||||
|
||||
add_privileges_to_role(target, connection, "User manager", user_manager_privileges)
|
||||
|
||||
# add required privileges to the "ACL manager" role
|
||||
acl_manager_privileges = (
|
||||
"Role.Allocate",
|
||||
"Role.Audit",
|
||||
"Role.Modify",
|
||||
"ACE.Allocate",
|
||||
"ACE.Audit",
|
||||
"ACE.Modify"
|
||||
)
|
||||
|
||||
add_privileges_to_role(target, connection, "ACL manager", acl_manager_privileges)
|
||||
|
||||
connection.commit()
|
||||
log.debug("Privileges have been added to the default roles in the database")
|
@ -0,0 +1,52 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# Copyright (C) 2023 GNS3 Technologies Inc.
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from sqlalchemy import Table, Column, String, ForeignKey
|
||||
from sqlalchemy.orm import relationship
|
||||
|
||||
from .base import Base, BaseTable, generate_uuid, GUID
|
||||
|
||||
import logging
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
resource_pool_map = Table(
|
||||
"resource_pool_map",
|
||||
Base.metadata,
|
||||
Column("resource_id", GUID, ForeignKey("resources.resource_id", ondelete="CASCADE")),
|
||||
Column("resource_pool_id", GUID, ForeignKey("resource_pools.resource_pool_id", ondelete="CASCADE"))
|
||||
)
|
||||
|
||||
|
||||
class Resource(BaseTable):
|
||||
|
||||
__tablename__ = "resources"
|
||||
|
||||
resource_id = Column(GUID, primary_key=True)
|
||||
name = Column(String, unique=True, index=True)
|
||||
resource_type = Column(String)
|
||||
resource_pools = relationship("ResourcePool", secondary=resource_pool_map, back_populates="resources")
|
||||
|
||||
|
||||
class ResourcePool(BaseTable):
|
||||
|
||||
__tablename__ = "resource_pools"
|
||||
|
||||
resource_pool_id = Column(GUID, primary_key=True, default=generate_uuid)
|
||||
name = Column(String, unique=True, index=True)
|
||||
resources = relationship("Resource", secondary=resource_pool_map, back_populates="resource_pools")
|
@ -0,0 +1,212 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# Copyright (C) 2023 GNS3 Technologies Inc.
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import pytest
|
||||
import pytest_asyncio
|
||||
|
||||
from fastapi import FastAPI, status
|
||||
from httpx import AsyncClient
|
||||
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from gns3server.db.repositories.users import UsersRepository
|
||||
from gns3server.db.repositories.rbac import RbacRepository
|
||||
from gns3server.schemas.controller.users import User
|
||||
from gns3server.schemas.controller.rbac import ACECreate
|
||||
from gns3server.controller import Controller
|
||||
|
||||
pytestmark = pytest.mark.asyncio
|
||||
|
||||
|
||||
class TestACLRoutes:
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def group_id(self, db_session: AsyncSession) -> str:
|
||||
|
||||
users_repo = UsersRepository(db_session)
|
||||
group_in_db = await users_repo.get_user_group_by_name("Users")
|
||||
group_id = str(group_in_db.user_group_id)
|
||||
return group_id
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def role_id(self, db_session: AsyncSession) -> str:
|
||||
|
||||
rbac_repo = RbacRepository(db_session)
|
||||
role_in_db = await rbac_repo.get_role_by_name("User")
|
||||
role_id = str(role_in_db.role_id)
|
||||
return role_id
|
||||
|
||||
async def test_create_ace(
|
||||
self,
|
||||
app: FastAPI,
|
||||
authorized_client: AsyncClient,
|
||||
db_session: AsyncSession,
|
||||
test_user: User,
|
||||
role_id: str
|
||||
) -> None:
|
||||
|
||||
# allow the user to create an ACE
|
||||
rbac_repo = RbacRepository(db_session)
|
||||
admin_role_id = (await rbac_repo.get_role_by_name("Administrator")).role_id
|
||||
ace = ACECreate(
|
||||
path="/acl",
|
||||
ace_type="user",
|
||||
user_id=test_user.user_id,
|
||||
role_id=admin_role_id
|
||||
)
|
||||
await rbac_repo.create_ace(ace)
|
||||
|
||||
# add an ACE on /projects to allow user to create a project
|
||||
path = f"/projects"
|
||||
new_ace = {
|
||||
"path": path,
|
||||
"ace_type": "user",
|
||||
"user_id": str(test_user.user_id),
|
||||
"role_id": role_id
|
||||
}
|
||||
|
||||
response = await authorized_client.post(app.url_path_for("create_ace"), json=new_ace)
|
||||
assert response.status_code == status.HTTP_201_CREATED
|
||||
|
||||
rbac_repo = RbacRepository(db_session)
|
||||
assert await rbac_repo.check_user_has_privilege(test_user.user_id, path, "Project.Allocate") is True
|
||||
|
||||
response = await authorized_client.post(app.url_path_for("create_project"), json={"name": "test"})
|
||||
assert response.status_code == status.HTTP_201_CREATED
|
||||
|
||||
async def test_create_ace_not_existing_endpoint(
|
||||
self,
|
||||
app: FastAPI,
|
||||
client: AsyncClient,
|
||||
group_id: str,
|
||||
role_id: str
|
||||
) -> None:
|
||||
|
||||
new_ace = {
|
||||
"path": "/projects/invalid",
|
||||
"ace_type": "group",
|
||||
"group_id": group_id,
|
||||
"role_id": role_id
|
||||
}
|
||||
response = await client.post(app.url_path_for("create_ace"), json=new_ace)
|
||||
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
||||
|
||||
# async def test_create_ace_non_existing_resource(
|
||||
# self,
|
||||
# app: FastAPI,
|
||||
# client: AsyncClient,
|
||||
# group_id: str,
|
||||
# role_id: str
|
||||
# ) -> None:
|
||||
#
|
||||
# new_ace = {
|
||||
# "path": f"/projects/{str(uuid.uuid4())}",
|
||||
# "ace_type": "group",
|
||||
# "group_id": group_id,
|
||||
# "role_id": role_id
|
||||
# }
|
||||
# response = await client.post(app.url_path_for("create_ace"), json=new_ace)
|
||||
# assert response.status_code == status.HTTP_403_FORBIDDEN
|
||||
|
||||
async def test_get_ace(self, app: FastAPI, client: AsyncClient, db_session: AsyncSession) -> None:
|
||||
|
||||
rbac_repo = RbacRepository(db_session)
|
||||
ace_in_db = await rbac_repo.get_ace_by_path(f"/projects")
|
||||
response = await client.get(app.url_path_for("get_ace", ace_id=ace_in_db.ace_id))
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
assert response.json()["ace_id"] == str(ace_in_db.ace_id)
|
||||
|
||||
async def test_list_aces(self, app: FastAPI, client: AsyncClient) -> None:
|
||||
|
||||
response = await client.get(app.url_path_for("get_aces"))
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
assert len(response.json()) == 2
|
||||
|
||||
async def test_update_ace(
|
||||
self, app: FastAPI,
|
||||
client: AsyncClient,
|
||||
db_session: AsyncSession,
|
||||
test_user: User,
|
||||
role_id: str
|
||||
) -> None:
|
||||
|
||||
rbac_repo = RbacRepository(db_session)
|
||||
ace_in_db = await rbac_repo.get_ace_by_path(f"/projects")
|
||||
|
||||
update_ace = {
|
||||
"path": f"/appliances",
|
||||
"ace_type": "user",
|
||||
"user_id": str(test_user.user_id),
|
||||
"role_id": role_id
|
||||
}
|
||||
response = await client.put(
|
||||
app.url_path_for("update_ace", ace_id=ace_in_db.ace_id),
|
||||
json=update_ace
|
||||
)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
updated_ace_in_db = await rbac_repo.get_ace(ace_in_db.ace_id)
|
||||
assert updated_ace_in_db.path == f"/appliances"
|
||||
|
||||
async def test_delete_ace(
|
||||
self,
|
||||
app: FastAPI,
|
||||
client: AsyncClient,
|
||||
db_session: AsyncSession,
|
||||
) -> None:
|
||||
|
||||
rbac_repo = RbacRepository(db_session)
|
||||
ace_in_db = await rbac_repo.get_ace_by_path(f"/appliances")
|
||||
response = await client.delete(app.url_path_for("delete_ace", ace_id=ace_in_db.ace_id))
|
||||
assert response.status_code == status.HTTP_204_NO_CONTENT
|
||||
|
||||
async def test_ace_cleanup(
|
||||
self,
|
||||
app: FastAPI,
|
||||
authorized_client: AsyncClient,
|
||||
db_session: AsyncSession,
|
||||
test_user: User,
|
||||
role_id: str,
|
||||
) -> None:
|
||||
|
||||
# allow the user to create projects
|
||||
rbac_repo = RbacRepository(db_session)
|
||||
ace = ACECreate(
|
||||
path="/projects",
|
||||
ace_type="user",
|
||||
user_id=test_user.user_id,
|
||||
role_id=role_id
|
||||
)
|
||||
await rbac_repo.create_ace(ace)
|
||||
|
||||
response = await authorized_client.post(app.url_path_for("create_project"), json={"name": "test2"})
|
||||
assert response.status_code == status.HTTP_201_CREATED
|
||||
project_id = response.json()["project_id"]
|
||||
|
||||
path = f"/projects/{project_id}"
|
||||
ace = ACECreate(
|
||||
path=path,
|
||||
ace_type="user",
|
||||
user_id=test_user.user_id,
|
||||
role_id=role_id
|
||||
)
|
||||
await rbac_repo.create_ace(ace)
|
||||
assert await rbac_repo.get_ace_by_path(path)
|
||||
|
||||
response = await authorized_client.delete(app.url_path_for("delete_project", project_id=project_id))
|
||||
assert response.status_code == status.HTTP_204_NO_CONTENT
|
||||
|
||||
# the ACE should have been deleted after deleting the project
|
||||
assert not await rbac_repo.get_ace_by_path(path)
|
@ -1,136 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# Copyright (C) 2021 GNS3 Technologies Inc.
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import pytest
|
||||
import pytest_asyncio
|
||||
import uuid
|
||||
|
||||
from fastapi import FastAPI, status
|
||||
from httpx import AsyncClient
|
||||
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from gns3server.db.repositories.rbac import RbacRepository
|
||||
from gns3server.controller import Controller
|
||||
from gns3server.controller.project import Project
|
||||
|
||||
pytestmark = pytest.mark.asyncio
|
||||
|
||||
|
||||
class TestPermissionRoutes:
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def project(self, app: FastAPI, client: AsyncClient, controller: Controller) -> Project:
|
||||
|
||||
project_uuid = str(uuid.uuid4())
|
||||
params = {"name": "test", "project_id": project_uuid}
|
||||
await client.post(app.url_path_for("create_project"), json=params)
|
||||
return controller.get_project(project_uuid)
|
||||
|
||||
async def test_create_permission(self, app: FastAPI, client: AsyncClient, project: Project) -> None:
|
||||
|
||||
new_permission = {
|
||||
"methods": ["GET"],
|
||||
"path": f"/projects/{project.id}",
|
||||
"action": "ALLOW"
|
||||
}
|
||||
response = await client.post(app.url_path_for("create_permission"), json=new_permission)
|
||||
assert response.status_code == status.HTTP_201_CREATED
|
||||
|
||||
async def test_create_wildcard_permission(self, app: FastAPI, client: AsyncClient, project: Project) -> None:
|
||||
|
||||
new_permission = {
|
||||
"methods": ["POST"],
|
||||
"path": f"/projects/{project.id}/*",
|
||||
"action": "ALLOW"
|
||||
}
|
||||
|
||||
response = await client.post(app.url_path_for("create_permission"), json=new_permission)
|
||||
assert response.status_code == status.HTTP_201_CREATED
|
||||
|
||||
async def test_create_permission_not_existing_endpoint(self, app: FastAPI, client: AsyncClient) -> None:
|
||||
|
||||
new_permission = {
|
||||
"methods": ["GET"],
|
||||
"path": "/projects/invalid",
|
||||
"action": "ALLOW"
|
||||
}
|
||||
response = await client.post(app.url_path_for("create_permission"), json=new_permission)
|
||||
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
||||
|
||||
async def test_create_permission_not_existing_object(self, app: FastAPI, client: AsyncClient) -> None:
|
||||
|
||||
new_permission = {
|
||||
"methods": ["GET"],
|
||||
"path": f"/projects/{str(uuid.uuid4())}/*",
|
||||
"action": "ALLOW"
|
||||
}
|
||||
response = await client.post(app.url_path_for("create_permission"), json=new_permission)
|
||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
||||
|
||||
async def test_get_permission(self, app: FastAPI, client: AsyncClient, db_session: AsyncSession, project: Project) -> None:
|
||||
|
||||
rbac_repo = RbacRepository(db_session)
|
||||
permission_in_db = await rbac_repo.get_permission_by_path(f"/projects/{project.id}/*")
|
||||
response = await client.get(app.url_path_for("get_permission", permission_id=permission_in_db.permission_id))
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
assert response.json()["permission_id"] == str(permission_in_db.permission_id)
|
||||
|
||||
async def test_list_permissions(self, app: FastAPI, client: AsyncClient) -> None:
|
||||
|
||||
response = await client.get(app.url_path_for("get_permissions"))
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
assert len(response.json()) == 11 # 6 default permissions + 5 custom permissions
|
||||
|
||||
async def test_update_permission(self, app: FastAPI, client: AsyncClient, db_session: AsyncSession, project: Project) -> None:
|
||||
|
||||
rbac_repo = RbacRepository(db_session)
|
||||
permission_in_db = await rbac_repo.get_permission_by_path(f"/projects/{project.id}/*")
|
||||
|
||||
update_permission = {
|
||||
"methods": ["GET"],
|
||||
"path": f"/projects/{project.id}/*",
|
||||
"action": "ALLOW"
|
||||
}
|
||||
response = await client.put(
|
||||
app.url_path_for("update_permission", permission_id=permission_in_db.permission_id),
|
||||
json=update_permission
|
||||
)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
updated_permission_in_db = await rbac_repo.get_permission(permission_in_db.permission_id)
|
||||
assert updated_permission_in_db.path == f"/projects/{project.id}/*"
|
||||
|
||||
async def test_delete_permission(
|
||||
self,
|
||||
app: FastAPI,
|
||||
client: AsyncClient,
|
||||
db_session: AsyncSession,
|
||||
project: Project,
|
||||
) -> None:
|
||||
|
||||
rbac_repo = RbacRepository(db_session)
|
||||
permission_in_db = await rbac_repo.get_permission_by_path(f"/projects/{project.id}/*")
|
||||
response = await client.delete(app.url_path_for("delete_permission", permission_id=permission_in_db.permission_id))
|
||||
assert response.status_code == status.HTTP_204_NO_CONTENT
|
||||
|
||||
async def test_prune_permissions(self, app: FastAPI, client: AsyncClient, db_session: AsyncSession) -> None:
|
||||
|
||||
response = await client.post(app.url_path_for("prune_permissions"))
|
||||
assert response.status_code == status.HTTP_204_NO_CONTENT
|
||||
|
||||
rbac_repo = RbacRepository(db_session)
|
||||
permissions_in_db = await rbac_repo.get_permissions()
|
||||
assert len(permissions_in_db) == 10 # 6 default permissions + 4 custom permissions
|
Loading…
Reference in new issue