|
|
|
@ -19,15 +19,16 @@ import pytest
|
|
|
|
|
|
|
|
|
|
from typing import Optional
|
|
|
|
|
from fastapi import FastAPI, HTTPException, status
|
|
|
|
|
from sqlalchemy import update
|
|
|
|
|
from httpx import AsyncClient
|
|
|
|
|
from jose import jwt
|
|
|
|
|
|
|
|
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
|
from gns3server.db.repositories.users import UsersRepository
|
|
|
|
|
from gns3server.services import auth_service
|
|
|
|
|
from gns3server.services.authentication import DEFAULT_JWT_SECRET_KEY
|
|
|
|
|
from gns3server.config import Config
|
|
|
|
|
from gns3server.schemas.controller.users import User
|
|
|
|
|
import gns3server.db.models as models
|
|
|
|
|
|
|
|
|
|
pytestmark = pytest.mark.asyncio
|
|
|
|
|
|
|
|
|
@ -38,7 +39,7 @@ class TestUserRoutes:
|
|
|
|
|
|
|
|
|
|
new_user = {"username": "user1", "email": "user1@email.com", "password": "test_password"}
|
|
|
|
|
response = await client.post(app.url_path_for("create_user"), json=new_user)
|
|
|
|
|
assert response.status_code != status.HTTP_404_NOT_FOUND
|
|
|
|
|
assert response.status_code == status.HTTP_201_CREATED
|
|
|
|
|
|
|
|
|
|
async def test_users_can_register_successfully(
|
|
|
|
|
self,
|
|
|
|
@ -188,18 +189,18 @@ class TestUserLogin:
|
|
|
|
|
async def test_user_can_login_successfully_and_receives_valid_token(
|
|
|
|
|
self,
|
|
|
|
|
app: FastAPI,
|
|
|
|
|
client: AsyncClient,
|
|
|
|
|
unauthorized_client: AsyncClient,
|
|
|
|
|
test_user: User,
|
|
|
|
|
config: Config
|
|
|
|
|
) -> None:
|
|
|
|
|
|
|
|
|
|
jwt_secret = config.settings.Controller.jwt_secret_key
|
|
|
|
|
client.headers["content-type"] = "application/x-www-form-urlencoded"
|
|
|
|
|
unauthorized_client.headers["content-type"] = "application/x-www-form-urlencoded"
|
|
|
|
|
login_data = {
|
|
|
|
|
"username": test_user.username,
|
|
|
|
|
"password": "user1_password",
|
|
|
|
|
}
|
|
|
|
|
res = await client.post(app.url_path_for("login"), data=login_data)
|
|
|
|
|
res = await unauthorized_client.post(app.url_path_for("login"), data=login_data)
|
|
|
|
|
assert res.status_code == status.HTTP_200_OK
|
|
|
|
|
|
|
|
|
|
# check that token exists in response and has user encoded within it
|
|
|
|
@ -224,19 +225,19 @@ class TestUserLogin:
|
|
|
|
|
async def test_user_with_wrong_creds_doesnt_receive_token(
|
|
|
|
|
self,
|
|
|
|
|
app: FastAPI,
|
|
|
|
|
client: AsyncClient,
|
|
|
|
|
unauthorized_client: AsyncClient,
|
|
|
|
|
test_user: User,
|
|
|
|
|
username: str,
|
|
|
|
|
password: str,
|
|
|
|
|
status_code: int,
|
|
|
|
|
) -> None:
|
|
|
|
|
|
|
|
|
|
client.headers["content-type"] = "application/x-www-form-urlencoded"
|
|
|
|
|
unauthorized_client.headers["content-type"] = "application/x-www-form-urlencoded"
|
|
|
|
|
login_data = {
|
|
|
|
|
"username": username,
|
|
|
|
|
"password": password,
|
|
|
|
|
}
|
|
|
|
|
res = await client.post(app.url_path_for("login"), data=login_data)
|
|
|
|
|
res = await unauthorized_client.post(app.url_path_for("login"), data=login_data)
|
|
|
|
|
assert res.status_code == status_code
|
|
|
|
|
assert "access_token" not in res.json()
|
|
|
|
|
|
|
|
|
@ -259,11 +260,11 @@ class TestUserMe:
|
|
|
|
|
|
|
|
|
|
async def test_user_cannot_access_own_data_if_not_authenticated(
|
|
|
|
|
self, app: FastAPI,
|
|
|
|
|
client: AsyncClient,
|
|
|
|
|
unauthorized_client: AsyncClient,
|
|
|
|
|
test_user: User,
|
|
|
|
|
) -> None:
|
|
|
|
|
|
|
|
|
|
res = await client.get(app.url_path_for("get_current_active_user"))
|
|
|
|
|
res = await unauthorized_client.get(app.url_path_for("get_current_active_user"))
|
|
|
|
|
assert res.status_code == status.HTTP_401_UNAUTHORIZED
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -284,11 +285,31 @@ class TestSuperAdmin:
|
|
|
|
|
async def test_cannot_delete_super_admin(
|
|
|
|
|
self,
|
|
|
|
|
app: FastAPI,
|
|
|
|
|
admin_client: AsyncClient,
|
|
|
|
|
client: AsyncClient,
|
|
|
|
|
db_session: AsyncSession
|
|
|
|
|
) -> None:
|
|
|
|
|
|
|
|
|
|
user_repo = UsersRepository(db_session)
|
|
|
|
|
admin_in_db = await user_repo.get_user_by_username("admin")
|
|
|
|
|
res = await admin_client.delete(app.url_path_for("delete_user", user_id=admin_in_db.user_id))
|
|
|
|
|
res = await client.delete(app.url_path_for("delete_user", user_id=admin_in_db.user_id))
|
|
|
|
|
assert res.status_code == status.HTTP_403_FORBIDDEN
|
|
|
|
|
|
|
|
|
|
async def test_admin_can_login_after_password_recovery(
|
|
|
|
|
self,
|
|
|
|
|
app: FastAPI,
|
|
|
|
|
unauthorized_client: AsyncClient,
|
|
|
|
|
db_session: AsyncSession
|
|
|
|
|
) -> None:
|
|
|
|
|
|
|
|
|
|
# set the admin password to null in the database
|
|
|
|
|
query = update(models.User).where(models.User.username == "admin").values(hashed_password=None)
|
|
|
|
|
await db_session.execute(query)
|
|
|
|
|
await db_session.commit()
|
|
|
|
|
|
|
|
|
|
unauthorized_client.headers["content-type"] = "application/x-www-form-urlencoded"
|
|
|
|
|
login_data = {
|
|
|
|
|
"username": "admin",
|
|
|
|
|
"password": "whatever",
|
|
|
|
|
}
|
|
|
|
|
res = await unauthorized_client.post(app.url_path_for("login"), data=login_data)
|
|
|
|
|
assert res.status_code == status.HTTP_200_OK
|
|
|
|
|