Spaces:
Sleeping
Sleeping
# =============================================================
# USTP Student Handbook Assistant (2023 Edition)
# =============================================================
# Enhanced: dynamic model selection + real (printed) page numbering
| import os | |
| import glob | |
| import json | |
| import time | |
| from typing import List, Dict, Any | |
| import numpy as np | |
| import streamlit as st | |
| import PyPDF2 | |
| import requests | |
| from dotenv import load_dotenv | |
| from huggingface_hub import InferenceClient, login | |
| from streamlit_chat import message as st_message | |
| # Optional: FAISS for fast vector search | |
| try: | |
| import faiss | |
| except ImportError: | |
| faiss = None | |
# =============================================================
# Startup fix for PermissionError
# =============================================================
# Point STREAMLIT_HOME at a directory under /tmp and make sure that
# directory exists before the rest of the script runs.
_STREAMLIT_HOME_DIR = "/tmp/.streamlit"
os.environ["STREAMLIT_HOME"] = _STREAMLIT_HOME_DIR
os.makedirs(_STREAMLIT_HOME_DIR, exist_ok=True)
# =============================================================
# Streamlit page setup and Hugging Face authentication
# =============================================================
st.set_page_config(page_title="π Handbook Assistant", page_icon="π", layout="wide")
st.title("π USTP Student Handbook Assistant (2023 Edition)")
st.caption("Answers sourced only from the official *USTP Student Handbook 2023 Edition.pdf*.")

# Load variables from a .env file (if present), then read the API token.
load_dotenv()
HF_TOKEN = os.getenv("HF_TOKEN")

if HF_TOKEN:
    # Best-effort login: a failure here is ignored on purpose because the
    # inference client below is constructed from the token regardless.
    try:
        login(HF_TOKEN)
    except Exception:
        pass
    hf_client = InferenceClient(token=HF_TOKEN)
else:
    st.warning("β οΈ No Hugging Face API token found in .env file. Online models will be unavailable.")
    hf_client = None
# =============================================================
# Sidebar configuration
# =============================================================
# Every widget below lives in the sidebar; the values they return are read
# as module-level settings by the functions further down the file.
with st.sidebar:
    st.header("βοΈ Settings")

    # Display label -> Hugging Face model id used for answer generation.
    model_options = {
        "Qwen 2.5 14B Instruct": "Qwen/Qwen2.5-14B-Instruct",
        "Mistral 7B Instruct": "mistralai/Mistral-7B-Instruct-v0.3",
        "Llama 3 8B Instruct": "meta-llama/Meta-Llama-3-8B-Instruct",
        "Mixtral 8x7B Instruct": "mistralai/Mixtral-8x7B-Instruct-v0.1",
        "Falcon 7B Instruct": "tiiuae/falcon-7b-instruct",
    }
    model_choice = st.selectbox("Select reasoning model", list(model_options.keys()), index=0)
    DEFAULT_MODEL = model_options[model_choice]

    st.markdown("---")

    # Retrieval settings.
    similarity_threshold = st.slider("Similarity threshold", 0.3, 1.0, 0.6, 0.01)
    top_k = st.slider("Top K retrieved chunks", 1, 10, 4)

    # Chunking settings (applied only when the index is rebuilt).
    chunk_size_chars = st.number_input("Chunk size (chars)", 400, 2500, 1200, 100)
    # The overlap must stay strictly below the chunk size, otherwise the
    # chunking stride (size - overlap) is zero or negative. Cap the upper
    # bound accordingly; with the minimum chunk size of 400 the cap is 390,
    # which still admits the default of 150.
    max_chunk_overlap = min(600, int(chunk_size_chars) - 10)
    chunk_overlap = st.number_input("Chunk overlap (chars)", 20, max_chunk_overlap, 150, 10)

    # Number of physical PDF pages that precede printed page 1.
    front_matter_pages = st.number_input(
        "Pages before main content (e.g. table of contents, cover)", min_value=0, max_value=50, value=12
    )

    regenerate_index = st.button("π Rebuild handbook index")
# =============================================================
# File configuration
# =============================================================
# On-disk artefacts written when the index is built (relative to the
# current working directory).
INDEX_FILE = "handbook_faiss.index"      # serialized FAISS index
META_FILE = "handbook_metadata.json"     # chunk metadata, one entry per vector
EMB_DIM_FILE = "handbook_emb_dim.json"   # embedding dimensionality of the index

