mirror of
https://github.com/GNS3/gns3-server
synced 2024-11-15 12:59:06 +00:00
496 lines
18 KiB
Python
#!/usr/bin/env python
|
|
#
|
|
# Copyright (C) 2020 GNS3 Technologies Inc.
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
from typing import Optional, Union

import pytest
from fastapi import FastAPI, HTTPException, status
from httpx import AsyncClient
from jose import jwt
from sqlalchemy import update
from sqlalchemy.ext.asyncio import AsyncSession

from gns3server.db.repositories.users import UsersRepository
from gns3server.db.repositories.rbac import RbacRepository
from gns3server.schemas.controller.rbac import Permission, HTTPMethods, PermissionAction
from gns3server.services import auth_service
from gns3server.config import Config
from gns3server.schemas.controller.users import User
from gns3server import schemas
import gns3server.db.models as models
|
|
|
|
# Module-level marker: applies pytest.mark.asyncio to every test in this file.
pytestmark = pytest.mark.asyncio
|
|
|
|
|
|
class TestUserRoutes:
    """Tests for the user management routes (create, update, list).

    NOTE(review): these tests appear to be order-dependent. Later tests look
    up or count users ("user1", "user2", "user3") that are created by earlier
    tests in this class, so they presumably share one database and run in
    definition order -- confirm against the fixtures.
    """

    async def test_route_exist(self, app: FastAPI, client: AsyncClient) -> None:
        """Posting a new user to "create_user" answers with 201 Created."""

        new_user = {"username": "user1", "email": "user1@email.com", "password": "test_password"}
        response = await client.post(app.url_path_for("create_user"), json=new_user)
        assert response.status_code == status.HTTP_201_CREATED

    async def test_users_can_register_successfully(
        self,
        app: FastAPI,
        client: AsyncClient,
        db_session: AsyncSession
    ) -> None:
        """A registered user is stored in the database and echoed in the response."""

        user_repo = UsersRepository(db_session)
        params = {"username": "user2", "email": "user2@email.com", "password": "test_password"}

        # make sure the user doesn't exist in the database
        user_in_db = await user_repo.get_user_by_username(params["username"])
        assert user_in_db is None

        # register the user
        response = await client.post(app.url_path_for("create_user"), json=params)
        assert response.status_code == status.HTTP_201_CREATED

        # make sure the user does exist in the database now
        user_in_db = await user_repo.get_user_by_username(params["username"])
        assert user_in_db is not None
        assert user_in_db.email == params["email"]
        assert user_in_db.username == params["username"]

        # check that the user returned in the response is equal to the user in the database
        created_user = User(**response.json()).json()
        assert created_user == User.from_orm(user_in_db).json()

    @pytest.mark.parametrize(
        "attr, value, status_code",
        (
            ("email", "user2@email.com", status.HTTP_400_BAD_REQUEST),
            ("username", "user2", status.HTTP_400_BAD_REQUEST),
            ("email", "invalid_email@one@two.io", status.HTTP_422_UNPROCESSABLE_ENTITY),
            ("password", "short", status.HTTP_422_UNPROCESSABLE_ENTITY),
            ("username", "user2@#$%^<>", status.HTTP_422_UNPROCESSABLE_ENTITY),
            ("username", "ab", status.HTTP_422_UNPROCESSABLE_ENTITY),
        )
    )
    async def test_user_registration_fails_when_credentials_are_taken(
        self,
        app: FastAPI,
        client: AsyncClient,
        attr: str,
        value: str,
        status_code: int,
    ) -> None:
        """Registration is refused when one field is overridden with a bad value.

        Each case starts from an otherwise unused user payload and replaces
        the single field ``attr`` with ``value``; the response must carry
        ``status_code``.
        """

        new_user = {"email": "not_taken@email.com", "username": "not_taken_username", "password": "test_password"}
        new_user[attr] = value
        response = await client.post(app.url_path_for("create_user"), json=new_user)
        assert response.status_code == status_code

    @pytest.mark.parametrize(
        "attr, value, status_code",
        (
            # the same e-mail is submitted twice: the first update is expected
            # to succeed, the repeat is expected to be rejected
            ("email", "user@email.com", status.HTTP_200_OK),
            ("email", "user@email.com", status.HTTP_400_BAD_REQUEST),
            ("username", "user2", status.HTTP_400_BAD_REQUEST),
            ("email", "invalid_email@one@two.io", status.HTTP_422_UNPROCESSABLE_ENTITY),
            ("password", "short", status.HTTP_422_UNPROCESSABLE_ENTITY),
            ("username", "user2@#$%^<>", status.HTTP_422_UNPROCESSABLE_ENTITY),
            ("username", "ab", status.HTTP_422_UNPROCESSABLE_ENTITY),
            ("full_name", "John Doe", status.HTTP_200_OK),
            ("password", "password123", status.HTTP_200_OK),
            ("is_active", True, status.HTTP_200_OK),
        )
    )
    async def test_update_user(
        self,
        app: FastAPI,
        client: AsyncClient,
        db_session: AsyncSession,
        attr: str,
        value: Union[str, bool],
        status_code: int,
    ) -> None:
        """Updating a single field of "user2" yields the expected status code.

        ``value`` is a string for most cases but a boolean for ``is_active``.
        """

        user_repo = UsersRepository(db_session)
        user_in_db = await user_repo.get_user_by_username("user2")
        # fail with a readable assertion rather than an AttributeError on None
        # when "user2" has not been created beforehand
        assert user_in_db is not None, 'user "user2" must exist before it can be updated'

        update_user = {attr: value}
        response = await client.put(app.url_path_for("update_user", user_id=user_in_db.user_id), json=update_user)
        assert response.status_code == status_code

    async def test_users_saved_password_is_hashed(
        self,
        app: FastAPI,
        client: AsyncClient,
        db_session: AsyncSession
    ) -> None:
        """The stored password differs from the plain one yet verifies against it."""

        user_repo = UsersRepository(db_session)
        new_user = {"username": "user3", "email": "user3@email.com", "password": "test_password"}

        # send post request to create user and ensure it is successful
        response = await client.post(app.url_path_for("create_user"), json=new_user)
        assert response.status_code == status.HTTP_201_CREATED

        # ensure that the user's password is hashed in the db
        # and that we can verify it using our auth service
        user_in_db = await user_repo.get_user_by_username(new_user["username"])
        assert user_in_db is not None
        assert user_in_db.hashed_password != new_user["password"]
        assert auth_service.verify_password(new_user["password"], user_in_db.hashed_password)

    async def test_get_users(self, app: FastAPI, client: AsyncClient) -> None:
        """Listing users returns the four accounts expected at this point."""

        response = await client.get(app.url_path_for("get_users"))
        assert response.status_code == status.HTTP_200_OK
        assert len(response.json()) == 4  # admin, user1, user2 and user3 should exist
