Started implementing auth

This commit is contained in:
Marta Borgia Leiva 2026-02-02 11:09:27 +01:00
parent e557c25789
commit b909a23fa3
7 changed files with 279 additions and 19 deletions

View file

@ -1,3 +1,5 @@
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
@ -8,15 +10,16 @@ from fastapi import Depends
from sqlalchemy.orm import Session
from typing import Annotated
URL_DATABASE = "sqlite:///./kanban_clone.db"
engine = create_engine(URL_DATABASE)
engine = create_engine(URL_DATABASE, echo=True)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
def init_db() -> None:
# Import models so they are registered with SQLAlchemy metadata
import models # noqa: F401
Base.metadata.create_all(bind=engine)
def get_db():

63
main.py
View file

@ -1,11 +1,33 @@
from fastapi import FastAPI, HTTPException, Depends
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from routers.projects import router as projects_router
from routers.users import router as users_router
from routers.auth import router as auth_router
from routers.me import router as me_router
from database import init_db
app = FastAPI()
@asynccontextmanager
async def lifespan(app: FastAPI):
# Place for startup and shutdown events if needed in the future
init_db()
yield
app = FastAPI(lifespan=lifespan)
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:3000", "http://localhost:5173"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(auth_router)
app.include_router(users_router)
app.include_router(me_router)
app.include_router(projects_router)
"""ping pong :)"""
@ -21,3 +43,40 @@ def source():
## TODO: Add root endpoint that gives basic info about the API
## TODO: Add more detailed error handling and logging
## TODO: Implement authentication and authorization mechanisms
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc):
"""Custom HTTP exception handler"""
return JSONResponse(
status_code=exc.status_code,
content={
"error": {
"message": exc.detail,
"type": "authentication_error" if exc.status_code == 401 else "authorization_error",
"status_code": exc.status_code
}
},
headers=exc.headers
)
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request, exc):
"""Handle validation errors"""
return JSONResponse(
status_code=422,
content={
"error": {
"message": "Validation error",
"type": "validation_error",
"details": exc.errors()
}
}
)

View file

@ -1,6 +1,6 @@
from sqlalchemy import Column, ForeignKey, String, Integer, Table
from sqlalchemy.orm import relationship
from sqlalchemy.dialects.sqlite import BLOB
from sqlalchemy.orm import relationship
from database import Base
from typing import Optional, List
@ -14,16 +14,17 @@ project_user = Table(
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
name = Column(String, index=True)
email = Column(String, unique=True, index=True)
password_hash = Column(BLOB)
password_hash = Column(String)
password_salt = Column(String)
projects = relationship("Project", secondary=project_user, back_populates="users")
class Project(Base):
__tablename__ = "projects"
id = Column(Integer, primary_key=True, index=True)
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
name = Column(String, index=True)
description = Column(String)
users = relationship("User", secondary=project_user, back_populates="projects")
@ -32,7 +33,7 @@ class Project(Base):
class Task(Base):
__tablename__ = "tasks"
id = Column(Integer, primary_key=True, index=True)
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
title = Column(String, index=True)
description = Column(String)
status = Column(String, default="pending")

126
routers/auth.py Normal file
View file

@ -0,0 +1,126 @@
import os
from fastapi import APIRouter, Depends, HTTPException, status, Response
from database import db_dependency
from jose import JWTError, jwt
from datetime import datetime, timedelta, timezone
import models
import schemas.users as user_schemas
import routers.users as user_router
from pyargon2 import hash
router = APIRouter(prefix="/auth", tags=["auth"])
SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key-change-this-in-production")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 1440 # 24 hours
def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
to_encode.update({"iat": datetime.now(timezone.utc)})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
@router.post("/login")
def login(user_data: user_schemas.UserLogin, response: Response, db: db_dependency):
"""Login and receive JWT token in cookie"""
db_user = db.query(models.User).filter(models.User.email == user_data.email).first()
if db_user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password"
)
if not verify_user_password(getattr(db_user, "id"), user_data.password, db):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password"
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": str(db_user.id)}, expires_delta=access_token_expires
)
# Set JWT in httpOnly cookie
response.set_cookie(
key="access_token",
value=access_token,
httponly=True,
max_age=ACCESS_TOKEN_EXPIRE_MINUTES * 60,
samesite="lax",
secure=False # Set to True in production with HTTPS
)
return {
"message": "Login successful",
"user": {
"id": db_user.id,
"name": db_user.name,
"email": db_user.email
}
}
@router.post("/logout")
def logout(response: Response):
"""Logout by clearing the JWT cookie"""
response.delete_cookie(key="access_token")
return {"message": "Logout successful"}
def verify_jwt_token(token: str):
"""Verify and decode a JWT token"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_id = payload.get("sub")
if user_id is None:
raise credentials_exception
return user_id
except JWTError:
raise credentials_exception
def get_current_user(request, db: db_dependency):
"""Get current authenticated user from cookie"""
token = request.cookies.get("access_token")
if not token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated"
)
user_id = verify_jwt_token(token)
user = db.query(models.User).filter(models.User.id == int(user_id)).first()
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found"
)
return user
def verify_user_password(user_id: int, password: str, db: db_dependency) -> bool:
"""Verify user's password"""
db_user = db.query(models.User).filter(models.User.id == user_id).first()
if db_user is None:
return False
hashed_password = hash(password=password, salt=str(getattr(db_user,"password_salt")), variant="id")
if hashed_password != db_user.password_hash:
return False
return True

