mirror of
https://github.com/GNS3/gns3-server
synced 2025-01-13 01:20:58 +00:00
Replace python-jose library by joserfc
This commit is contained in:
parent
8b57fbaa0a
commit
9d6cea665a
@ -14,8 +14,9 @@
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
|
||||
from jose import JWTError, jwt
|
||||
from joserfc import jwt
|
||||
from joserfc.jwk import OctKey
|
||||
from joserfc.errors import JoseError
|
||||
from datetime import datetime, timedelta, timezone
|
||||
import bcrypt
|
||||
|
||||
@ -23,7 +24,6 @@ from typing import Optional
|
||||
from fastapi import HTTPException, status
|
||||
from gns3server.schemas.controller.tokens import TokenData
|
||||
from gns3server.config import Config
|
||||
from pydantic import ValidationError
|
||||
|
||||
import logging
|
||||
|
||||
@ -56,7 +56,8 @@ class AuthService:
|
||||
secret_key = DEFAULT_JWT_SECRET_KEY
|
||||
log.error("A JWT secret key must be configured to secure the server, using an unsecured default key!")
|
||||
algorithm = Config.instance().settings.Controller.jwt_algorithm
|
||||
encoded_jwt = jwt.encode(to_encode, secret_key, algorithm=algorithm)
|
||||
key = OctKey.import_key(secret_key)
|
||||
encoded_jwt = jwt.encode({"alg": algorithm}, to_encode, key)
|
||||
return encoded_jwt
|
||||
|
||||
def get_username_from_token(self, token: str, secret_key: str = None) -> Optional[str]:
|
||||
@ -73,11 +74,12 @@ class AuthService:
|
||||
secret_key = DEFAULT_JWT_SECRET_KEY
|
||||
log.error("A JWT secret key must be configured to secure the server, using an unsecured default key!")
|
||||
algorithm = Config.instance().settings.Controller.jwt_algorithm
|
||||
payload = jwt.decode(token, secret_key, algorithms=[algorithm])
|
||||
username: str = payload.get("sub")
|
||||
key = OctKey.import_key(secret_key)
|
||||
payload = jwt.decode(token, key, algorithms=[algorithm])
|
||||
username: str = payload.claims.get("sub")
|
||||
if username is None:
|
||||
raise credentials_exception
|
||||
token_data = TokenData(username=username)
|
||||
except (JWTError, ValidationError):
|
||||
except (JoseError, ValueError):
|
||||
raise credentials_exception
|
||||
return token_data.username
|
||||
|
@ -15,7 +15,7 @@ sqlalchemy==2.0.36
|
||||
aiosqlite==0.20.0
|
||||
alembic==1.14.0
|
||||
bcrypt==4.2.1
|
||||
python-jose[cryptography]==3.3.0
|
||||
joserfc==1.0.1
|
||||
email-validator==2.2.0
|
||||
watchfiles==1.0.3
|
||||
zstandard==0.23.0
|
||||
|
@ -21,7 +21,8 @@ from typing import Optional
|
||||
from fastapi import FastAPI, HTTPException, status
|
||||
from sqlalchemy import update
|
||||
from httpx import AsyncClient
|
||||
from jose import jwt
|
||||
from joserfc import jwt
|
||||
from joserfc.jwk import OctKey
|
||||
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from gns3server.db.repositories.users import UsersRepository
|
||||
@ -166,16 +167,23 @@ class TestAuthTokens:
|
||||
|
||||
jwt_secret = config.settings.Controller.jwt_secret_key
|
||||
token = auth_service.create_access_token(test_user.username)
|
||||
payload = jwt.decode(token, jwt_secret, algorithms=["HS256"])
|
||||
username = payload.get("sub")
|
||||
key = OctKey.import_key(jwt_secret)
|
||||
payload = jwt.decode(token, key, algorithms=["HS256"])
|
||||
username = payload.claims.get("sub")
|
||||
assert username == test_user.username
|
||||
|
||||
async def test_token_missing_user_is_invalid(self, app: FastAPI, client: AsyncClient, config: Config) -> None:
|
||||
async def test_decode_token_with_wrong_algorithm(
|
||||
self,
|
||||
app: FastAPI,
|
||||
client: AsyncClient,
|
||||
test_user: User,
|
||||
config: Config
|
||||
) -> None:
|
||||
|
||||
jwt_secret = config.settings.Controller.jwt_secret_key
|
||||
token = auth_service.create_access_token(None)
|
||||
with pytest.raises(jwt.JWTError):
|
||||
jwt.decode(token, jwt_secret, algorithms=["HS256"])
|
||||
token = auth_service.create_access_token(test_user.username)
|
||||
with pytest.raises(ValueError):
|
||||
jwt.decode(token, jwt_secret, algorithms=["ES256"])
|
||||
|
||||
async def test_can_retrieve_username_from_token(
|
||||
self,
|
||||
@ -236,9 +244,10 @@ class TestUserLogin:
|
||||
|
||||
# check that token exists in response and has user encoded within it
|
||||
token = response.json().get("access_token")
|
||||
payload = jwt.decode(token, jwt_secret, algorithms=["HS256"])
|
||||
assert "sub" in payload
|
||||
username = payload.get("sub")
|
||||
key = OctKey.import_key(jwt_secret)
|
||||
payload = jwt.decode(token, key, algorithms=["HS256"])
|
||||
assert "sub" in payload.claims
|
||||
username = payload.claims.get("sub")
|
||||
assert username == test_user.username
|
||||
|
||||
# check that token is proper type
|
||||
|
Loading…
Reference in New Issue
Block a user