|
|
|
|
|
|
class TestAuthTokens:
    """Tests for creating access tokens and reading the user name back."""

    async def test_can_create_token_successfully(
        self,
        app: FastAPI,
        client: AsyncClient,
        test_user: User,
        config: Config
    ) -> None:
        """A token created for the test user decodes to a matching "sub" claim."""

        access_token = auth_service.create_access_token(test_user.username)
        claims = jwt.decode(
            access_token,
            config.settings.Controller.jwt_secret_key,
            algorithms=["HS256"]
        )
        assert claims.get("sub") == test_user.username

    async def test_token_missing_user_is_invalid(self, app: FastAPI, client: AsyncClient, config: Config) -> None:
        """Decoding a token that was created with no user name raises JWTError."""

        access_token = auth_service.create_access_token(None)
        secret = config.settings.Controller.jwt_secret_key
        with pytest.raises(jwt.JWTError):
            jwt.decode(access_token, secret, algorithms=["HS256"])

    async def test_can_retrieve_username_from_token(
        self,
        app: FastAPI,
        client: AsyncClient,
        test_user: User
    ) -> None:
        """The auth service returns the user name a token was created for."""

        access_token = auth_service.create_access_token(test_user.username)
        assert auth_service.get_username_from_token(access_token) == test_user.username

    @pytest.mark.parametrize(
        "wrong_secret, wrong_token",
        (
            ("use correct secret", "asdf"),  # use wrong token
            ("use correct secret", ""),  # use wrong token
            ("ABC123", "use correct token"),  # use wrong secret
        ),
    )
    async def test_error_when_token_or_secret_is_wrong(
        self,
        app: FastAPI,
        client: AsyncClient,
        test_user: User,
        wrong_secret: str,
        wrong_token: Optional[str],
        config,
    ) -> None:
        """A bad token or a bad secret makes token parsing raise HTTPException.

        The placeholder strings "use correct secret" / "use correct token"
        are swapped for the real secret / a freshly created token, so each
        case has exactly one wrong ingredient.
        """

        valid_token = auth_service.create_access_token(test_user.username)

        if wrong_secret == "use correct secret":
            secret_key = config.settings.Controller.jwt_secret_key
        else:
            secret_key = wrong_secret
        candidate_token = valid_token if wrong_token == "use correct token" else wrong_token

        with pytest.raises(HTTPException):
            auth_service.get_username_from_token(candidate_token, secret_key=secret_key)
