From b34662e877862bc94e2c07444d622c29ad52fa89 Mon Sep 17 00:00:00 2001 From: Borgia Leiva Date: Tue, 3 Feb 2026 13:05:10 +0100 Subject: [PATCH] refactored auth.py to use helper methods --- routers/auth.py | 97 +++++++++++++++++++++++-------------------------- 1 file changed, 46 insertions(+), 51 deletions(-) diff --git a/routers/auth.py b/routers/auth.py index b6c8586..1264294 100644 --- a/routers/auth.py +++ b/routers/auth.py @@ -18,7 +18,7 @@ ACCESS_TOKEN_EXPIRE_MINUTES = 1440 # 24 hours def create_access_token(data: dict, expires_delta: timedelta | None = None): """Create a JWT token""" - + to_encode = data.copy() if expires_delta: expire = datetime.now(timezone.utc) + expires_delta @@ -29,38 +29,63 @@ def create_access_token(data: dict, expires_delta: timedelta | None = None): encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt +def verify_jwt_token(token: str): + """Verify and decode a JWT token""" + + try: + payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) + user_id = payload.get("sub") + if user_id is None: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Could not validate credentials", + ) + return user_id + + except JWTError: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Could not validate credentials", + ) + def get_user_from_jwt(request: Request, db: db_dependency) -> models.User : """Helper function to check for valid JWT token in cookies""" - token = request.cookies.get("access_token") - if not token: + + get_token = request.cookies.get("access_token") + + if not get_token or get_token is None: raise HTTPException( status_code=401, detail="Not logged in" ) try: - payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) - user_id: str = str(payload.get("sub")) - if user_id is None: - request.cookies.clear() ## removing invalid auth cookie - raise HTTPException( - status_code=401, - detail="Not logged in" - ) + user_id: str = verify_jwt_token(get_token) ## verifying token validity + db_user = db.query(models.User).filter(models.User.id == int(user_id)).first() if db_user is None: request.cookies.clear() ## removing invalid auth cookie raise HTTPException( status_code=401, - detail="User not found" + detail="Could not verify credentials" ) return db_user - - except JWTError: + except HTTPException: request.cookies.clear() ## removing invalid auth cookie + raise + +def verify_user_password(user_id: int, password: str, db: db_dependency) -> None: + """Verify user's password""" + db_user = db.query(models.User).filter(models.User.id == user_id).first() + if db_user is None: raise HTTPException( status_code=401, - detail="Could not validate credentials" - ) + detail="Could not verify credentials") + + hashed_password = hash(password=password, salt=str(getattr(db_user,"password_salt")), variant="id") + if hashed_password != db_user.password_hash: + raise HTTPException( + status_code=401, + detail="Could not verify credentials") @router.post("/login") def login(user_data: user_schemas.UserLogin, request: Request, response: Response, db: db_dependency): @@ -78,8 +103,9 @@ def login(user_data: user_schemas.UserLogin, request: Request, response: Respons } } except HTTPException: - pass # Token invalid or expired, proceed to login + request.cookies.clear() ## removing invalid auth cookie + ## check if user exists db_user = db.query(models.User).filter(models.User.email == user_data.email).first() if db_user is None: raise HTTPException( @@ -87,16 +113,12 @@ def login(user_data: user_schemas.UserLogin, request: Request, response: Respons detail="Incorrect email or password" ) - if not verify_user_password(getattr(db_user, "id"), user_data.password, db): - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Incorrect email or password" - ) + verify_user_password(getattr(db_user, "id"), user_data.password, db) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( - data={"sub": str(db_user.id)}, expires_delta=access_token_expires - + data={"sub": str(db_user.id)}, + expires_delta=access_token_expires ) # Set JWT in httpOnly cookie @@ -118,30 +140,3 @@ def login(user_data: user_schemas.UserLogin, request: Request, response: Respons } } - -def verify_jwt_token(token: str): - """Verify and decode a JWT token""" - credentials_exception = HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Could not validate credentials", - ) - try: - payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) - user_id = payload.get("sub") - if user_id is None: - raise credentials_exception - return user_id - except JWTError: - raise credentials_exception - -def verify_user_password(user_id: int, password: str, db: db_dependency) -> bool: - """Verify user's password""" - db_user = db.query(models.User).filter(models.User.id == user_id).first() - if db_user is None: - return False - - hashed_password = hash(password=password, salt=str(getattr(db_user,"password_salt")), variant="id") - if hashed_password != db_user.password_hash: - return False - - return True \ No newline at end of file