45
routers/me.py Normal file
View file

@ -0,0 +1,45 @@
from fastapi import APIRouter, Depends, HTTPException, status, Response, Request
from database import db_dependency
from jose import JWTError, jwt
from datetime import datetime, timedelta, timezone
import models
import os
from routers import auth
import schemas.users as user_schemas
import routers.users as user_router
router = APIRouter(prefix="/me", tags=["me"])
@router.get("/", response_model=user_schemas.UserBase)
def get_me(request: Request, db: db_dependency):
"""Get current authenticated user"""
token = request.cookies.get("access_token")
if not token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not logged in"
)
try:
payload = jwt.decode(token, auth.SECRET_KEY, algorithms=[auth.ALGORITHM])
user_id: str = str(payload.get("sub"))
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not logged in"
)
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials"
)
db_user = db.query(models.User).filter(models.User.id == int(user_id)).first()
if db_user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found"
)
return db_user

View file

@ -1,3 +1,4 @@
import os
from typing import List
from fastapi import APIRouter, HTTPException, Depends
from database import db_dependency
@ -7,10 +8,11 @@ import models
import schemas.users as users
import schemas.projects as projects
from pyargon2 import hash
import pyargon2
router = APIRouter(prefix="/users", tags=["users"])
"""Get a user by ID"""
@router.get("/{user_id}", response_model=users.UserBase)
@ -28,8 +30,8 @@ def update_user(user_id: int, user: users.UserBase, db: db_dependency):
db_user = db.query(models.User).filter(models.User.id == user_id).first()
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
db_user.name = user.name
db_user.email = user.email
setattr(db_user, "name", user.name)
setattr(db_user, "email", user.email)
db.commit()
db.refresh(db_user)
return db_user
@ -52,12 +54,31 @@ def read_projects_from_user(user_id: int, db: db_dependency):
"""Create a new user"""
@router.post("/", response_model=users.UserBase)
def create_user(user: users.UserBase, db: db_dependency):
def create_user(user: users.UserCreate, db: db_dependency):
user_salt = os.urandom(32).hex()
print("Generated salt:", user_salt)
hashed_password = hash(password=user.password, salt=user_salt, variant="id")
db_user = models.User(
name=user.name,
email=user.email
email=user.email,
password_hash=hashed_password,
password_salt=user_salt
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
@router.delete("/{user_id}")
def delete_user(user_id: int, db: db_dependency):
db_user = db.query(models.User).filter(models.User.id == user_id).first()
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
db.delete(db_user)
db.commit()
return {"detail": "User deleted"}

View file

@ -8,7 +8,9 @@ class UserBase(BaseModel):
name: str
email: str
class UserCreate(UserBase):
class UserCreate(BaseModel):
name: str
email: str
password: str
class UserUpdateInfo(BaseModel):
@ -19,3 +21,6 @@ class UserUpdatePassword(BaseModel):
password: str
new_password: str
class UserLogin(BaseModel):
email: str
password: str