# Hugging Face model id used to embed both chunks and queries.
EMBED_MODEL = "sentence-transformers/all-mpnet-base-v2"
# =============================================================
# 🧩 Utility Functions
# =============================================================
def find_handbook() -> List[str]:
    """Locate the handbook PDF in the current working directory.

    Returns:
        A one-element list holding the path of the first ``*.pdf`` whose name
        contains the preferred handbook filename (case-insensitive). If none
        matches, the first PDF found is returned instead (with a warning).
        An empty list is returned, after showing an error, when the folder
        holds no PDF at all.
    """
    preferred = "USTP Student Handbook 2023 Edition.pdf"
    wanted = preferred.lower()
    candidates = glob.glob("*.pdf")

    match = next((name for name in candidates if wanted in name.lower()), None)
    if match is not None:
        st.success(f"π Found handbook: {match}")
        return [match]

    if not candidates:
        st.error("β No PDF found in current folder.")
        return []

    fallback = candidates[0]
    st.warning(f"β οΈ Preferred handbook not found. Using {os.path.basename(fallback)}.")
    return [fallback]
def load_pdf_texts(pdf_paths: List[str]) -> List[Dict[str, Any]]:
    """Extract per-page text, labelling each page with its printed number.

    The printed number is the 1-based physical page position minus the
    module-level ``front_matter_pages`` setting, clamped so it never drops
    below 1. Pages whose extracted text is empty or whitespace-only are
    skipped.

    Args:
        pdf_paths: Paths of the PDF files to read.

    Returns:
        A list of ``{"filename", "page", "text"}`` dicts in reading order.
    """
    extracted: List[Dict[str, Any]] = []
    for pdf_path in pdf_paths:
        base_name = os.path.basename(pdf_path)
        with open(pdf_path, "rb") as handle:
            reader = PyPDF2.PdfReader(handle)
            for physical_no, page in enumerate(reader.pages, start=1):
                body = (page.extract_text() or "").strip()
                if not body:
                    continue
                extracted.append({
                    "filename": base_name,
                    # Front-matter pages all collapse onto printed page 1.
                    "page": max(1, physical_no - front_matter_pages),
                    "text": body,
                })
    return extracted
def chunk_text(pages: List[Dict[str, Any]], size: int, overlap: int) -> List[Dict[str, Any]]:
    """Split each page's text into overlapping fixed-width character chunks.

    Args:
        pages: Dicts with ``"filename"``, ``"page"`` and ``"text"`` keys.
        size: Maximum chunk length in characters; must be positive.
        overlap: Characters shared by consecutive chunks; must satisfy
            ``0 <= overlap < size``.

    Returns:
        A list of ``{"filename", "page", "content"}`` dicts. Chunks never
        span pages, and chunks that are empty after stripping are dropped.

    Raises:
        ValueError: If ``size`` or ``overlap`` would yield a non-positive
            stride (which previously made the loop run forever) or a stride
            larger than the chunk (which would silently skip text).
    """
    size = int(size)
    overlap = int(overlap)
    if size <= 0:
        raise ValueError(f"chunk size must be positive, got {size}")
    if not 0 <= overlap < size:
        raise ValueError(
            f"chunk overlap must satisfy 0 <= overlap < size, got overlap={overlap}, size={size}"
        )

    stride = size - overlap
    chunks: List[Dict[str, Any]] = []
    for page in pages:
        text = page["text"]
        for start in range(0, len(text), stride):
            end = start + size
            content = text[start:end].strip()
            if content:
                chunks.append({
                    "filename": page["filename"],
                    "page": page["page"],
                    "content": content,
                })
            # Once a chunk reaches the end of the page, any further chunk
            # would lie entirely inside this one's overlap region.
            if end >= len(text):
                break
    return chunks
def _pool_embeddings(raw: Any, expected_rows: int) -> np.ndarray:
    """Turn a raw embedding payload into a 2-D ``(n_texts, dim)`` array."""
    pooled = []
    for item in raw:
        vec = np.asarray(item)
        # Token-level output (tokens x dim) is mean-pooled to one vector.
        # Checking ndim works for nested lists and for ndarray results alike.
        if vec.ndim == 2:
            vec = vec.mean(axis=0)
        pooled.append(vec)
    result = np.array(pooled)
    # A single input may come back as one flat vector instead of a batch.
    if result.ndim == 1 and expected_rows == 1:
        result = result.reshape(1, -1)
    return result


