refactored auth.py to use helper methods

This commit is contained in:
Marta Borgia Leiva 2026-02-03 13:05:10 +01:00
parent a3fb4903ed
commit b34662e877

View file

@ -18,7 +18,7 @@ ACCESS_TOKEN_EXPIRE_MINUTES = 1440 # 24 hours
def create_access_token(data: dict, expires_delta: timedelta | None = None): def create_access_token(data: dict, expires_delta: timedelta | None = None):
"""Create a JWT token""" """Create a JWT token"""
to_encode = data.copy() to_encode = data.copy()
if expires_delta: if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta expire = datetime.now(timezone.utc) + expires_delta
@ -29,38 +29,63 @@ def create_access_token(data: dict, expires_delta: timedelta | None = None):
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt return encoded_jwt
def verify_jwt_token(token: str):
"""Verify and decode a JWT token"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_id = payload.get("sub")
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
)
return user_id
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
)
def get_user_from_jwt(request: Request, db: db_dependency) -> models.User : def get_user_from_jwt(request: Request, db: db_dependency) -> models.User :
"""Helper function to check for valid JWT token in cookies""" """Helper function to check for valid JWT token in cookies"""
token = request.cookies.get("access_token")
if not token: get_token = request.cookies.get("access_token")
if not get_token or get_token is None:
raise HTTPException( raise HTTPException(
status_code=401, status_code=401,
detail="Not logged in" detail="Not logged in"
) )
try: try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) user_id: str = verify_jwt_token(get_token) ## verifying token validity
user_id: str = str(payload.get("sub"))
if user_id is None:
request.cookies.clear() ## removing invalid auth cookie
raise HTTPException(
status_code=401,
detail="Not logged in"
)
db_user = db.query(models.User).filter(models.User.id == int(user_id)).first() db_user = db.query(models.User).filter(models.User.id == int(user_id)).first()
if db_user is None: if db_user is None:
request.cookies.clear() ## removing invalid auth cookie request.cookies.clear() ## removing invalid auth cookie
raise HTTPException( raise HTTPException(
status_code=401, status_code=401,
detail="User not found" detail="Could not verify credentials"
) )
return db_user return db_user
except HTTPException:
except JWTError:
request.cookies.clear() ## removing invalid auth cookie request.cookies.clear() ## removing invalid auth cookie
raise
def verify_user_password(user_id: int, password: str, db: db_dependency) -> None:
"""Verify user's password"""
db_user = db.query(models.User).filter(models.User.id == user_id).first()
if db_user is None:
raise HTTPException( raise HTTPException(
status_code=401, status_code=401,
detail="Could not validate credentials" detail="Could not verify credentials")
)
hashed_password = hash(password=password, salt=str(getattr(db_user,"password_salt")), variant="id")
if hashed_password != db_user.password_hash:
raise HTTPException(
status_code=401,
detail="Could not verify credentials")
@router.post("/login") @router.post("/login")
def login(user_data: user_schemas.UserLogin, request: Request, response: Response, db: db_dependency): def login(user_data: user_schemas.UserLogin, request: Request, response: Response, db: db_dependency):
@ -78,8 +103,9 @@ def login(user_data: user_schemas.UserLogin, request: Request, response: Respons
} }
} }
except HTTPException: except HTTPException:
pass # Token invalid or expired, proceed to login request.cookies.clear() ## removing invalid auth cookie
## check if user exists
db_user = db.query(models.User).filter(models.User.email == user_data.email).first() db_user = db.query(models.User).filter(models.User.email == user_data.email).first()
if db_user is None: if db_user is None:
raise HTTPException( raise HTTPException(
@ -87,16 +113,12 @@ def login(user_data: user_schemas.UserLogin, request: Request, response: Respons
detail="Incorrect email or password" detail="Incorrect email or password"
) )
if not verify_user_password(getattr(db_user, "id"), user_data.password, db): verify_user_password(getattr(db_user, "id"), user_data.password, db)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password"
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token( access_token = create_access_token(
data={"sub": str(db_user.id)}, expires_delta=access_token_expires data={"sub": str(db_user.id)},
expires_delta=access_token_expires
) )
# Set JWT in httpOnly cookie # Set JWT in httpOnly cookie
@ -118,30 +140,3 @@ def login(user_data: user_schemas.UserLogin, request: Request, response: Respons
} }
} }
def verify_jwt_token(token: str):
"""Verify and decode a JWT token"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_id = payload.get("sub")
if user_id is None:
raise credentials_exception
return user_id
except JWTError:
raise credentials_exception
def verify_user_password(user_id: int, password: str, db: db_dependency) -> bool:
"""Verify user's password"""
db_user = db.query(models.User).filter(models.User.id == user_id).first()
if db_user is None:
return False
hashed_password = hash(password=password, salt=str(getattr(db_user,"password_salt")), variant="id")
if hashed_password != db_user.password_hash:
return False
return True