"""Orchestrator for 12 Angry Agents deliberation. Handles turn management, speaker selection, and deliberation flow. """ import random from dataclasses import dataclass, field from typing import TYPE_CHECKING from core.game_state import GameState, DeliberationTurn, GamePhase, ArgumentDirection from core.models import JurorConfig, JurorMemory from core.conviction import calculate_conviction_change, apply_conviction_update if TYPE_CHECKING: from agents.smolagent_juror import SmolagentJuror from case_db.models import CriminalCase @dataclass class SpeakerWeight: """Weight information for speaker selection.""" juror_id: str weight: float reason: str @dataclass class TurnResult: """Result of a single turn in deliberation.""" turn: DeliberationTurn conviction_changes: dict[str, float] vote_changes: list[tuple[str, str, str]] reasoning_steps: list[str] = field(default_factory=list) tool_calls: list[str] = field(default_factory=list) class TurnManager: """Manages fair speaker selection with weighted queue. Selection priorities: 1. Jurors "on the fence" (conviction 0.35-0.65) - most interesting 2. Jurors who haven't spoken recently - fairness 3. Jurors with high influence - they drive conversation 4. Some randomness to keep things unpredictable """ ON_FENCE_BONUS = 2.0 RECENCY_PENALTY = 0.3 INFLUENCE_WEIGHT = 1.5 RANDOM_FACTOR = 0.3 RECENCY_WINDOW = 2 def __init__(self): self.speaker_history: list[list[str]] = [] def select_speakers( self, game_state: GameState, juror_configs: list[JurorConfig], juror_memories: dict[str, JurorMemory], num_speakers: int = None, exclude_player: bool = True ) -> list[str]: """Select speakers for the next round using weighted selection.""" if num_speakers is None: num_speakers = random.randint(1, 3) eligible = [ c for c in juror_configs if not (exclude_player and c.is_player()) ] if not eligible: return [] weights = self._calculate_weights( eligible, juror_memories, game_state.round_number ) selected = self._weighted_select(weights, min(num_speakers, len(eligible))) self.speaker_history.append(selected) game_state.speaking_queue = selected return selected def _calculate_weights( self, configs: list[JurorConfig], memories: dict[str, JurorMemory], current_round: int ) -> list[SpeakerWeight]: """Calculate selection weight for each juror.""" weights = [] for config in configs: jid = config.juror_id memory = memories.get(jid) base_weight = 0.5 + (config.influence * self.INFLUENCE_WEIGHT) if memory: conviction = memory.current_conviction fence_distance = abs(conviction - 0.5) if fence_distance < 0.15: fence_bonus = self.ON_FENCE_BONUS * (1 - fence_distance / 0.15) else: fence_bonus = 0.0 else: fence_bonus = 0.0 recency_multiplier = 1.0 reason_parts = [] for rounds_ago, speakers in enumerate(reversed(self.speaker_history[-self.RECENCY_WINDOW:])): if jid in speakers: penalty = self.RECENCY_PENALTY ** (rounds_ago + 1) recency_multiplier *= penalty reason_parts.append(f"spoke {rounds_ago + 1} rounds ago") break volatility_bonus = config.volatility * 0.5 weight = (base_weight + fence_bonus + volatility_bonus) * recency_multiplier weight += random.uniform(0, self.RANDOM_FACTOR) reasons = [] if fence_bonus > 0: reasons.append(f"on fence (+{fence_bonus:.2f})") if recency_multiplier < 1.0: reasons.append(f"recent speaker (x{recency_multiplier:.2f})") if config.influence > 0.6: reasons.append("high influence") if config.volatility > 0.6: reasons.append("volatile") weights.append(SpeakerWeight( juror_id=jid, weight=max(0.1, weight), reason=", ".join(reasons) if reasons else "baseline" )) return weights def _weighted_select( self, weights: list[SpeakerWeight], count: int ) -> list[str]: """Select jurors using weighted random selection without replacement.""" selected = [] remaining = list(weights) for _ in range(count): if not remaining: break total = sum(w.weight for w in remaining) if total <= 0: break r = random.uniform(0, total) cumulative = 0 for i, w in enumerate(remaining): cumulative += w.weight if r <= cumulative: selected.append(w.juror_id) remaining.pop(i) break return selected def reset(self): """Reset speaker history for new game.""" self.speaker_history = [] class OrchestratorAgent: """Master agent that coordinates the deliberation. Handles: - Game phase transitions - Turn management and speaker selection - Processing arguments and reactions - Vote tracking and stability detection """ def __init__( self, juror_configs: list[JurorConfig], juror_agents: dict[str, "SmolagentJuror"], case: "CriminalCase" ): self.juror_configs = juror_configs self.juror_agents = juror_agents self.case = case self.turn_manager = TurnManager() self.state = GameState(case_id=case.case_id) for jid, agent in juror_agents.items(): self.state.votes[jid] = agent.get_vote() self.state.conviction_scores[jid] = agent.memory.current_conviction @property def game_state(self) -> GameState: """Get current game state.""" return self.state def get_juror_memories(self) -> dict[str, JurorMemory]: """Get memory state for all jurors.""" return {jid: agent.memory for jid, agent in self.juror_agents.items()} async def run_deliberation_round( self, num_speakers: int = None ) -> list[TurnResult]: """Run a single round of deliberation.""" self.state.round_number += 1 results = [] votes_at_start = dict(self.state.votes) speakers = self.turn_manager.select_speakers( self.state, self.juror_configs, self.get_juror_memories(), num_speakers=num_speakers, exclude_player=True ) for speaker_id in speakers: result = await self._process_speaker_turn(speaker_id) if result: results.append(result) if self.state.votes == votes_at_start: self.state.rounds_without_change += 1 else: self.state.rounds_without_change = 0 return results async def _process_speaker_turn(self, speaker_id: str) -> TurnResult | None: """Process a single speaker's turn.""" agent = self.juror_agents.get(speaker_id) if not agent: return None try: # Generate argument - SmolagentJuror always returns (turn, reasoning_steps) turn, reasoning_data = await agent.generate_argument(self.case, self.state) # Extract reasoning steps for UI reasoning_steps = [] if reasoning_data: reasoning_steps = [ f"Step {s.step_number}: {s.action} - {s.content[:100]}" if hasattr(s, 'step_number') else str(s) for s in reasoning_data ] # Extract tool calls tool_calls = agent.last_tool_calls if hasattr(agent, 'last_tool_calls') else [] # Select active listeners for full processing active_listeners = self._select_active_listeners(turn) # Process reactions from other jurors conviction_changes = {} vote_changes = [] for other_id, other_agent in self.juror_agents.items(): if other_id == speaker_id: continue old_vote = self.state.votes.get(other_id) # Calculate base strength based on speaker influence base_strength = 0.08 + (agent.config.influence * 0.07) if other_id in active_listeners: base_strength *= 1.2 # Use direction-aware conviction calculation delta = calculate_conviction_change( other_agent.config, other_agent.memory, turn, # turn now includes direction base_strength=base_strength ) # Store argument in memory (don't apply delta here - do it via apply_conviction_update) other_agent.receive_argument(turn, 0.0) conviction_changes[other_id] = delta turn.impact[other_id] = delta # Apply conviction update with hysteresis vote_flipped, new_vote = apply_conviction_update(other_agent.memory, delta) if vote_flipped and new_vote: self.state.votes[other_id] = new_vote vote_changes.append((other_id, old_vote, new_vote)) self.state.conviction_scores[other_id] = other_agent.memory.current_conviction self.state.deliberation_log.append(turn) return TurnResult( turn=turn, conviction_changes=conviction_changes, vote_changes=vote_changes, reasoning_steps=reasoning_steps, tool_calls=tool_calls, ) except Exception as e: print(f"Error processing turn for {speaker_id}: {e}") return None def _select_active_listeners( self, turn: DeliberationTurn, max_active: int = 3 ) -> list[str]: """Select jurors for full agent processing (active listeners).""" active = [] for jid, agent in self.juror_agents.items(): if jid == turn.speaker_id: continue if 0.35 < agent.memory.current_conviction < 0.65: active.append((jid, 3)) elif agent.config.influence > 0.7: active.append((jid, 2)) elif turn.target_id == jid: active.append((jid, 3)) elif len(agent.memory.conviction_history) > 1: recent_change = abs( agent.memory.conviction_history[-1] - agent.memory.conviction_history[-2] ) if len(agent.memory.conviction_history) >= 2 else 0 if recent_change > 0.1: active.append((jid, 2)) else: active.append((jid, 1)) active.sort(key=lambda x: x[1], reverse=True) return [jid for jid, _ in active[:max_active]] def process_player_argument( self, content: str, argument_type: str, target_id: str | None = None ) -> TurnResult: """Process an argument from the human player.""" # Determine direction from player's chosen side direction = ( ArgumentDirection.PROSECUTION if self.state.player_side == "prosecute" else ArgumentDirection.DEFENSE ) turn = DeliberationTurn( round_number=self.state.round_number, speaker_id="juror_7", speaker_name="You", argument_type=argument_type, direction=direction, content=content, target_id=target_id ) conviction_changes = {} vote_changes = [] for juror_id, agent in self.juror_agents.items(): old_vote = self.state.votes.get(juror_id) # Calculate base strength (player has moderate influence) base_strength = 0.10 if target_id == juror_id: base_strength *= 1.5 delta = calculate_conviction_change( agent.config, agent.memory, turn, base_strength=base_strength ) # Store argument in memory (don't apply delta here) agent.receive_argument(turn, 0.0) conviction_changes[juror_id] = delta turn.impact[juror_id] = delta # Apply conviction update with hysteresis vote_flipped, new_vote = apply_conviction_update(agent.memory, delta) if vote_flipped and new_vote: self.state.votes[juror_id] = new_vote vote_changes.append((juror_id, old_vote, new_vote)) self.state.conviction_scores[juror_id] = agent.memory.current_conviction self.state.deliberation_log.append(turn) return TurnResult( turn=turn, conviction_changes=conviction_changes, vote_changes=vote_changes ) def process_external_argument( self, speaker_id: str, speaker_name: str, content: str, direction: ArgumentDirection, # REQUIRED - no fallback argument_type: str = "logical", target_id: str | None = None ) -> TurnResult: """Process an argument from an external MCP agent. Similar to process_player_argument but with configurable speaker identity. Used when external AI agents participate via MCP. Args: speaker_id: Juror seat ID (e.g., "juror_3") speaker_name: Display name for the speaker content: Argument text direction: REQUIRED - "prosecution", "defense", or "neutral" argument_type: Type of argument (default: "logical") target_id: Optional juror_id to address directly Returns: TurnResult with conviction changes and vote changes """ turn = DeliberationTurn( round_number=self.state.round_number, speaker_id=speaker_id, speaker_name=speaker_name, argument_type=argument_type, direction=direction, content=content, target_id=target_id ) conviction_changes = {} vote_changes = [] for juror_id, agent in self.juror_agents.items(): if juror_id == speaker_id: continue old_vote = self.state.votes.get(juror_id) # Calculate base strength (external agents have moderate influence) base_strength = 0.10 if target_id == juror_id: base_strength *= 1.5 delta = calculate_conviction_change( agent.config, agent.memory, turn, base_strength=base_strength ) # Store argument in memory (don't apply delta here) agent.receive_argument(turn, 0.0) conviction_changes[juror_id] = delta turn.impact[juror_id] = delta # Apply conviction update with hysteresis vote_flipped, new_vote = apply_conviction_update(agent.memory, delta) if vote_flipped and new_vote: self.state.votes[juror_id] = new_vote vote_changes.append((juror_id, old_vote, new_vote)) self.state.conviction_scores[juror_id] = agent.memory.current_conviction self.state.deliberation_log.append(turn) return TurnResult( turn=turn, conviction_changes=conviction_changes, vote_changes=vote_changes ) def set_player_side(self, side: str) -> None: """Set the player's chosen side.""" self.state.player_side = side self.state.phase = GamePhase.DELIBERATION player_vote = "guilty" if side == "prosecute" else "not_guilty" self.state.votes["juror_7"] = player_vote self.state.conviction_scores["juror_7"] = 0.8 if side == "prosecute" else 0.2 def check_should_end(self) -> bool: """Check if deliberation should end.""" return self.state.should_end_deliberation() def get_verdict(self) -> dict: """Get the final verdict information.""" guilty, not_guilty = self.state.get_vote_tally() if self.state.is_unanimous(): verdict = "GUILTY" if guilty == 12 else "NOT GUILTY" unanimous = True else: verdict = "HUNG JURY" unanimous = False return { "verdict": verdict, "unanimous": unanimous, "guilty_count": guilty, "not_guilty_count": not_guilty, "rounds": self.state.round_number, "ended_by": self._get_end_reason() } def _get_end_reason(self) -> str: """Get the reason deliberation ended.""" if self.state.is_unanimous(): return "unanimous_verdict" elif self.state.rounds_without_change >= self.state.stability_threshold: return "votes_stabilized" elif self.state.round_number >= self.state.max_rounds: return "max_rounds_reached" return "unknown" def reset(self) -> None: """Reset for a new game.""" self.turn_manager.reset() self.state = GameState(case_id=self.case.case_id if self.case else "")