| |
| import os, re, functools, numpy as np, pandas as pd |
| import gradio as gr |
| from datasets import load_dataset |
| from sklearn.metrics.pairwise import cosine_similarity |
|
|
| |
| SAMPLE_SIZE = int(os.getenv("SAMPLE_SIZE", "3000")) |
| RANDOM_STATE = 42 |
| DEFAULT_INPUT = "I am so happy with this product" |
|
|
| |
| def clean_text(text: str) -> str: |
| text = (text or "").lower() |
| text = re.sub(r"http\S+", "", text) |
| text = re.sub(r"@\w+", "", text) |
| text = re.sub(r"#\w+", "", text) |
| text = re.sub(r"[^\w\s]", "", text) |
| text = re.sub(r"\s+", " ", text).strip() |
| return text |
|
|
| def _to_numpy(x): |
| try: |
| import torch |
| if hasattr(torch, "Tensor") and isinstance(x, torch.Tensor): |
| return x.detach().cpu().numpy() |
| except Exception: |
| pass |
| return np.asarray(x) |
|
|
| def _l2norm(x: np.ndarray) -> np.ndarray: |
| x = x.astype(np.float32, copy=False) |
| if x.ndim == 1: |
| x = x.reshape(1, -1) |
| return x / (np.linalg.norm(x, axis=1, keepdims=True) + 1e-12) |
|
|
|
|
| |
| @functools.lru_cache(maxsize=1) |
| def load_sample_df(): |
| |
| import pandas as pd |
|
|
| try: |
| from datasets import load_dataset |
| ds = load_dataset("sentiment140", split=f"train[:{SAMPLE_SIZE}]") |
| df = ds.to_pandas() |
| except Exception: |
| try: |
| from datasets import load_dataset |
| ds = load_dataset("tweet_eval", "sentiment", split=f"train[:{SAMPLE_SIZE}]") |
| df = ds.to_pandas().rename(columns={"text": "text"}) |
| except Exception: |
| fallback_texts = [ |
| "I love this product!", "This is terrible...", "Best purchase ever", |
| "Pretty good overall", "I am not happy with the service", |
| "Absolutely fantastic experience", "Could be better", "Super satisfied", |
| "Worst ever", "Not bad at all", "Amazing quality", "I will buy again" |
| ] |
| return pd.DataFrame({"text": fallback_texts, "clean_text": fallback_texts}) |
|
|
| df = df.dropna(subset=["text"]).copy() |
| df["text_length"] = df["text"].astype(str).str.len() |
| df = df[(df["text_length"] >= 5) & (df["text_length"] <= 280)].copy() |
|
|
| def clean_text(t): |
| import re |
| t = str(t).lower() |
| t = re.sub(r"http\S+", "", t) |
| t = re.sub(r"[@#]\w+", "", t) |
| t = re.sub(r"[^a-z0-9\s.,!?']", " ", t) |
| t = re.sub(r"\s+", " ", t).strip() |
| return t |
|
|
| df["clean_text"] = df["text"].apply(clean_text) |
| df = df.sample(frac=1.0, random_state=RANDOM_STATE).reset_index(drop=True) |
|
|
| return df[["text", "clean_text"]] |
|
|
|
|
| |
| @functools.lru_cache(maxsize=None) |
| def load_sentence_model(model_id: str): |
| from sentence_transformers import SentenceTransformer |
| return SentenceTransformer(model_id) |
|
|
| @functools.lru_cache(maxsize=None) |
| def load_generator(): |
| from transformers import pipeline, set_seed |
| set_seed(RANDOM_STATE) |
| return pipeline("text-generation", model="distilgpt2") |
|
|
| |
| EMBEDDERS = { |
| "MiniLM (fast)": "sentence-transformers/all-MiniLM-L6-v2", |
| "MPNet (heavier)": "sentence-transformers/all-mpnet-base-v2", |
| "DistilRoBERTa (paraphrase)": "sentence-transformers/paraphrase-distilroberta-base-v1", |
| } |
|
|
| |
| _CORPUS_CACHE = {} |
|
|
| def _encode_norm(model, texts): |
| """Encode compatibly across sentence-transformers versions; return L2-normalized numpy (n,d).""" |
| out = model.encode(texts, show_progress_bar=False) |
| out = _to_numpy(out) |
| return _l2norm(out) |
|
|
| def ensure_corpus_embeddings(model_name: str, texts: list): |
| if model_name in _CORPUS_CACHE: |
| return _CORPUS_CACHE[model_name] |
| model = load_sentence_model(EMBEDDERS[model_name]) |
| emb = _encode_norm(model, texts) |
| _CORPUS_CACHE[model_name] = emb |
| return emb |
|
|
| |
| def top3_for_each_model(user_input: str, selected_models: list): |
| df = load_sample_df() |
| texts = df["clean_text"].tolist() |
| rows = [] |
| for name in selected_models: |
| try: |
| model = load_sentence_model(EMBEDDERS[name]) |
| corpus_emb = ensure_corpus_embeddings(name, texts) |
| q = _encode_norm(model, [clean_text(user_input)]) |
| sims = cosine_similarity(q, corpus_emb)[0] |
| top_idx = sims.argsort()[-3:][::-1] |
| for rank, i in enumerate(top_idx, start=1): |
| rows.append({ |
| "Model": name, |
| "Rank": rank, |
| "Similarity": float(sims[i]), |
| "Tweet (clean)": texts[i], |
| "Tweet (orig)": df.loc[i, "text"], |
| }) |
| except Exception as e: |
| rows.append({ |
| "Model": name, "Rank": "-", "Similarity": "-", |
| "Tweet (clean)": f"[Error: {e}]", "Tweet (orig)": "" |
| }) |
| return pd.DataFrame(rows, columns=["Model","Rank","Similarity","Tweet (clean)","Tweet (orig)"]) |
|
|
| |
| def generate_and_pick_best(prompt: str, n_sequences: int, max_length: int, |
| temperature: float, scorer_model_name: str, |
| progress=gr.Progress()): |
| progress(0.0, desc="Loading models…") |
| gen = load_generator() |
| scorer = load_sentence_model(EMBEDDERS[scorer_model_name]) |
|
|
| progress(0.3, desc="Generating candidates…") |
| outputs = gen( |
| prompt, |
| max_new_tokens=int(max_length), |
| num_return_sequences=int(n_sequences), |
| do_sample=True, |
| temperature=float(temperature), |
| pad_token_id=50256, |
| ) |
| candidates = [o["generated_text"].strip() for o in outputs] |
|
|
| progress(0.7, desc="Scoring candidates…") |
| q = _encode_norm(scorer, [prompt]) |
| cand_vecs = _encode_norm(scorer, candidates) |
| sims = cosine_similarity(q, cand_vecs)[0] |
| best_idx = int(sims.argmax()) |
|
|
| table = pd.DataFrame({ |
| "Rank": np.argsort(-sims) + 1, |
| "Similarity": np.sort(sims)[::-1], |
| "Generated Tweet": [c for _, c in sorted(zip(-sims, candidates))] |
| }) |
| progress(1.0) |
| return candidates[best_idx], float(sims[best_idx]), table |
|
|
| |
| with gr.Blocks(title="Sentiment140 Embeddings + Generation") as demo: |
| gr.Markdown( |
| """ |
| # 🧪 Sentiment140 — Embeddings & Tweet Generator |
| Type a tweet, get similar tweets from Sentiment140, and generate a new one. |
| """ |
| ) |
|
|
| with gr.Row(): |
| test_input = gr.Textbox(label="Your input", value=DEFAULT_INPUT, lines=2) |
| models = gr.CheckboxGroup( |
| choices=list(EMBEDDERS.keys()), |
| value=["MiniLM (fast)"], |
| label="Embedding models to compare", |
| ) |
|
|
| run_btn = gr.Button("🔎 Find Top‑3 Similar Tweets") |
| table_out = gr.Dataframe(interactive=False) |
|
|
| run_btn.click(top3_for_each_model, inputs=[test_input, models], outputs=table_out) |
|
|
| gr.Markdown("---") |
| gr.Markdown("## 📝 Generate Tweets and Pick the Best") |
|
|
| with gr.Row(): |
| n_seq = gr.Slider(1, 8, value=4, step=1, label="Number of candidates") |
| max_len = gr.Slider(20, 80, value=40, step=1, label="Max length (new tokens)") |
| temp = gr.Slider(0.7, 1.3, value=0.9, step=0.05, label="Temperature") |
| scorer_model = gr.Dropdown(list(EMBEDDERS.keys()), value="MiniLM (fast)", label="Scorer embedding") |
|
|
| gen_btn = gr.Button("✨ Generate & Score") |
| best_txt = gr.Textbox(label="Best generated tweet") |
| best_score = gr.Number(label="Similarity (best)") |
| gen_table = gr.Dataframe(interactive=False) |
|
|
| gen_btn.click( |
| generate_and_pick_best, |
| inputs=[test_input, n_seq, max_len, temp, scorer_model], |
| outputs=[best_txt, best_score, gen_table], |
| ) |
|
|
| demo.queue(max_size=32).launch() |
|
|
|
|