mirror of
https://github.com/a-mayb3/Kanban_clone_backend.git
synced 2026-03-21 10:05:38 +01:00
145 lines
No EOL
4.7 KiB
Python
145 lines
No EOL
4.7 KiB
Python
import os
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Request, status, Response
|
|
from database import db_dependency
|
|
from jose import JWTError, jwt
|
|
from datetime import datetime, timedelta, timezone
|
|
import models
|
|
|
|
import schemas.users as user_schemas
|
|
|
|
from pyargon2 import hash
|
|
|
|
router = APIRouter(prefix="/auth", tags=["auth"])
|
|
|
|
SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key-change-this-in-production")
|
|
ALGORITHM = "HS256"
|
|
ACCESS_TOKEN_EXPIRE_MINUTES = 1440 # 24 hours
|
|
|
|
def create_access_token(data: dict, expires_delta: timedelta | None = None):
|
|
to_encode = data.copy()
|
|
if expires_delta:
|
|
expire = datetime.now(timezone.utc) + expires_delta
|
|
else:
|
|
expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
|
|
to_encode.update({"exp": expire})
|
|
to_encode.update({"iat": datetime.now(timezone.utc)})
|
|
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
|
|
return encoded_jwt
|
|
|
|
def check_for_valid_token(request: Request, db: db_dependency) -> models.User :
|
|
"""Helper function to check for valid JWT token in cookies"""
|
|
token = request.cookies.get("access_token")
|
|
if not token:
|
|
raise HTTPException(
|
|
status_code=401,
|
|
detail="Not logged in"
|
|
)
|
|
try:
|
|
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
|
|
user_id: str = str(payload.get("sub"))
|
|
if user_id is None:
|
|
request.cookies.clear() ## removing invalid auth cookie
|
|
raise HTTPException(
|
|
status_code=401,
|
|
detail="Not logged in"
|
|
)
|
|
db_user = db.query(models.User).filter(models.User.id == int(user_id)).first()
|
|
if db_user is None:
|
|
request.cookies.clear() ## removing invalid auth cookie
|
|
raise HTTPException(
|
|
status_code=401,
|
|
detail="User not found"
|
|
)
|
|
return db_user
|
|
|
|
except JWTError:
|
|
request.cookies.clear() ## removing invalid auth cookie
|
|
raise HTTPException(
|
|
status_code=401,
|
|
detail="Could not validate credentials"
|
|
)
|
|
|
|
@router.post("/login")
|
|
def login(user_data: user_schemas.UserLogin, request: Request, response: Response, db: db_dependency):
|
|
"""Login and receive JWT token in cookie"""
|
|
|
|
## check if access token already exists
|
|
get_token = request.cookies.get("access_token")
|
|
if get_token:
|
|
try:
|
|
user_id = verify_jwt_token(get_token)
|
|
return {
|
|
"message": "Already logged in",
|
|
"user": {
|
|
"id": user_id
|
|
}
|
|
}
|
|
except HTTPException:
|
|
pass # Token invalid or expired, proceed to login
|
|
|
|
db_user = db.query(models.User).filter(models.User.email == user_data.email).first()
|
|
if db_user is None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Incorrect email or password"
|
|
)
|
|
|
|
if not verify_user_password(getattr(db_user, "id"), user_data.password, db):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Incorrect email or password"
|
|
)
|
|
|
|
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
|
|
access_token = create_access_token(
|
|
data={"sub": str(db_user.id)}, expires_delta=access_token_expires
|
|
|
|
)
|
|
|
|
# Set JWT in httpOnly cookie
|
|
response.set_cookie(
|
|
key="access_token",
|
|
value=access_token,
|
|
httponly=True,
|
|
max_age=ACCESS_TOKEN_EXPIRE_MINUTES * 60,
|
|
samesite="lax",
|
|
secure=False # Set to True in production with HTTPS
|
|
)
|
|
|
|
return {
|
|
"message": "Login successful",
|
|
"user": {
|
|
"id": db_user.id,
|
|
"name": db_user.name,
|
|
"email": db_user.email
|
|
}
|
|
}
|
|
|
|
|
|
def verify_jwt_token(token: str):
|
|
"""Verify and decode a JWT token"""
|
|
credentials_exception = HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Could not validate credentials",
|
|
)
|
|
try:
|
|
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
|
|
user_id = payload.get("sub")
|
|
if user_id is None:
|
|
raise credentials_exception
|
|
return user_id
|
|
except JWTError:
|
|
raise credentials_exception
|
|
|
|
def verify_user_password(user_id: int, password: str, db: db_dependency) -> bool:
|
|
"""Verify user's password"""
|
|
db_user = db.query(models.User).filter(models.User.id == user_id).first()
|
|
if db_user is None:
|
|
return False
|
|
|
|
hashed_password = hash(password=password, salt=str(getattr(db_user,"password_salt")), variant="id")
|
|
if hashed_password != db_user.password_hash:
|
|
return False
|
|
|
|
return True |