Spaces:
Sleeping
Sleeping
| # src/payslip/utils.py | |
| from datetime import date, datetime | |
| from typing import Optional | |
| from dateutil.relativedelta import relativedelta | |
| from fastapi import Depends, HTTPException | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| from jose import jwt, JWTError | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from sqlalchemy import select | |
| from src.core.database import get_async_session | |
| from src.core.models import Users | |
| from src.core.config import settings | |
| from cryptography.fernet import Fernet | |
| from src.core.config import settings | |
| fernet = Fernet(settings.FERNET_KEY.encode()) | |
| def encrypt_token(token: str) -> str: | |
| """Encrypts a refresh token before saving to DB.""" | |
| return fernet.encrypt(token.encode()).decode() | |
| def decrypt_token(token: str) -> str: | |
| """Decrypts a stored refresh token when needed.""" | |
| return fernet.decrypt(token.encode()).decode() | |
| bearer_scheme = HTTPBearer() | |
| SECRET_KEY = settings.SECRET_KEY | |
| ALGORITHM = settings.JWT_ALGORITHM | |
| def _parse_month(month_str: str) -> date: | |
| """ | |
| "2024-05" -> date(2024, 5, 1) | |
| """ | |
| try: | |
| d = datetime.strptime(month_str, "%Y-%m") | |
| return date(d.year, d.month, 1) | |
| except ValueError: | |
| raise HTTPException( | |
| status_code=400, | |
| detail="Invalid month format. Use YYYY-MM, e.g. 2024-05", | |
| ) | |
| def validate_join_date(join_date: Optional[str], period_start: date): | |
| if not join_date: | |
| join = date(2020, 4, 1) | |
| else: | |
| join = datetime.strptime(join_date, "%Y-%m-%d").date() | |
| if period_start < join: | |
| raise HTTPException( | |
| 400, | |
| f"You joined on {join}. You cannot request payslips before joining date.", | |
| ) | |
| def calculate_period(mode: str, start_month: str = None, end_month: str = None): | |
| """ | |
| mode: | |
| - "3_months" | |
| - "6_months" | |
| - "manual" + start_month, end_month in "YYYY-MM" | |
| """ | |
| today = date.today() | |
| if mode == "3_months": | |
| end = today.replace(day=1) | |
| start = end - relativedelta(months=3) | |
| return start, end | |
| if mode == "6_months": | |
| end = today.replace(day=1) | |
| start = end - relativedelta(months=6) | |
| return start, end | |
| if mode == "manual": | |
| # Validate fields | |
| if not start_month or not end_month: | |
| raise HTTPException(400, "Manual mode requires start_month and end_month") | |
| try: | |
| start = datetime.strptime(start_month, "%Y-%m").date() | |
| end = datetime.strptime(end_month, "%Y-%m").date() | |
| except ValueError: | |
| raise HTTPException(400, "Invalid month format. Use YYYY-MM") | |
| if start > end: | |
| raise HTTPException(400, "Start month cannot be after end month") | |
| return start, end | |
| # Invalid mode | |
| raise HTTPException(400, "Invalid payslip request mode") | |
| async def get_current_user_model( | |
| credentials: HTTPAuthorizationCredentials = Depends(bearer_scheme), | |
| session: AsyncSession = Depends(get_async_session), | |
| ): | |
| token = credentials.credentials | |
| try: | |
| payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) | |
| user_id = payload.get("sub") | |
| if not user_id: | |
| raise HTTPException(401, "Invalid token") | |
| result = await session.execute(select(Users).where(Users.id == user_id)) | |
| user = result.scalar_one_or_none() | |
| if not user: | |
| raise HTTPException(401, "User not found") | |
| return user | |
| except JWTError: | |
| raise HTTPException(401, "Invalid or expired token") | |