def embed_texts(texts: List[str]) -> np.ndarray:
    """Generate embeddings using Hugging Face feature extraction.

    The inference client is tried first; if it raises, the same request is
    sent to the hosted inference REST endpoint.

    Args:
        texts: Strings to embed.

    Returns:
        An array with one row per input text. When no token/client is
        configured, an all-zero ``(len(texts), 768)`` placeholder is returned
        after showing an error. An empty input yields a ``(0, 768)`` array.

    Raises:
        requests.HTTPError: If the REST fallback returns an error status.
        RuntimeError: If the REST fallback returns a non-list payload.
    """
    if not HF_TOKEN or not hf_client:
        st.error("β Missing Hugging Face token or client.")
        return np.zeros((len(texts), 768))
    if not texts:
        # Nothing to embed; skip the network round-trip entirely.
        return np.zeros((0, 768))
    try:
        embeddings = hf_client.feature_extraction(texts, model=EMBED_MODEL)
        return _pool_embeddings(embeddings, len(texts))
    except Exception as e1:
        st.warning(f"β οΈ feature_extraction failed, using REST API fallback: {e1}")
        headers = {"Authorization": f"Bearer {HF_TOKEN}"}
        resp = requests.post(
            f"https://api-inference.huggingface.co/models/{EMBED_MODEL}",
            headers=headers,
            json={"inputs": texts},
            timeout=60,  # never hang the app on an unresponsive endpoint
        )
        # Surface HTTP errors explicitly instead of failing later while
        # indexing into an error payload.
        resp.raise_for_status()
        data = resp.json()
        if not isinstance(data, list):
            raise RuntimeError(f"Unexpected embedding response: {data!r}")
        return _pool_embeddings(data, len(texts))
def build_faiss_index(chunks: List[Dict[str, Any]]):
    """Build FAISS index for chunks.

    Embeds every chunk's ``"content"``, stores the vectors in a flat L2
    index, and writes the index, the chunk metadata and the embedding
    dimension to INDEX_FILE, META_FILE and EMB_DIM_FILE respectively.
    Nothing is written when a precondition fails; an error is shown instead.

    Args:
        chunks: Dicts with at least a ``"content"`` key; must be
            JSON-serializable because they are persisted as metadata.
    """
    if faiss is None:
        # The faiss import at the top of the file is optional.
        st.error("FAISS is not installed; cannot build the handbook index.")
        return
    if not chunks:
        st.error("No text chunks were extracted; nothing to index.")
        return

    texts = [c["content"] for c in chunks]
    embeddings = np.asarray(embed_texts(texts), dtype="float32")

    # Reject anything that is not exactly one usable vector per chunk. An
    # all-zero matrix is the placeholder embed_texts returns when no token is
    # configured; persisting it would leave a useless index on disk that is
    # never rebuilt automatically.
    if (
        embeddings.size == 0
        or embeddings.ndim != 2
        or embeddings.shape[0] != len(chunks)
        or not np.any(embeddings)
    ):
        st.error("β Embedding generation failed.")
        return

    dim = int(embeddings.shape[1])
    index = faiss.IndexFlatL2(dim)
    index.add(np.ascontiguousarray(embeddings))
    faiss.write_index(index, INDEX_FILE)
    with open(META_FILE, "w") as f:
        json.dump(chunks, f)
    with open(EMB_DIM_FILE, "w") as f:
        json.dump({"dim": dim}, f)
    st.success(f"β Indexed {len(chunks)} chunks.")
def load_faiss_index():
    """Load the persisted FAISS index and its chunk metadata.

    Returns:
        ``(index, meta)`` on success, or ``(None, None)`` when FAISS is not
        installed or either INDEX_FILE or META_FILE is missing.
    """
    if faiss is None:
        # The faiss import is optional; treat its absence as "no index"
        # rather than failing with an AttributeError on None.
        return None, None
    if not (os.path.exists(INDEX_FILE) and os.path.exists(META_FILE)):
        return None, None
    index = faiss.read_index(INDEX_FILE)
    with open(META_FILE) as f:
        meta = json.load(f)
    return index, meta
def search_index(query: str, index, meta, top_k: int, threshold: float):
    """Return the metadata of the chunks nearest to ``query``.

    Args:
        query: The user's question.
        index: A FAISS index whose vectors align positionally with ``meta``.
        meta: Chunk metadata dicts, one per indexed vector.
        top_k: Maximum number of neighbours to retrieve.
        threshold: Accepted for interface compatibility but not applied.
            NOTE(review): the index stores L2 distances, so a similarity
            cut-off cannot be applied directly; decide on a distance metric
            before wiring this in.

    Returns:
        Copies of the matching metadata dicts, each with an added
        ``"distance"`` key, in the order FAISS returned them.
    """
    query_emb = np.asarray(embed_texts([query]), dtype="float32")
    if query_emb.ndim == 1:
        # FAISS expects a 2-D (n_queries, dim) array.
        query_emb = query_emb.reshape(1, -1)

    distances, indices = index.search(query_emb, top_k)

    results = []
    for raw_pos, dist in zip(indices[0], distances[0]):
        pos = int(raw_pos)
        # FAISS pads missing neighbours with -1; without the lower bound a
        # -1 would silently select the last metadata entry.
        if 0 <= pos < len(meta):
            # Copy so the shared metadata list is not mutated per query.
            results.append({**meta[pos], "distance": float(dist)})
    return results
