diff --git a/gns3server/services/authentication.py b/gns3server/services/authentication.py index 9dbf1616..477c24a8 100644 --- a/gns3server/services/authentication.py +++ b/gns3server/services/authentication.py @@ -14,8 +14,9 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . - -from jose import JWTError, jwt +from joserfc import jwt +from joserfc.jwk import OctKey +from joserfc.errors import JoseError from datetime import datetime, timedelta, timezone import bcrypt @@ -23,7 +24,6 @@ from typing import Optional from fastapi import HTTPException, status from gns3server.schemas.controller.tokens import TokenData from gns3server.config import Config -from pydantic import ValidationError import logging @@ -56,7 +56,8 @@ class AuthService: secret_key = DEFAULT_JWT_SECRET_KEY log.error("A JWT secret key must be configured to secure the server, using an unsecured default key!") algorithm = Config.instance().settings.Controller.jwt_algorithm - encoded_jwt = jwt.encode(to_encode, secret_key, algorithm=algorithm) + key = OctKey.import_key(secret_key) + encoded_jwt = jwt.encode({"alg": algorithm}, to_encode, key) return encoded_jwt def get_username_from_token(self, token: str, secret_key: str = None) -> Optional[str]: @@ -73,11 +74,12 @@ class AuthService: secret_key = DEFAULT_JWT_SECRET_KEY log.error("A JWT secret key must be configured to secure the server, using an unsecured default key!") algorithm = Config.instance().settings.Controller.jwt_algorithm - payload = jwt.decode(token, secret_key, algorithms=[algorithm]) - username: str = payload.get("sub") + key = OctKey.import_key(secret_key) + payload = jwt.decode(token, key, algorithms=[algorithm]) + username: str = payload.claims.get("sub") if username is None: raise credentials_exception token_data = TokenData(username=username) - except (JWTError, ValidationError): + except (JoseError, ValueError): raise credentials_exception return token_data.username diff --git a/requirements.txt b/requirements.txt index 3882f22f..fb4eb280 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,7 +15,7 @@ sqlalchemy==2.0.36 aiosqlite==0.20.0 alembic==1.14.0 bcrypt==4.2.1 -python-jose[cryptography]==3.3.0 +joserfc==1.0.1 email-validator==2.2.0 watchfiles==1.0.3 zstandard==0.23.0 diff --git a/tests/api/routes/controller/test_users.py b/tests/api/routes/controller/test_users.py index 88df89d9..416fe45b 100644 --- a/tests/api/routes/controller/test_users.py +++ b/tests/api/routes/controller/test_users.py @@ -21,7 +21,8 @@ from typing import Optional from fastapi import FastAPI, HTTPException, status from sqlalchemy import update from httpx import AsyncClient -from jose import jwt +from joserfc import jwt +from joserfc.jwk import OctKey from sqlalchemy.ext.asyncio import AsyncSession from gns3server.db.repositories.users import UsersRepository @@ -166,16 +167,23 @@ class TestAuthTokens: jwt_secret = config.settings.Controller.jwt_secret_key token = auth_service.create_access_token(test_user.username) - payload = jwt.decode(token, jwt_secret, algorithms=["HS256"]) - username = payload.get("sub") + key = OctKey.import_key(jwt_secret) + payload = jwt.decode(token, key, algorithms=["HS256"]) + username = payload.claims.get("sub") assert username == test_user.username - async def test_token_missing_user_is_invalid(self, app: FastAPI, client: AsyncClient, config: Config) -> None: + async def test_decode_token_with_wrong_algorithm( + self, + app: FastAPI, + client: AsyncClient, + test_user: User, + config: Config + ) -> None: jwt_secret = config.settings.Controller.jwt_secret_key - token = auth_service.create_access_token(None) - with pytest.raises(jwt.JWTError): - jwt.decode(token, jwt_secret, algorithms=["HS256"]) + token = auth_service.create_access_token(test_user.username) + with pytest.raises(ValueError): + jwt.decode(token, jwt_secret, algorithms=["ES256"]) async def test_can_retrieve_username_from_token( self, @@ -236,9 +244,10 @@ class TestUserLogin: # check that token exists in response and has user encoded within it token = response.json().get("access_token") - payload = jwt.decode(token, jwt_secret, algorithms=["HS256"]) - assert "sub" in payload - username = payload.get("sub") + key = OctKey.import_key(jwt_secret) + payload = jwt.decode(token, key, algorithms=["HS256"]) + assert "sub" in payload.claims + username = payload.claims.get("sub") assert username == test_user.username # check that token is proper type