import json import smtplib import os import uuid from email.mime.text import MIMEText import logging import traceback from passlib.context import CryptContext from src.core.database import get_async_session from sqlmodel.ext.asyncio.session import AsyncSession from jose import jwt, JWTError from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from datetime import datetime, timedelta from cryptography.fernet import Fernet, InvalidToken from fastapi import Depends, HTTPException, status from src.core.models import Users from src.core.config import settings SECRET_KEY = settings.SECRET_KEY ALGORITHM = settings.JWT_ALGORITHM ACCESS_TOKEN_EXPIRE_MINUTES = settings.JWT_EXPIRE logger = logging.getLogger(__name__) SMTP_SERVER = settings.EMAIL_SERVER SMTP_PORT = settings.EMAIL_PORT SMTP_EMAIL = settings.EMAIL_USERNAME SMTP_PASSWORD = settings.EMAIL_PASSWORD FERNET_KEY = settings.FERNET_KEY VERIFICATION_BASE_URL = settings.VERIFICATION_BASE_URL pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") def hash_password(password: str) -> str: """Encrypt plain password into hashed password""" return pwd_context.hash(password) def verify_password(plain_password: str, hashed_password: str) -> bool: """Compare plain password with stored hash""" return pwd_context.verify(plain_password, hashed_password) def create_access_token(data: dict): """Create JWT token with expiry""" to_encode = data.copy() expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt # def send_verification_email(to_email: str, token: str): # """Send verification email using smtplib with detailed debug logs.""" # subject = f"Verify your {settings.APP_NAME} Account" # verification_link = f"{VERIFICATION_BASE_URL}/auth/verify-email?token={token}" # body = f""" # Hi, # Please verify your {settings.APP_NAME} account by clicking the link below: # {verification_link} # This link will expire in 24 hours. # Regards, # {settings.APP_NAME} Team # """ # msg = MIMEText(body) # msg["Subject"] = subject # msg["From"] = SMTP_EMAIL # msg["To"] = to_email # logger.info("🟢 Starting send_verification_email()") # logger.info(f"📨 To: {to_email}") # logger.info(f"📤 SMTP Server: {SMTP_SERVER}:{SMTP_PORT}") # try: # logger.info("🔌 Connecting to SMTP server...") # with smtplib.SMTP(SMTP_SERVER, SMTP_PORT, timeout=30) as server: # logger.info("✅ Connected successfully.") # logger.info("🔒 Starting TLS...") # server.starttls() # logger.info("✅ TLS secured.") # logger.info("🔑 Logging in to SMTP server...") # server.login(SMTP_EMAIL, SMTP_PASSWORD) # logger.info("✅ Logged in successfully.") # # Send email # logger.info("📧 Sending email message...") # server.send_message(msg) # logger.info(f"✅ Email successfully sent to {to_email}") # except smtplib.SMTPAuthenticationError as e: # logger.error("❌ Authentication failed — check email or app password.") # logger.error(f"Error details: {e}") # logger.error(traceback.format_exc()) # raise # except smtplib.SMTPConnectError as e: # logger.error("❌ Could not connect to SMTP server.") # logger.error(f"Error details: {e}") # logger.error(traceback.format_exc()) # raise # except smtplib.SMTPRecipientsRefused as e: # logger.error("❌ Recipient address refused.") # logger.error(f"Error details: {e}") # logger.error(traceback.format_exc()) # raise # except smtplib.SMTPException as e: # logger.error("❌ General SMTP error occurred.") # logger.error(f"Error details: {e}") # logger.error(traceback.format_exc()) # raise # except Exception as e: # logger.error("❌ Unknown error occurred while sending verification email.") # logger.error(f"Error details: {e}") # logger.error(traceback.format_exc()) # raise fernet = Fernet(FERNET_KEY.encode()) def create_verification_token(user_id: str, expires_in_hours: int = 24) -> str: """Create encrypted token with expiry""" payload = { "sub": user_id, "exp": (datetime.utcnow() + timedelta(hours=expires_in_hours)).timestamp(), } token = fernet.encrypt(json.dumps(payload).encode()).decode() return token async def verify_verification_token(token: str) -> str: """Verify encrypted token and extract user_id""" try: decrypted = fernet.decrypt(token.encode()) data = json.loads(decrypted.decode()) exp = datetime.fromtimestamp(data["exp"]) if datetime.utcnow() > exp: raise ValueError("Verification link expired") return data["sub"] except InvalidToken: raise ValueError("Invalid verification link") bearer_scheme = HTTPBearer() def get_current_user( credentials: HTTPAuthorizationCredentials = Depends(bearer_scheme), ): """Decode JWT token and extract current user ID""" token = credentials.credentials try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) user_id: str = payload.get("sub") if user_id is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token: Missing user id", ) return user_id except JWTError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired token", ) async def get_current_active_user( session: AsyncSession = Depends(get_async_session), user_id: str = Depends(get_current_user), ) -> Users: """Return the full user model for the currently authenticated user.""" user = await session.get(Users, uuid.UUID(user_id)) if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) if not user.is_verified: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="User not verified" ) return user def create_refresh_token(data: dict, expires_days: int = 7): """Create a long-lived JWT refresh token""" to_encode = data.copy() expire = datetime.utcnow() + timedelta(days=expires_days) to_encode.update({"exp": expire, "type": "refresh"}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt