from datetime import datetime, timezone
from pathlib import Path
import os
import re
import shutil

import gradio as gr

from agent import (
    analyze_github_repo,
    analyze_local_repo,
    build_experiment_from_report,
    build_repo_vector_store,
    fetch_youtube_transcript,
    generate_youtube_study_notes,
    rag_answer_from_store,
    summarize_youtube_chapters,
)
from bookmarks import (
    bookmark_repo_from_analysis,
    find_metadata_by_label,
    get_cache_dirs,
    get_dropdown_options,
)

# File extensions and folders to ignore for local uploads
IGNORE_PATTERNS = {
    'folders': {
        '__pycache__', '.git', '.svn', '.hg', 'node_modules', 'venv', 'env',
        '.venv', '.env', 'dist', 'build', '.idea', '.vscode', '.pytest_cache',
        '.mypy_cache', 'coverage', '.coverage', 'htmlcov', '.tox', 'eggs',
        '.eggs', '*.egg-info', '.DS_Store'
    },
    'extensions': {
        '.pyc', '.pyo', '.pyd', '.so', '.dll', '.dylib', '.class', '.o',
        '.obj', '.exe', '.bin', '.lock', '.log', '.tmp', '.temp', '.cache',
        '.bak', '.swp', '.swo', '.DS_Store', '.gitignore'
    }
}


def _should_ignore_path(path: Path) -> bool:
    """Check if a path should be ignored during local folder processing."""
    for part in path.parts:
        if part in IGNORE_PATTERNS['folders']:
            return True
        if part.startswith('.') and part not in {'.', '..'}:
            return True
    if path.suffix.lower() in IGNORE_PATTERNS['extensions']:
        return True
    return False


KNOWLEDGE_TRANSFER_ROOT = Path("Knowledge Transfer")


def _is_gradio_v6_or_newer() -> bool:
    """Return True if the installed Gradio major version is >= 6."""
    version_str = getattr(gr, "__version__", "0")
    try:
        major = int(version_str.split(".")[0])
        return major >= 6
    except (ValueError, IndexError):
        return False


IS_GRADIO_V6 = _is_gradio_v6_or_newer()


def run_github_ingestion(repo_url: str):
    """Analyze GitHub repository without indexing."""
    repo_url = (repo_url or "").strip()
    if not repo_url:
        warning = "⚠️ Please paste a public GitHub repository URL to begin."
        source_info = "**Source:** Not selected\n**Status:** ⏳ Pending\n**Chunks:** 0 vectors"
        return warning, source_info, {
            "analysis": None,
            "vector_dir": "",
            "vector_chunks": 0,
            "summary_base": [],
        }

    result = analyze_github_repo(repo_url)
    if "error" in result:
        error_msg = f"❌ {result['error']}"
        source_info = f"**Source:** {repo_url}\n**Status:** ❌ Error\n**Chunks:** 0 vectors"
        return error_msg, source_info, {
            "analysis": None,
            "vector_dir": "",
            "vector_chunks": 0,
            "summary_base": [],
        }

    docs = result.get("documentation", [])
    repo_name = result.get("repo_name", repo_url)
    timestamp = datetime.now(timezone.utc).strftime("%d %b %Y, %H:%M UTC")

    # Source info for preview panel
    source_info = f"""**Source:** GitHub Repository
**Repository:** {repo_name}
**Status:** ✅ Analyzed
**Documents:** {len(docs)} files
**Analyzed:** {timestamp}"""

    # Document preview content
    preview_sections = []
    for doc in docs[:5]:
        content = (doc.get("content") or "").strip()
        if not content:
            continue
        snippet = content[:600]
        if len(content) > 600:
            snippet = snippet.rstrip() + " ..."
        preview_sections.append(
            f"### 📄 {doc.get('path', 'document')}\n\n{snippet}"
        )

    preview_content = (
        "\n\n---\n\n".join(preview_sections)
        if preview_sections
        else "*No textual documentation snippets were found.*"
    )

    state_payload = {
        "analysis": result,
        "vector_dir": "",
        "vector_chunks": 0,
        "summary_base": [repo_name, str(len(docs))],
        "processed_timestamp": timestamp,
        "indexed": False,
    }
    return preview_content, source_info, state_payload


def index_github_repo(state_payload: dict | None):
    """Index the analyzed GitHub repository for RAG."""
    analysis_data = (state_payload or {}).get("analysis") if state_payload else None
    if not analysis_data:
        return "⚠️ Run analysis before indexing.", state_payload
    if state_payload.get("indexed"):
        return "✅ Repository already indexed and ready for RAG queries.", state_payload

    docs = analysis_data.get("documentation", [])
    repo_name = analysis_data.get("repo_name", "repo")
    repo_url = analysis_data.get("repo_url", "")

    slug, cache_dir, cache_vector_dir = get_cache_dirs(repo_url, repo_name)
    if cache_dir.exists():
        shutil.rmtree(cache_dir)
    cache_dir.mkdir(parents=True, exist_ok=True)

    vector_chunks = 0
    if docs:
        _, chunk_count = build_repo_vector_store(docs, persist_path=cache_vector_dir)
        vector_chunks = chunk_count
    else:
        return "⚠️ No documentation found to index.", state_payload

    new_state = {
        **state_payload,
        "vector_dir": str(cache_vector_dir),
        "vector_chunks": vector_chunks,
        "indexed": True,
    }
    return f"✅ Indexed {vector_chunks} vector chunks. Ready for RAG queries!", new_state


def bookmark_github_repo(state_payload: dict | None):
    """Bookmark and index the GitHub repository permanently."""
    analysis_data = (state_payload or {}).get("analysis") if state_payload else None
    if not analysis_data:
        return "⚠️ Run analysis before bookmarking.", state_payload, gr.Dropdown()

    docs = analysis_data.get("documentation", [])
    if not docs:
        return "⚠️ No documentation to bookmark.", state_payload, gr.Dropdown()

    repo_url = analysis_data.get("repo_url") or analysis_data.get("repo_name")

    # Build vector store if not already done
    vector_dir = state_payload.get("vector_dir") if state_payload else ""
    if not vector_dir:
        repo_name = analysis_data.get("repo_name", "repo")
        slug, cache_dir, cache_vector_dir = get_cache_dirs(repo_url, repo_name)
        if cache_dir.exists():
            shutil.rmtree(cache_dir)
        cache_dir.mkdir(parents=True, exist_ok=True)
        if docs:
            _, chunk_count = build_repo_vector_store(docs, persist_path=cache_vector_dir)
            vector_dir = str(cache_vector_dir)
        else:
            chunk_count = 0
    else:
        chunk_count = state_payload.get("vector_chunks", 0)

    metadata = bookmark_repo_from_analysis(
        repo_url,
        analysis_data,
        prebuilt_vector_dir=Path(vector_dir) if vector_dir else None,
        prebuilt_chunks=chunk_count,
    )

    choices, metadata_list = get_dropdown_options()
    dropdown_update = gr.Dropdown(
        choices=choices,
        value=metadata.dropdown_label,
        interactive=True,
    )
    new_state = {
        **state_payload,
        "vector_dir": vector_dir,
        "vector_chunks": chunk_count,
        "indexed": True,
    }
    return f"💾 Repository bookmarked on {metadata.last_pulled_display}. Access it in the Chat tab!", new_state, dropdown_update
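
# Illustrative sketch (not wired into the UI): how the three GitHub helpers above compose
# when driven programmatically. The URL is a placeholder, and the shape of the returned
# ``state_payload`` dict mirrors what run_github_ingestion builds:
# {"analysis": ..., "vector_dir": "", "vector_chunks": 0, "summary_base": [...],
#  "processed_timestamp": ..., "indexed": False}.
#
#   preview_md, source_md, state = run_github_ingestion("https://github.com/user/repo")
#   status_msg, state = index_github_repo(state)               # builds the vector store
#   saved_msg, state, dropdown = bookmark_github_repo(state)   # persists it as a bookmark
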
source_info = "**Source:** Not selected\n**Status:** ⏳ Pending\n**Chunks:** 0 vectors" return warning, source_info, { "analysis": None, "vector_dir": "", "vector_chunks": 0, } result = fetch_youtube_transcript(youtube_url) if "error" in result: error_msg = f"❌ {result['error']}" source_info = f"**Source:** YouTube\n**Status:** ❌ Error\n**Chunks:** 0 vectors" return error_msg, source_info, { "analysis": None, "vector_dir": "", "vector_chunks": 0, } transcript = (result.get("raw_transcript") or "").strip() if not transcript: source_info = "**Source:** YouTube\n**Status:** ⚠️ No transcript\n**Chunks:** 0 vectors" return "⚠️ No transcript text was returned.", source_info, {"analysis": None} timestamp = datetime.now(timezone.utc).strftime("%d %b %Y, %H:%M UTC") video_url = result.get("url", youtube_url) lang = result.get("lang", "en") # Source info for preview panel source_info = f"""**Source:** YouTube Video **URL:** {video_url} **Language:** {lang} **Status:** ✅ Analyzed **Analyzed:** {timestamp}""" # Generate chapter summaries chapters = summarize_youtube_chapters(transcript, url=video_url) # Preview content with chapters preview_content = f"""### 📺 Video Transcript Analysis {chapters} --- ### 📝 Transcript Preview {transcript[:2000]}{"..." if len(transcript) > 2000 else ""} """ state_payload = { "analysis": { "transcript": transcript, "url": video_url, "lang": lang, "chapters": chapters, }, "vector_dir": "", "vector_chunks": 0, "indexed": False, } return preview_content, source_info, state_payload def index_youtube_video(state_payload: dict | None): """Index YouTube transcript for RAG.""" analysis_data = (state_payload or {}).get("analysis") if state_payload else None if not analysis_data: return "⚠️ Run analysis before indexing.", state_payload if state_payload.get("indexed"): return "✅ Video already indexed and ready for RAG queries.", state_payload transcript = analysis_data.get("transcript", "") if not transcript: return "⚠️ No transcript found to index.", state_payload # Create pseudo-documents from transcript docs = [{ "path": "transcript.txt", "content": transcript, "type": "transcript", }] url = analysis_data.get("url", "youtube") slug, cache_dir, cache_vector_dir = get_cache_dirs(url, "youtube") if cache_dir.exists(): shutil.rmtree(cache_dir) cache_dir.mkdir(parents=True, exist_ok=True) _, chunk_count = build_repo_vector_store(docs, persist_path=cache_vector_dir) new_state = { **state_payload, "vector_dir": str(cache_vector_dir), "vector_chunks": chunk_count, "indexed": True, } return f"✅ Indexed {chunk_count} transcript chunks. 
Ready for RAG queries!", new_state def bookmark_youtube_video(state_payload: dict | None): """Bookmark YouTube video - persists transcript to bookmarks system.""" analysis_data = (state_payload or {}).get("analysis") if state_payload else None if not analysis_data: return "⚠️ Run analysis before bookmarking.", state_payload, gr.Dropdown() transcript = analysis_data.get("transcript", "") if not transcript: return "⚠️ No transcript found to bookmark.", state_payload, gr.Dropdown() video_url = analysis_data.get("url", "youtube-video") chapters = analysis_data.get("chapters", "") # Create pseudo-analysis structure compatible with bookmark_repo_from_analysis pseudo_analysis = { "repo_name": f"YouTube: {video_url[:50]}", "repo_url": video_url, "documentation": [ { "path": "transcript.txt", "content": transcript, }, { "path": "chapters.md", "content": chapters, } ], } # Use prebuilt vector store if already indexed prebuilt_dir = None prebuilt_chunks = None if state_payload.get("indexed") and state_payload.get("vector_dir"): prebuilt_dir = Path(state_payload["vector_dir"]) prebuilt_chunks = state_payload.get("vector_chunks", 0) metadata = bookmark_repo_from_analysis( video_url, pseudo_analysis, prebuilt_vector_dir=prebuilt_dir, prebuilt_chunks=prebuilt_chunks, ) choices, _ = get_dropdown_options() dropdown_update = gr.Dropdown(choices=choices, value=metadata.dropdown_label) new_state = { **state_payload, "vector_dir": metadata.vector_dir, "vector_chunks": metadata.vector_chunks, "indexed": True, "bookmarked": True, } return f"🔖 YouTube video bookmarked! {metadata.vector_chunks} chunks indexed.", new_state, dropdown_update def generate_youtube_transfer_report(youtube_url: str): youtube_url = (youtube_url or "").strip() if not youtube_url: return "⚠️ Paste a YouTube video URL before generating a report." result = fetch_youtube_transcript(youtube_url) if "error" in result: return f"❌ {result['error']}" transcript = (result.get("raw_transcript") or "").strip() if not transcript: return "No transcript text was returned by the youtube-transcript MCP server; report generation was skipped." chapters = summarize_youtube_chapters(transcript, url=result.get("url", youtube_url)) study_notes = generate_youtube_study_notes(chapters, url=result.get("url", youtube_url)) generated_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z") header_lines = [ "# YouTube Knowledge Transfer Report", "", f"- Source: {result.get('url', youtube_url)}", f"- Language: {result.get('lang', 'en')}", f"- Generated at: {generated_at}", ] lines: list[str] = [] lines.extend(header_lines) lines.append("") lines.append("## 1. Topic & Chapter Outline") lines.append("") lines.append(chapters) lines.append("") lines.append("## 2. Study & Interview Guidance") lines.append("") lines.append(study_notes) report_markdown = "\n".join(lines) root = _ensure_knowledge_root() youtube_root = root / "Youtube Video" youtube_root.mkdir(parents=True, exist_ok=True) slug = _slugify_name(result.get("url", youtube_url)) ts = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S") report_path = youtube_root / f"{slug}-youtube-knowledge-{ts}.md" report_path.write_text(report_markdown, encoding="utf-8") rel_path = report_path.relative_to(Path(".")) return f"📄 YouTube knowledge transfer report written to `{rel_path}`." 
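
# Illustrative note (assumption: default working directory): reports generated above land
# under "Knowledge Transfer/Youtube Video/" with a slugified URL plus a UTC timestamp.
# For example, a URL like "https://youtu.be/VIDEO_ID" would yield something of the form
#   Knowledge Transfer/Youtube Video/https-youtu-be-video-id-youtube-knowledge-20240101-120000.md
# (the exact slug depends on _slugify_name, defined below).
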

def _ensure_knowledge_root() -> Path:
    """Create the Knowledge Transfer output folder if needed and return it."""
    KNOWLEDGE_TRANSFER_ROOT.mkdir(parents=True, exist_ok=True)
    return KNOWLEDGE_TRANSFER_ROOT


def _slugify_name(name: str) -> str:
    """Convert a name into a filesystem-friendly slug."""
    base = (name or "project").lower()
    safe = re.sub(r"[^a-z0-9-]+", "-", base).strip("-")
    return safe or "project"


def generate_knowledge_transfer_report(state_payload: dict | None):
    """Generate a markdown knowledge transfer report for the analyzed repository and save it to disk."""
    analysis_data = (state_payload or {}).get("analysis") if state_payload else None
    if not analysis_data:
        return "⚠️ Run an analysis before generating a report."

    repo_name = analysis_data.get("repo_name") or "Project"
    repo_url = analysis_data.get("repo_url") or "local upload"
    docs = analysis_data.get("documentation") or []
    doc_count = len(docs)
    structure = analysis_data.get("structure") or []
    vector_dir = state_payload.get("vector_dir") if state_payload else ""
    vector_chunks = state_payload.get("vector_chunks", 0) if state_payload else 0
    summary_base = state_payload.get("summary_base", []) if state_payload else []
    processed_timestamp = state_payload.get("processed_timestamp") if state_payload else None

    generated_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
    header_lines = [
        f"# Knowledge Transfer Report – {repo_name}",
        "",
        f"- Source: {repo_url}",
        f"- Generated at: {generated_at}",
        f"- Documentation files: {doc_count}",
        f"- Vector chunks: {vector_chunks}",
    ]
    if processed_timestamp:
        header_lines.append(f"- Last analysis run: {processed_timestamp}")

    overview_section = (
        "\n".join(summary_base)
        if summary_base
        else "No high-level summary was captured during analysis."
    )

    llm_summary_section = ""
    if vector_dir and vector_chunks:
        try:
            question = (
                "Provide a detailed knowledge transfer summary of this repository. "
                "Explain its purpose, main components, architecture, key dependencies, "
                "and patterns that would be reusable in other projects. "
                "Focus on actionable insights and how to extend or adapt this codebase."
            )
            llm_summary_section = rag_answer_from_store(Path(vector_dir), question, repo_summary=overview_section)
        except Exception as err:
            llm_summary_section = f"LLM summary unavailable due to error: {err}"
    else:
        llm_summary_section = "Vector store not available; the RAG-based summary was skipped."

    max_items = 80
    structure_snippet = (
        "\n".join(structure[:max_items])
        if structure
        else "No repository structure information was captured."
    )
    doc_paths = [d.get("path", "") for d in docs][:max_items]
    docs_list_section = (
        "\n".join(f"- {p}" for p in doc_paths)
        if doc_paths
        else "No documentation files were detected."
    )

    lines: list[str] = []
    lines.extend(header_lines)
    lines.append("")
    lines.append("## 1. High-level Overview")
    lines.append("")
    lines.append(overview_section)
    lines.append("")
    lines.append("## 2. Repository Layout (snapshot)")
    lines.append("")
    lines.append("```")
    lines.append(structure_snippet)
    lines.append("```")
    lines.append("")
    lines.append("## 3. Documentation Files")
    lines.append("")
    lines.append(docs_list_section)
    lines.append("")
    lines.append("## 4. LLM Knowledge Summary")
    lines.append("")
    lines.append(llm_summary_section)
    lines.append("")
    lines.append("## 5. Notes for Future Reuse")
    lines.append("")
    lines.append(
        "Use this report as a starting point when designing new projects. "
        "Focus on reusing architecture patterns, utility modules, and any "
        "documented best practices or workflows."
    )

    report_markdown = "\n".join(lines)

    root = _ensure_knowledge_root()
    slug = _slugify_name(repo_name)
    ts = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
    report_path = root / f"{slug}-knowledge-transfer-{ts}.md"
    report_path.write_text(report_markdown, encoding="utf-8")

    rel_path = report_path.relative_to(Path("."))
    return f"📄 Knowledge transfer report written to `{rel_path}`."


def _list_knowledge_report_choices() -> list[str]:
    root = _ensure_knowledge_root()
    reports = sorted(root.glob("*.md"))
    return [report.name for report in reports]


def _refresh_lab_reports_dropdown():
    return gr.Dropdown(
        label="Knowledge Transfer report (optional)",
        choices=_list_knowledge_report_choices(),
        value=None,
        interactive=True,
    )


def load_study_deck_from_report(report_name: str | None) -> str:
    if not report_name:
        return "Select a Knowledge Transfer report above to view its study deck-style summary."

    root = _ensure_knowledge_root()
    report_path = root / report_name
    if not report_path.exists():
        return f"Report `{report_name}` was not found in the Knowledge Transfer folder."

    try:
        raw = report_path.read_text(encoding="utf-8", errors="ignore")
    except OSError as err:
        return f"Unable to read report `{report_name}`: {err}"

    max_chars = 6000
    snippet = raw[:max_chars]
    if len(raw) > max_chars:
        snippet = snippet.rstrip() + "\n\n... (truncated)"

    return (
        f"### Study Deck · {report_name}\n\n"
        "Scroll through this condensed report to refresh yourself on the key concepts, "
        "architecture, and reusable patterns for this project.\n\n"
        f"```markdown\n{snippet}\n```"
    )


def _derive_local_repo_root(uploaded: str | list[str] | None) -> Path | None:
    """Given a directory-style file upload, infer the repository root directory.

    Gradio's File component with ``file_count="directory"`` returns a list of
    filepaths under the uploaded folder (or a single filepath). We compute the
    common parent directory and treat that as the repo root.
    """
    if not uploaded:
        return None
    if isinstance(uploaded, str):
        paths = [uploaded]
    else:
        paths = [p for p in uploaded if p]
    if not paths:
        return None
    try:
        common = os.path.commonpath(paths)
    except ValueError:
        return None
    root = Path(common)
    return root if root.exists() and root.is_dir() else None


def run_local_repo_ingestion(uploaded_folder):
    """Analyze local repository folder, filtering irrelevant files."""
    repo_root = _derive_local_repo_root(uploaded_folder)
    if not repo_root:
        warning = "⚠️ Upload a project folder before running analysis."
        source_info = "**Source:** Not selected\n**Status:** ⏳ Pending\n**Chunks:** 0 vectors"
        return warning, source_info, {
            "analysis": None,
            "vector_dir": "",
            "vector_chunks": 0,
            "summary_base": [],
        }

    # Bail out early if nothing relevant survives the ignore rules
    if isinstance(uploaded_folder, list):
        filtered_files = [f for f in uploaded_folder if not _should_ignore_path(Path(f))]
        if not filtered_files:
            warning = "⚠️ No relevant files found after filtering."
source_info = "**Source:** Local\n**Status:** ⚠️ No files\n**Chunks:** 0 vectors" return warning, source_info, { "analysis": None, "vector_dir": "", "vector_chunks": 0, "summary_base": [], } result = analyze_local_repo(str(repo_root)) if "error" in result: error_msg = f"❌ {result['error']}" source_info = f"**Source:** Local\n**Status:** ❌ Error\n**Chunks:** 0 vectors" return error_msg, source_info, { "analysis": None, "vector_dir": "", "vector_chunks": 0, "summary_base": [], } docs = result.get("documentation", []) repo_name = result.get("repo_name", repo_root.name) timestamp = datetime.now(timezone.utc).strftime("%d %b %Y, %H:%M UTC") # Source info for preview panel source_info = f"""**Source:** Local Project **Folder:** {repo_name} **Status:** ✅ Analyzed **Documents:** {len(docs)} files **Analyzed:** {timestamp}""" # Document preview content preview_sections = [] for doc in docs[:5]: content = (doc.get("content") or "").strip() if not content: continue snippet = content[:600] if len(content) > 600: snippet = snippet.rstrip() + " ..." preview_sections.append( f"### 📄 {doc.get('path', 'document')}\n\n{snippet}" ) preview_content = ( "\n\n---\n\n".join(preview_sections) if preview_sections else "*No textual documentation snippets were found.*" ) state_payload = { "analysis": result, "vector_dir": "", "vector_chunks": 0, "summary_base": [repo_name, str(len(docs))], "processed_timestamp": timestamp, "indexed": False, } return preview_content, source_info, state_payload def index_local_repo(state_payload: dict | None): """Index the analyzed local repository for RAG.""" return index_github_repo(state_payload) # Same logic def bookmark_local_repo(state_payload: dict | None): """Bookmark and index the local repository permanently.""" return bookmark_github_repo(state_payload) # Same logic def _format_bookmark_info(metadata: dict | None) -> str: if not metadata: return ( "No bookmarks yet. Process a repository in the *Process New Repository* tab, then bookmark it to enable RAG chat." ) preview = (metadata.get("summary_preview") or "").strip() if preview: max_len = 600 if len(preview) > max_len: preview_display = preview[:max_len].rstrip() + " ..." else: preview_display = preview return ( f"### {metadata.get('repo_name', 'Saved Repository')}\n" f"- URL: {metadata.get('repo_url', 'N/A')}\n" f"- Last pulled: {metadata.get('last_pulled_display', '--/--/----')}\n" f"- Documentation files: {metadata.get('docs_count', 0)}\n" f"- Vector chunks: {metadata.get('vector_chunks', 0)}\n\n" f"**Preview:**\n\n{preview_display}" ) return ( f"### {metadata.get('repo_name', 'Saved Repository')}\n" f"- URL: {metadata.get('repo_url', 'N/A')}\n" f"- Last pulled: {metadata.get('last_pulled_display', '--/--/----')}\n" f"- Documentation files: {metadata.get('docs_count', 0)}\n" f"- Vector chunks: {metadata.get('vector_chunks', 0)}" ) def _refresh_bookmarks(preselect: str | None = None): choices, metadata_list = get_dropdown_options() value = preselect if preselect and preselect in choices else (choices[0] if choices else None) dropdown_update = gr.Dropdown( choices=choices, value=value, interactive=bool(choices), label="Bookmarked repositories", allow_custom_value=True, ) info = _format_bookmark_info( find_metadata_by_label(value, metadata_list) if value else None ) return dropdown_update, metadata_list, info def load_bookmarks_on_start(): dropdown_update, metadata_list, info = _refresh_bookmarks() status = "Bookmarks loaded." if metadata_list else "No bookmarks saved yet." 

def _build_summary_from_base(base_lines: list[str], final_message: str) -> str:
    if not base_lines:
        return final_message
    return "\n".join(base_lines + ["", final_message])


def bookmark_current_repo(state_payload: dict | None):
    analysis_data = (state_payload or {}).get("analysis") if state_payload else None
    if not analysis_data or not analysis_data.get("documentation"):
        return (
            "⚠️ Run an analysis before bookmarking a repository.",
            gr.Dropdown(choices=[], value=None, interactive=False, label="Bookmarked repositories"),
            [],
            _format_bookmark_info(None),
            _build_summary_from_base(
                (state_payload or {}).get("summary_base", []),
                "⚠️ Bookmark failed because no analysis data is available.",
            ),
            state_payload,
        )

    repo_url = analysis_data.get("repo_url") or analysis_data.get("repo_name")
    vector_dir = state_payload.get("vector_dir") if state_payload else ""
    metadata = bookmark_repo_from_analysis(
        repo_url,
        analysis_data,
        prebuilt_vector_dir=Path(vector_dir) if vector_dir else None,
        prebuilt_chunks=state_payload.get("vector_chunks") if state_payload else None,
    )

    dropdown_update, metadata_list, info = _refresh_bookmarks(preselect=metadata.dropdown_label)
    saved_msg = (
        f"💾 Repo saved on {metadata.last_pulled_display}. Access it via the Bookmarked tab for RAG chat."
    )
    updated_summary = _build_summary_from_base(
        state_payload.get("summary_base", []),
        saved_msg,
    )
    new_state = {
        **(state_payload or {}),
        "summary_base": state_payload.get("summary_base", []),
        "saved": True,
    }
    return saved_msg, dropdown_update, metadata_list, info, updated_summary, new_state


def update_selected_bookmark(label: str, metadata_list: list[dict]):
    metadata = find_metadata_by_label(label, metadata_list or []) if label else None
    return _format_bookmark_info(metadata)


def answer_bookmark_question(label: str, question: str, metadata_list: list[dict]):
    if not label:
        return "Select a bookmarked repository before asking a question."
    if not question.strip():
        return "Enter a question to query your bookmarked repository."

    metadata = find_metadata_by_label(label, metadata_list or [])
    if not metadata:
        return "Bookmark metadata not found. Try refreshing bookmarks."
    if metadata.get("vector_chunks", 0) == 0:
        return "This bookmark has no vector store yet. Re-bookmark the repo to rebuild embeddings."

    summary = (
        f"Repository: {metadata.get('repo_name', label)}\n"
        f"Docs: {metadata.get('docs_count', 0)} | Last pulled: {metadata.get('last_pulled_display', '--/--/----')}"
    )
    answer = rag_answer_from_store(Path(metadata["vector_dir"]), question, repo_summary=summary)
    return answer


def placeholder_action_message(label: str):
    if not label:
        return "Select a bookmarked repository to use this action."
    return f"Additional bookmark actions for **{label}** are coming soon."


def run_experimental_lab(intention: str, report_name: str | None):
    text = (intention or "").strip()
    if not text:
        return "Describe what you want to build in the Experimental Lab to get started."

    root = _ensure_knowledge_root()
    report_markdown = ""
    context_note = ""
    if report_name:
        report_path = root / report_name
        if report_path.exists():
            try:
                raw = report_path.read_text(encoding="utf-8", errors="ignore")
                report_markdown = raw
                snippet = raw[:3000]
                context_note = (
                    f"Using Knowledge Transfer report: `{report_name}` as reference.\n\n"
                    f"Snippet from report (truncated):\n\n```markdown\n{snippet}\n```\n\n"
                )
            except OSError:
                context_note = (
                    f"Unable to read Knowledge Transfer report `{report_name}`. "
                    "Proceeding without embedded context.\n\n"
                )

    build_result = build_experiment_from_report(text, report_markdown)
    code = build_result.get("code", "")
    stdout = build_result.get("stdout", "")
    error = build_result.get("error", "")

    base_intro = (
        "Experimental Lab is a sandbox where future versions of MonkeyMind will use "
        "Knowledge Transfer reports as context to plan and build small Gradio apps.\n\n"
    )

    code_section = "### Generated experiment code\n\n"
    if code:
        code_section += f"```python\n{code}\n```\n\n"
    else:
        code_section += "No code was generated.\n\n"

    results_section = "### Sandbox output\n\n"
    if stdout:
        results_section += f"**Stdout / logs:**\n\n```text\n{stdout}\n```\n\n"
    if error:
        results_section += f"**Error:**\n\n```text\n{error}\n```\n\n"
    if not stdout and not error:
        results_section += "No errors encountered during sandbox test.\n\n"

    return (
        base_intro
        + context_note
        + f"You wrote:\n\n> {text}\n\n"
        + code_section
        + results_section
    )


def lab_fix_bugs(intention: str, report_name: str | None):
    base = run_experimental_lab(intention, report_name)
    return (
        base
        + "\n\n---\n\n_This Fix bugs action will eventually trigger another build iteration to resolve errors in the generated app. "
        "For now, it simply records another planning pass based on your intention and chosen report._"
    )


def lab_mark_happy(intention: str, report_name: str | None):
    text = (intention or "").strip()
    return (
        "Marking this experiment as complete.\n\n"
        f"Final intention:\n\n> {text or 'N/A'}\n\n"
        "You can now export or reuse this idea elsewhere. Future versions will attach concrete code artifacts here."
    )


def lab_export_project(intention: str, report_name: str | None):
    text = (intention or "").strip()
    return (
        "Export placeholder: a future version will bundle generated code, configuration, and a short README "
        "into a downloadable package.\n\n"
        f"Current experiment description:\n\n> {text or 'N/A'}\n\n"
        f"Reference report: `{report_name or 'none selected'}`."
    )


def answer_chat_question(question: str, github_state, local_state, youtube_state, selected_bookmark, metadata_list):
    """Answer questions using RAG from any indexed source or bookmark."""
    if not question.strip():
        return "Please enter a question."

    # Check if using a bookmark
    if selected_bookmark:
        metadata = find_metadata_by_label(selected_bookmark, metadata_list or [])
        if metadata and metadata.get("vector_chunks", 0) > 0:
            summary = (
                f"Repository: {metadata.get('repo_name', selected_bookmark)}\n"
                f"Last pulled: {metadata.get('last_pulled_display', '--/--/----')}"
            )
            answer = rag_answer_from_store(Path(metadata["vector_dir"]), question, repo_summary=summary)
            return f"**[Bookmark: {selected_bookmark}]**\n\n{answer}"

    # Check current session sources
    for state, label in [
        (github_state, "GitHub"),
        (local_state, "Local"),
        (youtube_state, "YouTube"),
    ]:
        if state and state.get("indexed") and state.get("vector_dir"):
            vector_dir = Path(state["vector_dir"])
            if vector_dir.exists():
                answer = rag_answer_from_store(vector_dir, question)
                return f"**[{label} Source]**\n\n{answer}"

    return "⚠️ No indexed sources available. Please index a repository or select a bookmark first."
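
# Illustrative sketch (comments only): answer_chat_question prefers an explicitly selected
# bookmark and otherwise falls back to the first indexed session source, in the fixed order
# GitHub -> Local -> YouTube. A hypothetical call from the chat handler looks like:
#
#   reply = answer_chat_question(
#       "What are the key dependencies?",
#       github_state_value, local_state_value, youtube_state_value,
#       selected_bookmark="user/repo (01/01/2024)",   # placeholder label
#       metadata_list=bookmark_metadata,
#   )
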

def generate_and_download_report(state_payload: dict | None, source_type: str):
    """Generate markdown report and return file path for download."""
    analysis_data = (state_payload or {}).get("analysis") if state_payload else None
    if not analysis_data:
        return None

    if source_type == "youtube":
        transcript = analysis_data.get("transcript", "")
        chapters = analysis_data.get("chapters", "")
        url = analysis_data.get("url", "youtube")
        lang = analysis_data.get("lang", "en")
        study_notes = generate_youtube_study_notes(chapters, url=url)
        generated_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
        lines = [
            "# YouTube Knowledge Transfer Report",
            "",
            f"- Source: {url}",
            f"- Language: {lang}",
            f"- Generated at: {generated_at}",
            "",
            "## 1. Chapter Outline",
            "",
            chapters,
            "",
            "## 2. Study Notes",
            "",
            study_notes,
        ]
    else:
        # GitHub or Local repo
        repo_name = analysis_data.get("repo_name", "Project")
        repo_url = analysis_data.get("repo_url", "local")
        docs = analysis_data.get("documentation", [])
        generated_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
        lines = [
            f"# Knowledge Transfer Report – {repo_name}",
            "",
            f"- Source: {repo_url}",
            f"- Generated at: {generated_at}",
            f"- Documentation files: {len(docs)}",
            "",
            "## Documentation Files",
            "",
        ]
        for doc in docs[:50]:
            lines.append(f"- {doc.get('path', 'unknown')}")

    report_markdown = "\n".join(lines)
    root = _ensure_knowledge_root()
    # YouTube analyses carry no "repo_name", so fall back to the video URL for the slug
    slug = _slugify_name(analysis_data.get("repo_name") or analysis_data.get("url") or "project")
    ts = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
    report_path = root / f"{slug}-report-{ts}.md"
    report_path.write_text(report_markdown, encoding="utf-8")
    return str(report_path)


def refresh_bookmarks_dropdown():
    """Refresh the bookmarks dropdown."""
    choices, metadata_list = get_dropdown_options()
    return gr.Dropdown(choices=choices, value=None, interactive=True), metadata_list


def build_interface() -> tuple[gr.Blocks, gr.Theme | None, str | None]:
    """Build the Gradio interface with improved UI/UX inspired by modern dashboard design."""
    custom_css = """
    @import url('https://fonts.googleapis.com/css2?family=Outfit:wght@300;400;500;600;700&family=JetBrains+Mono:wght@400;500&display=swap');

    :root {
        --primary: #34d399; /* Mint Green */
        --primary-glow: rgba(52, 211, 153, 0.4);
        --glass-bg: rgba(15, 23, 42, 0.6);
        --glass-border: rgba(255, 255, 255, 0.08);
        --text-main: #f8fafc;
        --text-muted: #94a3b8;
    }

    body {
        background-color: #0f172a;
        color: var(--text-main);
        font-family: 'Outfit', sans-serif !important;
    }

    /* Global container override */
    .gradio-container {
        max-width: 1400px !important;
        background: #0f172a !important;
        background-image:
            radial-gradient(circle at 0% 0%, rgba(52, 211, 153, 0.15) 0%, transparent 50%),
            radial-gradient(circle at 100% 100%, rgba(16, 185, 129, 0.1) 0%, transparent 50%) !important;
        border: none !important;
    }

    /* Header styling */
    .header-container {
        background: rgba(15, 23, 42, 0.8);
        backdrop-filter: blur(12px);
        border-bottom: 1px solid var(--glass-border);
        padding: 20px 24px;
        margin: -16px -16px 24px -16px;
        border-radius: 0;
    }

    /* Card/Panel styling */
    .source-card, .gradio-group, .tabs, .tabitem, .box-container {
        background: var(--glass-bg) !important;
        backdrop-filter: blur(12px);
        border: 1px solid var(--glass-border) !important;
        border-radius: 16px !important;
        box-shadow: 0 4px 20px rgba(0, 0, 0, 0.2);
        padding: 20px !important;
        margin-bottom: 20px !important;
    }

    /* Inputs and Textareas */
    input, textarea, .gr-input, .gr-box, .dropdown-wrap {
        background-color: rgba(30, 41, 59, 0.6) !important;
        border: 1px solid rgba(255, 255, 255, 0.1) !important;
        color: var(--text-main) !important;
        border-radius: 10px !important;
    }
    input:focus, textarea:focus {
        border-color: var(--primary) !important;
        box-shadow: 0 0 0 2px var(--primary-glow) !important;
    }

    /* Buttons */
    button.primary {
        background: linear-gradient(135deg, #34d399 0%, #10b981 100%) !important;
        color: #0f172a !important;
        font-weight: 600 !important;
        border: none !important;
        box-shadow: 0 4px 15px rgba(52, 211, 153, 0.3) !important;
    }
    button.secondary {
        background: rgba(30, 41, 59, 0.8) !important;
        border: 1px solid rgba(255, 255, 255, 0.1) !important;
        color: var(--text-muted) !important;
    }
    button.secondary:hover {
        color: var(--text-main) !important;
        border-color: var(--primary) !important;
    }
    button.stop {
        background: linear-gradient(135deg, #f87171 0%, #ef4444 100%) !important;
        color: white !important;
    }

    /* Status indicators */
    .status-ready {
        background: rgba(16, 185, 129, 0.1);
        color: #34d399;
        padding: 6px 12px;
        border-radius: 20px;
        font-size: 12px;
        border: 1px solid rgba(16, 185, 129, 0.2);
    }
    .status-pending {
        background: rgba(251, 191, 36, 0.1);
        color: #fbbf24;
        padding: 6px 12px;
        border-radius: 20px;
        font-size: 12px;
        border: 1px solid rgba(251, 191, 36, 0.2);
    }

    /* Keep copy/like buttons always visible and styled */
    .message-buttons {
        opacity: 1 !important;
        display: flex !important;
        gap: 4px !important;
    }
    .message-buttons button {
        color: #94a3b8 !important; /* text-muted */
        background: transparent !important;
        border: none !important;
        box-shadow: none !important;
    }
    .message-buttons button:hover {
        color: #34d399 !important; /* primary */
        background: rgba(52, 211, 153, 0.1) !important;
    }

    /* Chat bubbles */
    .chat-assistant {
        background: rgba(30, 41, 59, 0.8) !important;
        border: 1px solid var(--glass-border);
        border-radius: 18px 18px 18px 4px !important;
        color: var(--text-main) !important;
    }
    .chat-user {
        background: linear-gradient(135deg, #34d399 0%, #10b981 100%) !important;
        color: #022c22 !important;
        border-radius: 18px 18px 4px 18px !important;
        font-weight: 500;
    }

    /* Chat message text size and font */
    .message-wrap .message {
        font-size: 0.9rem !important;
        font-family: 'JetBrains Mono', monospace !important;
        line-height: 1.5 !important;
    }

    /* Typography overrides */
    .prose, .prose h1, .prose h2, .prose h3, .prose p, .prose strong {
        color: var(--text-main) !important;
    }

    /* Scrollbar */
    ::-webkit-scrollbar { width: 6px; height: 6px; }
    ::-webkit-scrollbar-track { background: transparent; }
    ::-webkit-scrollbar-thumb { background: #334155; border-radius: 10px; }
    ::-webkit-scrollbar-thumb:hover { background: #475569; }
    """

    app_theme = gr.themes.Soft(
        primary_hue="emerald",
        secondary_hue="slate",
        neutral_hue="slate",
    ).set(
        body_background_fill="#0f172a",
        block_background_fill="#1e293b",
        block_border_color="rgba(255,255,255,0.1)",
        input_background_fill="#0f172a",
        button_primary_background_fill="#34d399",
        button_primary_text_color="#0f172a",
    )

    blocks_kwargs = {"title": "🐒 MonkeyMind - Knowledge Transfer Agent"}
    if not IS_GRADIO_V6:
        blocks_kwargs.update(theme=app_theme, css=custom_css)

    with gr.Blocks(**blocks_kwargs) as demo:
        # State variables
        github_state = gr.State({})
        local_state = gr.State({})
        youtube_state = gr.State({})
        bookmarks_metadata = gr.State([])
        chat_history = gr.State([])
        notepad_content = gr.State("")  # For notepad feature
        copied_texts = gr.State(set())  # Track copied texts to prevent duplicates

        # ===== HEADER =====
        with gr.Row(elem_classes=["header-container"]):
            gr.HTML("""
            <div style="display: flex; align-items: center; justify-content: space-between;">
                <div style="display: flex; align-items: center; gap: 12px;">
                    <span style="font-size: 28px;">🐒</span>
                    <span style="font-size: 22px; font-weight: 600; font-family: 'Outfit', sans-serif;">MonkeyMind</span>
                </div>
                <span class="status-ready">AGENT ACTIVE</span>
            </div>
""") # ===== APP DESCRIPTION ===== gr.Markdown(""" > **🐒 MonkeyMind** is your Knowledge Transfer & RAG Agent. Analyze GitHub repos, local projects, or YouTube videos > to build a personal knowledge base. Chat with your sources using AI-powered retrieval. > > **Quick Start:** Paste a GitHub URL → Click **Analyze** → Click **Index** → Start chatting! > Bookmark important sources to access them anytime. """) # ===== MAIN LAYOUT: 1/3 Left Panel + 2/3 Right Panel ===== with gr.Row(): # ===== LEFT PANEL: Data Sources + Bookmarks ===== with gr.Column(scale=1, min_width=350): # Data Sources Section gr.Markdown("### 📡 Data Ingestion") # Source Type Selector (pill-style tabs) source_type = gr.Radio( choices=["🔗 GitHub", "📁 Local", "🎥 YouTube"], value="🔗 GitHub", label="", container=False, interactive=True ) # Dynamic source input based on selection with gr.Group(visible=True) as github_group: gr.Markdown("*Paste a GitHub repo URL and click **Analyze** to extract docs. Then **Index** to enable RAG chat. You can see retrieved contents in the **Preview** tab on the right.*") github_url = gr.Textbox( label="Repository URL", placeholder="https://github.com/username/repo", show_label=True, ) github_analyze_btn = gr.Button("🔍 Analyze Repository", variant="secondary") github_index_btn = gr.Button("⚡ Index for RAG", variant="primary") github_status = gr.Markdown("Ready to analyze.", elem_classes=["status-pending"]) github_bookmark_btn = gr.Button("🔖 Bookmark & Save to Knowledge Base", variant="secondary") gr.Markdown("---") gr.Markdown("**📝 Notepad** *(copy useful info here)*") github_notepad = gr.Textbox( label="", placeholder="Paste or write notes here...", lines=3, container=False, show_copy_button=True, ) with gr.Row(): github_notepad_download = gr.DownloadButton("📥 Download as .md", variant="secondary", size="sm") with gr.Group(visible=False) as local_group: gr.Markdown("*Upload a project folder. Irrelevant files are auto-filtered. You can see retrieved contents in the **Preview** tab on the right.*") local_folder = gr.File( label="Upload Project Folder", file_count="directory", type="filepath", ) local_analyze_btn = gr.Button("🔍 Analyze Project", variant="secondary") local_index_btn = gr.Button("⚡ Index for RAG", variant="primary") local_status = gr.Markdown("Ready to analyze.", elem_classes=["status-pending"]) local_bookmark_btn = gr.Button("🔖 Bookmark & Save to Knowledge Base", variant="secondary") gr.Markdown("---") gr.Markdown("**📝 Notepad** *(copy useful info here)*") local_notepad = gr.Textbox( label="", placeholder="Paste or write notes here...", lines=3, container=False, show_copy_button=True, ) with gr.Row(): local_notepad_download = gr.DownloadButton("📥 Download as .md", variant="secondary", size="sm") with gr.Group(visible=False) as youtube_group: gr.Markdown("*Paste a YouTube video URL to extract and analyze the transcript. 
You can see retrieved contents in the **Preview** tab on the right.*") youtube_url = gr.Textbox( label="Video URL", placeholder="https://www.youtube.com/watch?v=...", ) youtube_analyze_btn = gr.Button("🔍 Analyze Video", variant="secondary") youtube_index_btn = gr.Button("⚡ Index for RAG", variant="primary") youtube_status = gr.Markdown("Ready to analyze.", elem_classes=["status-pending"]) youtube_bookmark_btn = gr.Button("🔖 Bookmark & Save to Knowledge Base", variant="secondary") gr.Markdown("---") gr.Markdown("**📝 Notepad** *(copy useful info here)*") youtube_notepad = gr.Textbox( label="", placeholder="Paste or write notes here...", lines=3, container=False, show_copy_button=True, ) with gr.Row(): youtube_notepad_download = gr.DownloadButton("📥 Download as .md", variant="secondary", size="sm") # Source type switching logic def switch_source(choice): return ( gr.Group(visible=("GitHub" in choice)), gr.Group(visible=("Local" in choice)), gr.Group(visible=("YouTube" in choice)), ) source_type.change( fn=switch_source, inputs=[source_type], outputs=[github_group, local_group, youtube_group], ) gr.Markdown("---") # Bookmarks Quick Access Section gr.Markdown("### 🧠 Knowledge Base") gr.Markdown("*Select sources to use for chat. Multiple selections allowed.*") # Use CheckboxGroup for multi-select bookmarks_checkboxes = gr.CheckboxGroup( label="Active Sources", choices=[], value=[], interactive=True, info="Check sources to include in chat context" ) # Keep dropdown for compatibility (hidden, used internally) bookmarks_dropdown = gr.Dropdown( label="", choices=[], value=None, interactive=True, visible=False, ) with gr.Row(): refresh_bookmarks_btn = gr.Button("🔄 Refresh", variant="secondary", size="sm", scale=1) view_all_btn = gr.Button("📋 View All", variant="secondary", size="sm", scale=1) # Bookmark info display bookmark_info = gr.Markdown( value="*No sources selected. Bookmark repos to add them here.*", elem_classes=["info-box"] ) # ===== RIGHT PANEL: Chat & Preview ===== with gr.Column(scale=2, min_width=500): # Chat vs Preview Toggle right_panel_mode = gr.Radio( choices=["💬 Chat & RAG", "📄 Preview"], value="💬 Chat & RAG", label="", container=False, ) # Chat Interface with gr.Group(visible=True) as chat_panel: with gr.Group(elem_classes=["box-container"]): gr.Markdown("### 🐒 Knowledge Assistant") gr.Markdown("*Hover over messages to see copy icon. 
                        chatbot_kwargs = dict(
                            value=[],
                            height=450,
                            show_label=False,
                            avatar_images=["images/user.png", "images/monkey.png"],
                            elem_classes=["chat-container"],
                            type="messages",
                        )
                        chatbot = gr.Chatbot(**chatbot_kwargs)

                        # Toolbar row: Web Search and Wikipedia toggles
                        with gr.Row():
                            web_search_toggle = gr.Checkbox(
                                label="🌐 Web Search",
                                value=False,
                                interactive=True,
                            )
                            wiki_search_toggle = gr.Checkbox(
                                label="📚 Wikipedia",
                                value=False,
                                interactive=True,
                            )

                        with gr.Row():
                            question_box = gr.Textbox(
                                label="",
                                placeholder="Ask anything about your indexed sources...",
                                lines=1,
                                scale=5,
                                container=False,
                            )
                            send_btn = gr.Button("Send", variant="primary", scale=1)

                        gr.Examples(
                            examples=[
                                "Summarize the main architecture patterns",
                                "What are the key dependencies?",
                                "Explain the core functionality",
                                "What patterns can I reuse?",
                            ],
                            inputs=question_box,
                            label="Quick Questions"
                        )

                    # ===== EXPERIMENTAL LAB (collapsible) =====
                    with gr.Group(elem_classes=["box-container"]):
                        gr.Markdown("### 🧪 Experimental Lab")
                        gr.Markdown("Use this lab to prototype small apps based on your knowledge base.")
                        with gr.Accordion("Open Lab", open=False):
                            gr.Markdown("*The agent generates Gradio code and tests it in a sandbox.*")
                            with gr.Row():
                                with gr.Column(scale=1):
                                    gr.Markdown("### 🎯 Build an Experiment")
                                    lab_intention = gr.Textbox(
                                        label="What do you want to build?",
                                        placeholder="e.g., A Gradio app that visualizes patterns from a Knowledge Transfer report.",
                                        lines=3,
                                    )
                                    lab_report_dropdown = gr.Dropdown(
                                        label="Reference Material (bookmarked report)",
                                        choices=_list_knowledge_report_choices(),
                                        value=None,
                                        interactive=True,
                                        info="Select a Knowledge Transfer report as context"
                                    )
                                    with gr.Row():
                                        lab_refresh_reports_btn = gr.Button("🔄 Refresh", variant="secondary", size="sm")
                                        lab_start_btn = gr.Button("▶️ Start Build", variant="primary", size="sm")
                                    gr.Markdown("---")
                                    gr.Markdown("**🔧 Fix Issues**")
                                    lab_fix_instruction = gr.Textbox(
                                        label="",
                                        placeholder="Describe what needs to be fixed (e.g., 'The button click handler is not working')...",
                                        lines=2,
                                        container=False,
                                    )
                                    with gr.Row():
                                        lab_fix_btn = gr.Button("🔧 Apply Fix", variant="secondary", size="sm")
                                        lab_happy_btn = gr.Button("✅ Done", variant="secondary", size="sm")
                                    lab_export_btn = gr.Button("📥 Export Code", variant="primary")
                                    lab_download = gr.File(label="Download", visible=False)
                                with gr.Column(scale=2):
                                    gr.Markdown("### 🔬 Experiment Output")
                                    lab_output = gr.Markdown(
                                        "Describe what you want to build, select a reference report from your bookmarks, then click **Start Build**.\n\n"
                                        "If there are errors, describe the issue in the fix box and click **Apply Fix**."
                                    )
                                    lab_code_display = gr.Code(
                                        label="Generated Code",
                                        language="python",
                                        visible=False,
                                    )

                # Preview Interface
                with gr.Group(visible=False) as preview_panel:
                    gr.Markdown("### 📄 Document Preview")
                    with gr.Row():
                        with gr.Column(scale=1):
                            preview_source_info = gr.Markdown("""
**Source:** Not selected
**Status:** ⏳ Pending
**Chunks:** 0 vectors
""", elem_classes=["info-box"])
                        with gr.Column(scale=1, min_width=100):
                            with gr.Row():
                                preview_copy_btn = gr.Button("📋 Copy", variant="secondary", size="sm", scale=1)
                                preview_download_btn = gr.DownloadButton("📥 Download", variant="secondary", size="sm", scale=1)
                                preview_download_file = gr.File(visible=False)
                    preview_content = gr.Markdown(
                        value="Select a source and analyze it to see the preview here.",
                        elem_classes=["preview-card"]
                    )

                # Panel switching logic
                def switch_panel(choice):
                    return (
                        gr.Group(visible=("Chat" in choice)),
                        gr.Group(visible=("Preview" in choice)),
                    )

                right_panel_mode.change(
                    fn=switch_panel,
                    inputs=[right_panel_mode],
                    outputs=[chat_panel, preview_panel],
                )

                gr.Markdown("---")

        # ===== HIDDEN STATE COMPONENTS FOR DOWNLOADS =====
        github_download = gr.File(label="Download", visible=False)
        local_download = gr.File(label="Download", visible=False)
        youtube_download = gr.File(label="Download", visible=False)

        # ===== EVENT HANDLERS =====

        # GitHub handlers
        github_analyze_btn.click(
            fn=lambda: "⏳ **Analyzing repository...** Please wait.",
            outputs=[github_status],
        ).then(
            fn=run_github_ingestion,
            inputs=[github_url],
            outputs=[preview_content, preview_source_info, github_state],
        ).then(
            fn=lambda: "✅ **Analysis complete!** Click **Index for RAG** to enable chat.",
            outputs=[github_status],
        )

        def index_github_with_status(state):
            status, new_state = index_github_repo(state)
            chunks = new_state.get("vector_chunks", 0) if new_state else 0
            if "✅" in status or chunks > 0:
                return f"✅ **Indexed {chunks} vector chunks.** Ready for RAG queries!", new_state
            return status, new_state

        def index_local_with_status(state):
            status, new_state = index_local_repo(state)
            chunks = new_state.get("vector_chunks", 0) if new_state else 0
            if "✅" in status or chunks > 0:
                return f"✅ **Indexed {chunks} vector chunks.** Ready for RAG queries!", new_state
            return status, new_state

        def index_youtube_with_status(state):
            status, new_state = index_youtube_video(state)
            chunks = new_state.get("vector_chunks", 0) if new_state else 0
            if "✅" in status or chunks > 0:
                return f"✅ **Indexed {chunks} transcript chunks.** Ready for RAG queries!", new_state
            return status, new_state

        github_index_btn.click(
            fn=lambda: "⏳ **Indexing...** Building vector embeddings.",
            outputs=[github_status],
        ).then(
            fn=index_github_with_status,
            inputs=[github_state],
            outputs=[github_status, github_state],
        )

        def bookmark_with_refresh(state):
            status, new_state, dropdown = bookmark_github_repo(state)
            choices, meta = get_dropdown_options()
            return (
                "🔖 **Bookmarked!** Added to Knowledge Base for future chat sessions.",
                new_state,
                gr.Dropdown(choices=choices, value=None),
                gr.CheckboxGroup(choices=choices, value=[]),
                meta
            )

        github_bookmark_btn.click(
            fn=bookmark_with_refresh,
            inputs=[github_state],
            outputs=[github_status, github_state, bookmarks_dropdown, bookmarks_checkboxes, bookmarks_metadata],
        )

        # Local handlers
        local_analyze_btn.click(
            fn=lambda: "⏳ **Analyzing project...** Please wait.",
            outputs=[local_status],
        ).then(
            fn=run_local_repo_ingestion,
            inputs=[local_folder],
            outputs=[preview_content, preview_source_info, local_state],
        ).then(
            fn=lambda: "✅ **Analysis complete!** Click **Index for RAG** to enable chat.",
            outputs=[local_status],
        )
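
        # Note on the pattern above (repeated for GitHub, Local, and YouTube): each "Analyze"
        # button chains three steps with .click(...).then(...) — show a transient status
        # message, run the potentially slow ingestion function, then overwrite the status
        # once it finishes. The output-only lambdas exist solely to update the status text
        # around the heavy middle step.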

        local_index_btn.click(
            fn=lambda: "⏳ **Indexing...** Building vector embeddings.",
            outputs=[local_status],
        ).then(
            fn=index_local_with_status,
            inputs=[local_state],
            outputs=[local_status, local_state],
        )

        def bookmark_local_with_refresh(state):
            status, new_state, dropdown = bookmark_local_repo(state)
            choices, meta = get_dropdown_options()
            return (
                "🔖 **Bookmarked!** Added to Knowledge Base for future chat sessions.",
                new_state,
                gr.Dropdown(choices=choices, value=None),
                gr.CheckboxGroup(choices=choices, value=[]),
                meta
            )

        local_bookmark_btn.click(
            fn=bookmark_local_with_refresh,
            inputs=[local_state],
            outputs=[local_status, local_state, bookmarks_dropdown, bookmarks_checkboxes, bookmarks_metadata],
        )

        # YouTube handlers
        youtube_analyze_btn.click(
            fn=lambda: "⏳ **Fetching transcript...** Please wait.",
            outputs=[youtube_status],
        ).then(
            fn=run_youtube_ingestion,
            inputs=[youtube_url],
            outputs=[preview_content, preview_source_info, youtube_state],
        ).then(
            fn=lambda: "✅ **Analysis complete!** Click **Index for RAG** to enable chat.",
            outputs=[youtube_status],
        )

        youtube_index_btn.click(
            fn=lambda: "⏳ **Indexing...** Building vector embeddings.",
            outputs=[youtube_status],
        ).then(
            fn=index_youtube_with_status,
            inputs=[youtube_state],
            outputs=[youtube_status, youtube_state],
        )

        def bookmark_youtube_with_refresh(state):
            status, new_state, dropdown = bookmark_youtube_video(state)
            choices, meta = get_dropdown_options()
            return (
                "🔖 **Bookmarked!** Added to Knowledge Base for future chat sessions.",
                new_state,
                gr.Dropdown(choices=choices, value=None),
                gr.CheckboxGroup(choices=choices, value=[]),
                meta
            )

        youtube_bookmark_btn.click(
            fn=bookmark_youtube_with_refresh,
            inputs=[youtube_state],
            outputs=[youtube_status, youtube_state, bookmarks_dropdown, bookmarks_checkboxes, bookmarks_metadata],
        )

        # Chat handlers
        def handle_chat_with_history(question, history, github_s, local_s, youtube_s, selected_sources, meta):
            # Use first selected source from checkboxes, or fall back to indexed sources
            bookmark = selected_sources[0] if selected_sources else None
            answer = answer_chat_question(question, github_s, local_s, youtube_s, bookmark, meta)
            history = history or []
            history.append({"role": "user", "content": question})
            history.append({"role": "assistant", "content": answer})
            return history, ""

        send_btn.click(
            fn=handle_chat_with_history,
            inputs=[question_box, chatbot, github_state, local_state, youtube_state, bookmarks_checkboxes, bookmarks_metadata],
            outputs=[chatbot, question_box],
        )
        question_box.submit(
            fn=handle_chat_with_history,
            inputs=[question_box, chatbot, github_state, local_state, youtube_state, bookmarks_checkboxes, bookmarks_metadata],
            outputs=[chatbot, question_box],
        )

        # Chatbot Like/Dislike Handler
        def handle_chatbot_like(data: gr.LikeData):
            # Placeholder for future feedback logging
            print(f"User feedback: {'Liked' if data.liked else 'Disliked'} message index {data.index}")
            return None

        chatbot.like(
            fn=handle_chatbot_like,
            outputs=None
        )

        # Bookmark handlers - refresh both dropdown and checkboxes
        def refresh_all_bookmarks():
            choices, meta = get_dropdown_options()
            return (
                gr.Dropdown(choices=choices, value=None),
                gr.CheckboxGroup(choices=choices, value=[]),
                meta
            )

        refresh_bookmarks_btn.click(
            fn=refresh_all_bookmarks,
            outputs=[bookmarks_dropdown, bookmarks_checkboxes, bookmarks_metadata],
        )

        # View All button - show bookmark details
        def view_all_bookmarks(meta):
            if not meta:
                return "*No bookmarks found. Analyze and bookmark repositories to see them here.*"
            lines = ["**📚 All Bookmarked Sources:**\n"]
            for m in meta:
                name = m.get("repo_name", "Unknown")
                date = m.get("last_pulled_display", "--")
                chunks = m.get("vector_chunks", 0)
                lines.append(f"- **{name}** — {date} — {chunks} chunks")
            return "\n".join(lines)

        view_all_btn.click(
            fn=view_all_bookmarks,
            inputs=[bookmarks_metadata],
            outputs=[bookmark_info],
        )

        # Update bookmark info when checkboxes change
        def update_checkbox_info(selected, meta):
            if not selected:
                return "*No sources selected. Check sources above to include in chat.*"
            lines = [f"**{len(selected)} source(s) selected:**\n"]
            for label in selected:
                m = find_metadata_by_label(label, meta or [])
                if m:
                    lines.append(f"- {m.get('repo_name', label)} ({m.get('vector_chunks', 0)} chunks)")
            return "\n".join(lines)

        bookmarks_checkboxes.change(
            fn=update_checkbox_info,
            inputs=[bookmarks_checkboxes, bookmarks_metadata],
            outputs=[bookmark_info],
        )

        # Notepad download handlers
        def download_notepad_as_md(content):
            if not content or not content.strip():
                return None
            import tempfile
            with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False, encoding='utf-8') as f:
                f.write(content)
            return f.name

        github_notepad_download.click(
            fn=download_notepad_as_md,
            inputs=[github_notepad],
            outputs=[github_notepad_download],
        )
        local_notepad_download.click(
            fn=download_notepad_as_md,
            inputs=[local_notepad],
            outputs=[local_notepad_download],
        )
        youtube_notepad_download.click(
            fn=download_notepad_as_md,
            inputs=[youtube_notepad],
            outputs=[youtube_notepad_download],
        )

        # Preview Copy Handler (JS)
        preview_copy_btn.click(
            fn=None,
            inputs=[preview_content],
            js="(content) => { navigator.clipboard.writeText(content); return 'Copied!'; }",
        )

        # Preview Download Handler
        preview_download_btn.click(
            fn=download_notepad_as_md,  # Reusing the md download function
            inputs=[preview_content],
            outputs=[preview_download_btn],
        )

        # Lab handlers with improved output
        def run_lab_with_code(intention, report):
            result = run_experimental_lab(intention, report)
            # Extract code if present in result
            if "```python" in result:
                code_start = result.find("```python") + 9
                code_end = result.find("```", code_start)
                code = result[code_start:code_end].strip() if code_end > code_start else ""
                return result, gr.Code(value=code, visible=True)
            return result, gr.Code(visible=False)

        lab_start_btn.click(
            fn=run_lab_with_code,
            inputs=[lab_intention, lab_report_dropdown],
            outputs=[lab_output, lab_code_display],
        )

        def fix_lab_with_instruction(intention, report, fix_instruction):
            # Pass the fix instruction to the fix function
            combined = f"{intention}\n\nFIX REQUEST: {fix_instruction}" if fix_instruction else intention
            return lab_fix_bugs(combined, report)

        lab_fix_btn.click(
            fn=fix_lab_with_instruction,
            inputs=[lab_intention, lab_report_dropdown, lab_fix_instruction],
            outputs=[lab_output],
        )

        lab_happy_btn.click(
            fn=lab_mark_happy,
            inputs=[lab_intention, lab_report_dropdown],
            outputs=[lab_output],
        )

        # Export lab code to file
        def export_lab_code(code_content):
            if not code_content:
                return None
            import tempfile
            with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False, encoding='utf-8') as f:
                f.write(code_content)
            return f.name

        lab_export_btn.click(
            fn=export_lab_code,
            inputs=[lab_code_display],
            outputs=[lab_download],
        ).then(
            fn=lambda: gr.File(visible=True),
            outputs=[lab_download],
        )

        lab_refresh_reports_btn.click(
            fn=_refresh_lab_reports_dropdown,
            outputs=[lab_report_dropdown],
        )

        # Load bookmarks on startup (refresh both dropdown and checkboxes)
        demo.load(
            fn=refresh_all_bookmarks,
            outputs=[bookmarks_dropdown, bookmarks_checkboxes, bookmarks_metadata],
        )

    return demo, (app_theme if IS_GRADIO_V6 else None), (custom_css if IS_GRADIO_V6 else None)


if __name__ == "__main__":
    demo, app_theme, custom_css = build_interface()
    launch_kwargs = {"share": False}
    if IS_GRADIO_V6:
        launch_kwargs.update(theme=app_theme, css=custom_css)
    demo.launch(**launch_kwargs)
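
# Illustrative usage (assumption: this module is saved as app.py and the `agent` and
# `bookmarks` modules plus their MCP / vector-store dependencies are installed):
#
#   python app.py
#
# As written, on Gradio >= 6 the theme and custom CSS are passed to launch(), while on
# older versions they are applied via gr.Blocks() inside build_interface().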