def generate_answer(context: str, query: str) -> str:
    """Generate model-based answer using selected open-source model.

    The prompt is sent through the client's text-generation call first; if
    that raises, the chat-completions call is tried with the same prompt.

    Args:
        context: Retrieved handbook excerpts, already labelled with pages.
        query: The user's question.

    Returns:
        The model's answer, or a human-readable error string when no client
        is configured or both generation attempts fail. Never raises.
    """
    if hf_client is None:
        # Without this guard the failure surfaces as an opaque
        # "'NoneType' object has no attribute ..." message.
        return "β οΈ Error generating answer: no Hugging Face client is configured (missing HF_TOKEN)."

    prompt = f"""
You are a precise academic assistant specialized in university policy.
Use only the *USTP Student Handbook 2023 Edition* below.
If the answer is not in the text, reply:
"The handbook does not specify that."
---
π Context:
{context}
---
π§ Question:
{query}
---
π― Instructions:
- Be factual and concise.
- Cite the correct printed page number.
- Never make assumptions.
"""
    try:
        response = hf_client.text_generation(
            model=DEFAULT_MODEL,
            prompt=prompt,
            max_new_tokens=400,
            temperature=0.25
        )
        return response if isinstance(response, str) else str(response)
    except Exception as e1:
        # Fall back to the chat-completions call with the same prompt.
        try:
            chat_response = hf_client.chat.completions.create(
                model=DEFAULT_MODEL,
                messages=[{"role": "user", "content": prompt}],
                max_tokens=400
            )
            return chat_response.choices[0].message["content"]
        except Exception as e2:
            # Report both failures so the first one is not lost.
            return f"β οΈ Error generating answer: {e2} (text_generation also failed: {e1})"
def ensure_index():
    """Ensure FAISS index exists or rebuild.

    The index is rebuilt when the sidebar rebuild button was pressed or when
    either persisted artefact (INDEX_FILE or META_FILE) is missing. The
    script run is halted via ``st.stop()`` if no PDF is available or the
    index still cannot be loaded afterwards.

    Returns:
        ``(index, meta)`` as produced by ``load_faiss_index``.
    """
    # Checking only INDEX_FILE would leave the app stuck on "Could not load
    # FAISS index." whenever the metadata file alone is missing.
    artefacts_missing = not (os.path.exists(INDEX_FILE) and os.path.exists(META_FILE))
    if regenerate_index or artefacts_missing:
        pdfs = find_handbook()
        if not pdfs:
            st.stop()
        st.info("π Extracting handbook text...")
        pages = load_pdf_texts(pdfs)
        chunks = chunk_text(pages, chunk_size_chars, chunk_overlap)
        build_faiss_index(chunks)

    index, meta = load_faiss_index()
    if index is None or meta is None:
        st.error("β Could not load FAISS index.")
        st.stop()
    return index, meta
# =============================================================
# Chat interface
# =============================================================
st.divider()
st.subheader("π¬ Ask about the Handbook")

# The conversation is kept in session state so it survives script reruns.
if "history" not in st.session_state:
    st.session_state.history = []

user_query = st.text_input("Enter your question:")
index, meta = ensure_index()

# Answer only when the button was pressed and the question is non-blank.
if st.button("Ask") and user_query.strip():
    results = search_index(user_query, index, meta, top_k, similarity_threshold)
    if results:
        # Prefix every retrieved chunk with its page label for the prompt.
        context = "\n\n".join(
            f"(π Page {r['page']})\n{r['content']}" for r in results
        )
        answer = generate_answer(context, user_query)
        st.session_state.history.append({
            "user": user_query,
            "assistant": answer,
            "timestamp": time.time()
        })
    else:
        st.warning("No relevant section found in the handbook.")

# Render the whole conversation; position-based keys keep element ids unique
# and so avoid StreamlitDuplicateElementId.
for turn_no, chat in enumerate(st.session_state.history):
    st_message(chat["user"], is_user=True, key=f"user_{turn_no}")
    st_message(chat["assistant"], key=f"assistant_{turn_no}")

st.caption("β‘ Powered by FAISS + Open Source Models + Accurate Page Referencing")