from __future__ import annotations import threading from typing import Any import os from dotenv import load_dotenv from UI import EastSyncInterface from get_final_text import get_final_text load_dotenv() ui = EastSyncInterface() current_thread: threading.Thread | None = None is_run_cancelled = False should_narrate_events = os.getenv("SHOULD_NARRATE_EVENTS", "False").lower() == "true" try: # pragma: no cover - fallback when providers misconfigured from agents.orchestrator_agent import OrchestratorAgent except Exception as exc: # pylint: disable=broad-except OrchestratorAgent = None # type: ignore ORCHESTRATOR_IMPORT_ERROR = str(exc) else: ORCHESTRATOR_IMPORT_ERROR = None try: from agents.narrator_agent import NarratorAgent except Exception as exc: NarratorAgent = None print(f"Narrator Agent unavailable: {exc}") if OrchestratorAgent is not None: try: orchestrator_agent: OrchestratorAgent | None = OrchestratorAgent() orchestrator_error: str | None = None except Exception as exc: # pragma: no cover - best-effort graceful fallback orchestrator_agent = None orchestrator_error = str(exc) else: orchestrator_agent = None orchestrator_error = ORCHESTRATOR_IMPORT_ERROR or "Provider unavailable" if NarratorAgent: narrator_agent = NarratorAgent() else: narrator_agent = None def cancel_run(): global is_run_cancelled, current_thread is_run_cancelled = True if current_thread and current_thread.is_alive(): current_thread.join(timeout=1) # Run the agent using a background task, so it can be cancelled def analyze_and_plan_interface(user_prompt: str): global current_thread, is_run_cancelled # Reset cancel flag for new run is_run_cancelled = False if orchestrator_agent is None: message = orchestrator_error or "Agent unavailable" ui.register_agent_action("System Offline", {"reason": message, "prompt": user_prompt}) return ui.render_error_state(message) # Cancel any existing run, if any if current_thread and current_thread.is_alive() and current_thread != threading.current_thread(): is_run_cancelled = True current_thread = threading.current_thread() def run_agent(): try: result: Any = orchestrator_agent.analyze_and_plan( user_prompt, ui.register_agent_action, get_is_run_cancelled_flag=lambda: is_run_cancelled ) if result is not None: # agent didn't get cancelled, keep running ui.set_analysis_result(result) # Queue the final corny summary to the narrator stream if should_narrate_events and narrator_agent: corny_summary = result.get('corny_summary', '') final_text = get_final_text(corny_summary) if corny_summary: narrator_agent.queue_final_summary(final_text) except Exception as exc: ui.set_analysis_error(str(exc)) finally: # Stop processing when agent thread completes ui.stop_processing() # Start a brand new thread for the new request # Note: start_processing() was already called by the button handler current_thread = threading.Thread(target=run_agent) current_thread.start() # Return processing state - timer will poll for updates return ui.render_project_processing_state() def start_audio_stream(): """ Starts the background narrator thread and returns the audio generator. """ if should_narrate_events: yield from narrator_agent.narrate_event_streaming(ui) def main(): demo = ui.build_interface(analyze_and_plan_interface, cancel_run, start_audio_stream) demo.launch(server_name="0.0.0.0", server_port=7860, share=True) if __name__ == "__main__": main()