diff --git a/gns3server/db/models/privileges.py b/gns3server/db/models/privileges.py index b3da8ebf..2bd746df 100644 --- a/gns3server/db/models/privileges.py +++ b/gns3server/db/models/privileges.py @@ -171,6 +171,14 @@ def create_default_roles(target, connection, **kw): "description": "View a compute", "name": "Compute.Audit" }, + { + "description": "Install an appliance", + "name": "Appliance.Allocate" + }, + { + "description": "View an appliance", + "name": "Appliance.Audit" + } ] stmt = target.insert().values(default_privileges) @@ -235,7 +243,9 @@ def add_privileges_to_default_roles(target, connection, **kw): "Template.Audit", "Symbol.Audit", "Image.Audit", - "Compute.Audit" + "Compute.Audit", + "Appliance.Allocate", + "Appliance.Audit" ) add_privileges_to_role(target, connection, "User", user_privileges) @@ -249,7 +259,8 @@ def add_privileges_to_default_roles(target, connection, **kw): "Template.Audit", "Symbol.Audit", "Image.Audit", - "Compute.Audit" + "Compute.Audit", + "Appliance.Audit" ) add_privileges_to_role(target, connection, "Auditor", auditor_privileges) diff --git a/gns3server/db/repositories/rbac.py b/gns3server/db/repositories/rbac.py index 6277e856..418e8775 100644 --- a/gns3server/db/repositories/rbac.py +++ b/gns3server/db/repositories/rbac.py @@ -303,18 +303,8 @@ class RbacRepository(BaseRepository): result = await self._db_session.execute(query) log.debug(f"{result.rowcount} ACE(s) have been deleted") - async def check_user_has_privilege(self, user_id: UUID, path: str, privilege_name: str) -> bool: - - #TODO: handle when user belong to one or more groups (left join?) - query = select(models.ACE.path, models.ACE.propagate, models.ACE.allowed, models.Privilege.name).\ - join(models.Privilege.roles).\ - join(models.Role.acl_entries).\ - join(models.ACE.user).\ - filter(models.User.user_id == user_id).\ - filter(models.Privilege.name == privilege_name).\ - order_by(models.ACE.path.desc()) - result = await self._db_session.execute(query) - aces = result.all() + @staticmethod + def _match_path_to_aces(path: str, aces) -> bool: parsed_url = urlparse(path) original_path = path @@ -327,7 +317,55 @@ class RbacRepository(BaseRepository): for ace_path, ace_propagate, ace_allowed, ace_privilege in aces: if ace_path == path: if not ace_allowed: - return False + raise PermissionError(f"Permission denied for {path}") if path == original_path or ace_propagate: return True # only allow if the path is the original path or the ACE is set to propagate return False + + async def check_user_has_privilege(self, user_id: UUID, path: str, privilege_name: str) -> bool: + """ + Resource paths form a file system like tree and privileges can be inherited by paths down that tree + (the propagate field is True by default) + + The following inheritance rules are used: + + * Privileges for individual users always replace group privileges. + * Privileges for groups apply when the user is member of that group. + * Privileges on deeper levels replace those inherited from an upper level. + """ + + # retrieve all user ACEs matching the user_id and privilege name + query = select(models.ACE.path, models.ACE.propagate, models.ACE.allowed, models.Privilege.name).\ + join(models.Privilege.roles).\ + join(models.Role.acl_entries).\ + join(models.ACE.user). \ + filter(models.User.user_id == user_id).\ + filter(models.Privilege.name == privilege_name).\ + order_by(models.ACE.path.desc()) + + result = await self._db_session.execute(query) + aces = result.all() + + try: + if self._match_path_to_aces(path, aces): + # the user has an ACE matching the path and privilege,there is no need to check group ACEs + return True + except PermissionError: + return False + + # retrieve all group ACEs matching the user_id and privilege name + query = select(models.ACE.path, models.ACE.propagate, models.ACE.allowed, models.Privilege.name). \ + join(models.Privilege.roles). \ + join(models.Role.acl_entries). \ + join(models.ACE.group). \ + join(models.UserGroup.users).\ + filter(models.User.user_id == user_id). \ + filter(models.Privilege.name == privilege_name) + + result = await self._db_session.execute(query) + aces = result.all() + + try: + return self._match_path_to_aces(path, aces) + except PermissionError: + return False