|
|
|
|
|
|
class TestUserLogin:
    """Tests for the "login" (form) and "authenticate" (JSON) routes."""

    async def test_user_can_login_successfully_and_receives_valid_token(
        self,
        app: FastAPI,
        unauthorized_client: AsyncClient,
        test_user: User,
        config: Config
    ) -> None:
        """A form login answers 200 with a bearer token naming the user."""

        secret = config.settings.Controller.jwt_secret_key
        unauthorized_client.headers["content-type"] = "application/x-www-form-urlencoded"
        form_fields = {"username": test_user.username, "password": "user1_password"}
        response = await unauthorized_client.post(app.url_path_for("login"), data=form_fields)
        assert response.status_code == status.HTTP_200_OK

        # the token must be present in the response and carry the user name
        body = response.json()
        claims = jwt.decode(body.get("access_token"), secret, algorithms=["HS256"])
        assert "sub" in claims
        assert claims.get("sub") == test_user.username

        # the token must be announced as a bearer token
        assert "token_type" in body
        assert body.get("token_type") == "bearer"

    async def test_user_can_authenticate_using_json(
        self,
        app: FastAPI,
        unauthorized_client: AsyncClient,
        test_user: User,
        config: Config
    ) -> None:
        """Posting JSON credentials to "authenticate" yields an access token."""

        payload = {"username": test_user.username, "password": "user1_password"}
        response = await unauthorized_client.post(app.url_path_for("authenticate"), json=payload)
        assert response.status_code == status.HTTP_200_OK
        assert response.json().get("access_token")

    @pytest.mark.parametrize(
        "username, password, status_code",
        (
            ("wrong_username", "user1_password", status.HTTP_401_UNAUTHORIZED),
            ("user1", "wrong_password", status.HTTP_401_UNAUTHORIZED),
            ("user1", None, status.HTTP_422_UNPROCESSABLE_ENTITY),
        ),
    )
    async def test_user_with_wrong_creds_doesnt_receive_token(
        self,
        app: FastAPI,
        unauthorized_client: AsyncClient,
        test_user: User,
        username: str,
        password: str,
        status_code: int,
    ) -> None:
        """Bad or missing credentials get the expected error and no token."""

        unauthorized_client.headers["content-type"] = "application/x-www-form-urlencoded"
        form_fields = {"username": username, "password": password}
        response = await unauthorized_client.post(app.url_path_for("login"), data=form_fields)
        assert response.status_code == status_code
        assert "access_token" not in response.json()
|
|
|
|
|
|
class TestUserMe:
    """Tests for the routes a logged-in user uses to read and edit their own account."""

    async def test_authenticated_user_can_retrieve_own_data(
        self,
        app: FastAPI,
        authorized_client: AsyncClient,
        test_user: User,
    ) -> None:
        """An authorized client gets back the test user's own record."""

        response = await authorized_client.get(app.url_path_for("get_logged_in_user"))
        assert response.status_code == status.HTTP_200_OK
        user = User(**response.json())
        assert user.username == test_user.username
        assert user.email == test_user.email
        assert user.user_id == test_user.user_id

    async def test_user_cannot_access_own_data_if_not_authenticated(
        self, app: FastAPI,
        unauthorized_client: AsyncClient,
        test_user: User,
    ) -> None:
        """Without authentication the same route answers 401 Unauthorized."""

        response = await unauthorized_client.get(app.url_path_for("get_logged_in_user"))
        assert response.status_code == status.HTTP_401_UNAUTHORIZED

    # NOTE: this class used to define test_authenticated_user_can_update_own_data
    # twice. The first definition (a copy of the retrieval test above) was
    # rebound by the parametrized one below and therefore never collected by
    # pytest, so only the effective definition is kept.

    # logged in users can only change their email, full name and password
    @pytest.mark.parametrize(
        "attr, value, status_code",
        (
            # the same e-mail is submitted twice: the first update is expected
            # to succeed, the repeat is expected to be rejected
            ("email", "user42@email.com", status.HTTP_200_OK),
            ("email", "user42@email.com", status.HTTP_400_BAD_REQUEST),
            ("full_name", "John Doe", status.HTTP_200_OK),
            ("password", "password123", status.HTTP_200_OK),
        )
    )
    async def test_authenticated_user_can_update_own_data(
        self,
        app: FastAPI,
        authorized_client: AsyncClient,
        attr: str,
        value: str,
        status_code: int,
    ) -> None:
        """Updating one field of the logged-in user yields the expected status code."""

        update_user = {attr: value}
        response = await authorized_client.put(
            app.url_path_for("update_logged_in_user"),
            json=update_user
        )
        assert response.status_code == status_code
|
|
|
|
|
|
class TestSuperAdmin:
    """Tests around the built-in "admin" account."""

    async def test_super_admin_exists(
        self,
        app: FastAPI,
        client: AsyncClient,
        db_session: AsyncSession
    ) -> None:
        """The "admin" user is in the database and its password verifies as "admin"."""

        admin = await UsersRepository(db_session).get_user_by_username("admin")
        assert admin is not None
        assert auth_service.verify_password("admin", admin.hashed_password)

    async def test_cannot_delete_super_admin(
        self,
        app: FastAPI,
        client: AsyncClient,
        db_session: AsyncSession
    ) -> None:
        """Deleting the "admin" user is answered with 403 Forbidden."""

        admin = await UsersRepository(db_session).get_user_by_username("admin")
        delete_url = app.url_path_for("delete_user", user_id=admin.user_id)
        response = await client.delete(delete_url)
        assert response.status_code == status.HTTP_403_FORBIDDEN

    async def test_admin_can_login_after_password_recovery(
        self,
        app: FastAPI,
        unauthorized_client: AsyncClient,
        db_session: AsyncSession
    ) -> None:
        """With the admin's stored hash nulled, a login as "admin" still answers 200."""

        # set the admin password to null in the database
        reset_password = (
            update(models.User)
            .where(models.User.username == "admin")
            .values(hashed_password=None)
        )
        await db_session.execute(reset_password)
        await db_session.commit()

        unauthorized_client.headers["content-type"] = "application/x-www-form-urlencoded"
        form_fields = {"username": "admin", "password": "whatever"}
        response = await unauthorized_client.post(app.url_path_for("login"), data=form_fields)
        assert response.status_code == status.HTTP_200_OK
|
|
|
|
# async def test_super_admin_belongs_to_admin_group(
|
|
# self,
|
|
# app: FastAPI,
|
|
# client: AsyncClient,
|
|
# db_session: AsyncSession
|
|
# ) -> None:
|
|
#
|
|
# user_repo = UsersRepository(db_session)
|
|
# admin_in_db = await user_repo.get_user_by_username("admin")
|
|
# response = await client.get(app.url_path_for("get_user_memberships", user_id=admin_in_db.user_id))
|
|
# assert response.status_code == status.HTTP_200_OK
|
|
# assert len(response.json()) == 1
|
|
|
|
@pytest.fixture
async def test_permission(db_session: AsyncSession) -> Permission:
    """Provide the GET "/statistics" allow permission, creating it if needed.

    An existing permission found for that path is returned as is; otherwise
    a new one is created through the RBAC repository.
    """

    path = "/statistics"
    repository = RbacRepository(db_session)

    already_there = await repository.get_permission_by_path(path)
    if already_there:
        return already_there

    permission_to_create = schemas.PermissionCreate(
        methods=[HTTPMethods.get],
        path=path,
        action=PermissionAction.allow
    )
    return await repository.create_permission(permission_to_create)
|
|
|
|
|
|
class TestUserPermissionsRoutes:
    """Tests for attaching, listing and detaching a permission on a user.

    NOTE(review): the expected counts (1, 1, then 0) suggest these tests run
    in definition order against shared state -- confirm against the fixtures.
    """

    async def test_add_permission_to_user(
        self,
        app: FastAPI,
        client: AsyncClient,
        test_user: User,
        test_permission: Permission,
        db_session: AsyncSession
    ) -> None:
        """Adding the permission answers 204 and the user then holds exactly it."""

        url = app.url_path_for(
            "add_permission_to_user",
            user_id=str(test_user.user_id),
            permission_id=str(test_permission.permission_id)
        )
        response = await client.put(url)
        assert response.status_code == status.HTTP_204_NO_CONTENT

        held = await RbacRepository(db_session).get_user_permissions(test_user.user_id)
        assert len(held) == 1
        assert held[0].permission_id == test_permission.permission_id

    async def test_get_user_permissions(
        self,
        app: FastAPI,
        client: AsyncClient,
        test_user: User,
        db_session: AsyncSession
    ) -> None:
        """Listing the user's permissions answers 200 with a single entry."""

        url = app.url_path_for("get_user_permissions", user_id=str(test_user.user_id))
        response = await client.get(url)
        assert response.status_code == status.HTTP_200_OK
        assert len(response.json()) == 1

    async def test_remove_permission_from_user(
        self,
        app: FastAPI,
        client: AsyncClient,
        test_user: User,
        test_permission: Permission,
        db_session: AsyncSession
    ) -> None:
        """Removing the permission answers 204 and leaves the user with none."""

        url = app.url_path_for(
            "remove_permission_from_user",
            user_id=str(test_user.user_id),
            permission_id=str(test_permission.permission_id)
        )
        response = await client.delete(url)
        assert response.status_code == status.HTTP_204_NO_CONTENT

        held = await RbacRepository(db_session).get_user_permissions(test_user.user_id)
        assert len(held) == 0
|