Spaces:
Running
Running
Blu3Orange
feat: Introduce argument direction handling and enhance conviction mechanics for juror interactions
373ff24
| """Orchestrator for 12 Angry Agents deliberation. | |
| Handles turn management, speaker selection, and deliberation flow. | |
| """ | |
| import random | |
| from dataclasses import dataclass, field | |
| from typing import TYPE_CHECKING | |
| from core.game_state import GameState, DeliberationTurn, GamePhase, ArgumentDirection | |
| from core.models import JurorConfig, JurorMemory | |
| from core.conviction import calculate_conviction_change, apply_conviction_update | |
| if TYPE_CHECKING: | |
| from agents.smolagent_juror import SmolagentJuror | |
| from case_db.models import CriminalCase | |
@dataclass
class SpeakerWeight:
    """Selection weight computed for one juror in a speaker draw.

    Declared as a dataclass so it can be built with keyword arguments
    (``SpeakerWeight(juror_id=..., weight=..., reason=...)``), which is how
    ``TurnManager._calculate_weights`` constructs it.

    Attributes:
        juror_id: ID of the juror this weight belongs to.
        weight: Relative likelihood of being picked; larger means more likely.
        reason: Human-readable summary of the factors behind the weight
            (for example ``"on fence (+1.20), volatile"`` or ``"baseline"``).
    """

    juror_id: str
    weight: float
    reason: str
@dataclass
class TurnResult:
    """Outcome of processing a single deliberation turn.

    Declared as a dataclass: the class relies on ``field(default_factory=...)``
    defaults and is constructed with keyword arguments, neither of which works
    on a plain class.

    Attributes:
        turn: The turn that was processed.
        conviction_changes: Conviction delta applied to each listener, keyed
            by juror ID.
        vote_changes: One ``(juror_id, old_vote, new_vote)`` tuple per juror
            whose vote flipped as a result of the turn.
        reasoning_steps: Display strings describing the speaker's reasoning;
            empty when none were captured.
        tool_calls: Tool calls reported by the speaking agent; empty when
            none were captured.
    """

    turn: DeliberationTurn
    conviction_changes: dict[str, float]
    vote_changes: list[tuple[str, str, str]]
    # default_factory gives every instance its own list rather than a shared one.
    reasoning_steps: list[str] = field(default_factory=list)
    tool_calls: list[str] = field(default_factory=list)
class TurnManager:
    """Manages fair speaker selection with a weighted random draw.

    Selection priorities:
    1. Jurors "on the fence" (conviction 0.35-0.65) - most interesting
    2. Jurors who haven't spoken recently - fairness
    3. Jurors with high influence - they drive conversation
    4. Some randomness to keep things unpredictable
    """

    ON_FENCE_BONUS = 2.0     # maximum bonus, reached at conviction == 0.5
    RECENCY_PENALTY = 0.3    # weight multiplier for a juror who spoke last round
    INFLUENCE_WEIGHT = 1.5   # scales config.influence into the base weight
    RANDOM_FACTOR = 0.3      # upper bound of the uniform jitter added per juror
    RECENCY_WINDOW = 2       # how many past rounds count as "recent"

    def __init__(self):
        # One entry per completed selection: the juror IDs chosen that round.
        self.speaker_history: list[list[str]] = []

    def select_speakers(
        self,
        game_state: GameState,
        juror_configs: list[JurorConfig],
        juror_memories: dict[str, JurorMemory],
        num_speakers: int | None = None,
        exclude_player: bool = True
    ) -> list[str]:
        """Select speakers for the next round using weighted selection.

        Args:
            game_state: Current game state; its ``speaking_queue`` is
                overwritten with the selection.
            juror_configs: Configs of all jurors that may be considered.
            juror_memories: Juror memories keyed by juror ID.
            num_speakers: How many speakers to pick. When ``None`` a random
                count between 1 and 3 is used.
            exclude_player: When true, configs whose ``is_player()`` is true
                are left out of the draw.

        Returns:
            The selected juror IDs in draw order; empty when nobody is
            eligible (in which case history and state are left untouched).
        """
        if num_speakers is None:
            num_speakers = random.randint(1, 3)

        eligible = [
            c for c in juror_configs
            if not (exclude_player and c.is_player())
        ]
        if not eligible:
            return []

        weights = self._calculate_weights(
            eligible,
            juror_memories,
            game_state.round_number
        )
        selected = self._weighted_select(weights, min(num_speakers, len(eligible)))

        self.speaker_history.append(selected)
        game_state.speaking_queue = selected
        return selected

    def _recency_multiplier(self, juror_id: str) -> float:
        """Return the weight multiplier for how recently a juror spoke.

        Only the last ``RECENCY_WINDOW`` rounds are inspected and only the
        most recent appearance counts. The multiplier is ``RECENCY_PENALTY``
        for the latest round and eases back toward 1.0 for older rounds, so a
        juror who just spoke is penalised more than one who spoke earlier.
        """
        recent_rounds = self.speaker_history[-self.RECENCY_WINDOW:]
        for rounds_ago, speakers in enumerate(reversed(recent_rounds)):
            if juror_id in speakers:
                # 0.3 for the latest round, ~0.55 for the one before, ...
                return self.RECENCY_PENALTY ** (1 / (rounds_ago + 1))
        return 1.0

    def _calculate_weights(
        self,
        configs: list[JurorConfig],
        memories: dict[str, JurorMemory],
        current_round: int
    ) -> list[SpeakerWeight]:
        """Calculate selection weight for each juror.

        Args:
            configs: Jurors to score.
            memories: Juror memories keyed by juror ID; a juror without a
                memory entry simply gets no on-the-fence bonus.
            current_round: Accepted for interface compatibility; not used by
                the current scoring.

        Returns:
            One ``SpeakerWeight`` per config, in the same order, with the
            weight floored at 0.1.
        """
        weights = []
        for config in configs:
            jid = config.juror_id
            memory = memories.get(jid)

            base_weight = 0.5 + (config.influence * self.INFLUENCE_WEIGHT)

            # Bonus scales linearly from ON_FENCE_BONUS at conviction 0.5
            # down to 0 at a distance of 0.15 from the midpoint.
            fence_bonus = 0.0
            if memory:
                fence_distance = abs(memory.current_conviction - 0.5)
                if fence_distance < 0.15:
                    fence_bonus = self.ON_FENCE_BONUS * (1 - fence_distance / 0.15)

            recency_multiplier = self._recency_multiplier(jid)
            volatility_bonus = config.volatility * 0.5

            weight = (base_weight + fence_bonus + volatility_bonus) * recency_multiplier
            weight += random.uniform(0, self.RANDOM_FACTOR)

            reasons = []
            if fence_bonus > 0:
                reasons.append(f"on fence (+{fence_bonus:.2f})")
            if recency_multiplier < 1.0:
                reasons.append(f"recent speaker (x{recency_multiplier:.2f})")
            if config.influence > 0.6:
                reasons.append("high influence")
            if config.volatility > 0.6:
                reasons.append("volatile")

            weights.append(SpeakerWeight(
                juror_id=jid,
                weight=max(0.1, weight),
                reason=", ".join(reasons) if reasons else "baseline"
            ))
        return weights

    def _weighted_select(
        self,
        weights: list[SpeakerWeight],
        count: int
    ) -> list[str]:
        """Select jurors using weighted random selection without replacement.

        Args:
            weights: Candidates with their weights; the list is not mutated.
            count: Maximum number of jurors to pick.

        Returns:
            Up to ``count`` distinct juror IDs in draw order. Fewer are
            returned when candidates run out or the remaining weights sum to
            zero or less.
        """
        selected = []
        remaining = list(weights)
        for _ in range(count):
            if not remaining:
                break
            total = sum(w.weight for w in remaining)
            if total <= 0:
                break

            r = random.uniform(0, total)
            cumulative = 0.0
            # Default to the last candidate: floating-point rounding can leave
            # the running total marginally below `total`, and without a
            # fallback that draw would silently pick nobody.
            pick = len(remaining) - 1
            for i, w in enumerate(remaining):
                cumulative += w.weight
                if r <= cumulative:
                    pick = i
                    break
            selected.append(remaining.pop(pick).juror_id)
        return selected

    def reset(self):
        """Reset speaker history for new game."""
        self.speaker_history = []
class OrchestratorAgent:
    """Master agent that coordinates the deliberation.

    Handles:
    - Game phase transitions
    - Turn management and speaker selection
    - Processing arguments and reactions
    - Vote tracking and stability detection
    """

    # Seat ID used for the human player's turns, vote and conviction score.
    PLAYER_JUROR_ID = "juror_7"

    def __init__(
        self,
        juror_configs: list[JurorConfig],
        juror_agents: dict[str, "SmolagentJuror"],
        case: "CriminalCase"
    ):
        """Create the orchestrator and seed state from the juror agents.

        Args:
            juror_configs: Configs for every juror seat.
            juror_agents: Agent instances keyed by juror ID.
            case: The case under deliberation.
        """
        self.juror_configs = juror_configs
        self.juror_agents = juror_agents
        self.case = case
        self.turn_manager = TurnManager()
        self.state = GameState(case_id=case.case_id)

        # Start from each agent's own initial vote and conviction.
        for jid, agent in juror_agents.items():
            self.state.votes[jid] = agent.get_vote()
            self.state.conviction_scores[jid] = agent.memory.current_conviction

    # NOTE(review): this reads like it may have been intended as a @property;
    # kept as a plain method here - confirm against callers.
    def game_state(self) -> GameState:
        """Get current game state."""
        return self.state

    def get_juror_memories(self) -> dict[str, JurorMemory]:
        """Get memory state for all jurors, keyed by juror ID."""
        return {jid: agent.memory for jid, agent in self.juror_agents.items()}

    async def run_deliberation_round(
        self,
        num_speakers: int | None = None
    ) -> list[TurnResult]:
        """Run a single round of deliberation.

        Increments the round counter, selects AI speakers (the player is
        excluded), processes each speaker's turn in order, and updates
        ``rounds_without_change`` depending on whether any vote differs from
        the start of the round.

        Args:
            num_speakers: Number of speakers to select; ``None`` lets the
                turn manager choose.

        Returns:
            One ``TurnResult`` per speaker whose turn was processed
            successfully.
        """
        self.state.round_number += 1
        results = []
        votes_at_start = dict(self.state.votes)

        speakers = self.turn_manager.select_speakers(
            self.state,
            self.juror_configs,
            self.get_juror_memories(),
            num_speakers=num_speakers,
            exclude_player=True
        )

        for speaker_id in speakers:
            result = await self._process_speaker_turn(speaker_id)
            if result:
                results.append(result)

        if self.state.votes == votes_at_start:
            self.state.rounds_without_change += 1
        else:
            self.state.rounds_without_change = 0
        return results

    def _apply_turn_to_listeners(
        self,
        turn: DeliberationTurn,
        speaker_id: str,
        base_strength: float,
        boosted_ids: list[str],
        boost: float
    ) -> tuple[dict[str, float], list[tuple[str, str, str]]]:
        """Apply one argument to every juror agent except the speaker.

        For each listener this computes the conviction delta, stores the
        argument in the listener's memory, records the delta on
        ``turn.impact``, applies the conviction update and mirrors any vote
        flip and the new conviction into the game state.

        Args:
            turn: The argument being delivered.
            speaker_id: Juror ID to skip (the one who made the argument).
            base_strength: Argument strength used for ordinary listeners.
            boosted_ids: Listeners whose strength is multiplied by ``boost``.
            boost: Multiplier applied for listeners in ``boosted_ids``.

        Returns:
            ``(conviction_changes, vote_changes)`` where the first maps juror
            ID to delta and the second lists ``(juror_id, old_vote, new_vote)``
            for each flipped vote.
        """
        conviction_changes = {}
        vote_changes = []

        for juror_id, agent in self.juror_agents.items():
            if juror_id == speaker_id:
                continue

            old_vote = self.state.votes.get(juror_id)
            strength = base_strength * boost if juror_id in boosted_ids else base_strength

            # Direction-aware: the turn carries the argument's direction.
            delta = calculate_conviction_change(
                agent.config,
                agent.memory,
                turn,
                base_strength=strength
            )

            # Store the argument with a zero delta; the real delta is applied
            # below through apply_conviction_update.
            agent.receive_argument(turn, 0.0)
            conviction_changes[juror_id] = delta
            turn.impact[juror_id] = delta

            vote_flipped, new_vote = apply_conviction_update(agent.memory, delta)
            if vote_flipped and new_vote:
                self.state.votes[juror_id] = new_vote
                vote_changes.append((juror_id, old_vote, new_vote))

            self.state.conviction_scores[juror_id] = agent.memory.current_conviction

        return conviction_changes, vote_changes

    async def _process_speaker_turn(self, speaker_id: str) -> TurnResult | None:
        """Process a single AI speaker's turn.

        Args:
            speaker_id: Juror ID of the speaker.

        Returns:
            The turn result, or ``None`` when the speaker has no agent or
            when processing raised (the error is printed, not propagated, so
            one failing juror does not abort the round).
        """
        agent = self.juror_agents.get(speaker_id)
        if not agent:
            return None

        try:
            # generate_argument yields (turn, reasoning_steps).
            turn, reasoning_data = await agent.generate_argument(self.case, self.state)

            # Flatten reasoning steps into short display strings for the UI.
            reasoning_steps = []
            if reasoning_data:
                reasoning_steps = [
                    f"Step {s.step_number}: {s.action} - {s.content[:100]}"
                    if hasattr(s, 'step_number') else str(s)
                    for s in reasoning_data
                ]

            tool_calls = getattr(agent, 'last_tool_calls', [])

            # Strength grows with the speaker's influence; active listeners
            # react 20% more strongly.
            conviction_changes, vote_changes = self._apply_turn_to_listeners(
                turn,
                speaker_id,
                base_strength=0.08 + (agent.config.influence * 0.07),
                boosted_ids=self._select_active_listeners(turn),
                boost=1.2
            )

            self.state.deliberation_log.append(turn)
            return TurnResult(
                turn=turn,
                conviction_changes=conviction_changes,
                vote_changes=vote_changes,
                reasoning_steps=reasoning_steps,
                tool_calls=tool_calls,
            )
        except Exception as e:
            print(f"Error processing turn for {speaker_id}: {e}")
            return None

    def _select_active_listeners(
        self,
        turn: DeliberationTurn,
        max_active: int = 3
    ) -> list[str]:
        """Select jurors for full agent processing (active listeners).

        Priorities (higher wins, ties keep agent iteration order):
        3 - on the fence (conviction strictly between 0.35 and 0.65) or
            directly addressed by the turn;
        2 - influence above 0.7, or last conviction step larger than 0.1;
        1 - any other juror with at least two conviction-history entries.
        Jurors matching none of these are not considered.

        Args:
            turn: The turn being delivered; its speaker is excluded.
            max_active: Maximum number of listeners to return.

        Returns:
            Up to ``max_active`` juror IDs, highest priority first.
        """
        active = []
        for jid, agent in self.juror_agents.items():
            if jid == turn.speaker_id:
                continue

            history = agent.memory.conviction_history
            # The direct-address check comes before the influence check so a
            # targeted high-influence juror is not demoted to priority 2.
            if 0.35 < agent.memory.current_conviction < 0.65 or turn.target_id == jid:
                active.append((jid, 3))
            elif agent.config.influence > 0.7:
                active.append((jid, 2))
            elif len(history) > 1:
                recent_change = abs(history[-1] - history[-2])
                active.append((jid, 2 if recent_change > 0.1 else 1))

        active.sort(key=lambda x: x[1], reverse=True)
        return [jid for jid, _ in active[:max_active]]

    def process_player_argument(
        self,
        content: str,
        argument_type: str,
        target_id: str | None = None
    ) -> TurnResult:
        """Process an argument from the human player.

        The argument's direction follows the player's chosen side
        (prosecution when ``player_side == "prosecute"``, defense otherwise).

        Args:
            content: Argument text.
            argument_type: Type of argument.
            target_id: Optional juror ID addressed directly; that juror
                reacts with 1.5x strength.

        Returns:
            TurnResult with conviction changes and vote changes.
        """
        direction = (
            ArgumentDirection.PROSECUTION
            if self.state.player_side == "prosecute"
            else ArgumentDirection.DEFENSE
        )
        turn = DeliberationTurn(
            round_number=self.state.round_number,
            speaker_id=self.PLAYER_JUROR_ID,
            speaker_name="You",
            argument_type=argument_type,
            direction=direction,
            content=content,
            target_id=target_id
        )

        # The player's own seat is skipped, matching the other argument paths,
        # so a speaker never persuades themselves.
        conviction_changes, vote_changes = self._apply_turn_to_listeners(
            turn,
            self.PLAYER_JUROR_ID,
            base_strength=0.10,
            boosted_ids=[target_id] if target_id is not None else [],
            boost=1.5
        )

        self.state.deliberation_log.append(turn)
        return TurnResult(
            turn=turn,
            conviction_changes=conviction_changes,
            vote_changes=vote_changes
        )

    def process_external_argument(
        self,
        speaker_id: str,
        speaker_name: str,
        content: str,
        direction: ArgumentDirection,  # REQUIRED - no fallback
        argument_type: str = "logical",
        target_id: str | None = None
    ) -> TurnResult:
        """Process an argument from an external MCP agent.

        Similar to process_player_argument but with configurable speaker
        identity. Used when external AI agents participate via MCP.

        Args:
            speaker_id: Juror seat ID (e.g., "juror_3")
            speaker_name: Display name for the speaker
            content: Argument text
            direction: REQUIRED - direction the argument pushes toward
            argument_type: Type of argument (default: "logical")
            target_id: Optional juror_id to address directly; that juror
                reacts with 1.5x strength

        Returns:
            TurnResult with conviction changes and vote changes
        """
        turn = DeliberationTurn(
            round_number=self.state.round_number,
            speaker_id=speaker_id,
            speaker_name=speaker_name,
            argument_type=argument_type,
            direction=direction,
            content=content,
            target_id=target_id
        )

        conviction_changes, vote_changes = self._apply_turn_to_listeners(
            turn,
            speaker_id,
            base_strength=0.10,
            boosted_ids=[target_id] if target_id is not None else [],
            boost=1.5
        )

        self.state.deliberation_log.append(turn)
        return TurnResult(
            turn=turn,
            conviction_changes=conviction_changes,
            vote_changes=vote_changes
        )

    def set_player_side(self, side: str) -> None:
        """Set the player's chosen side and move to the deliberation phase.

        ``"prosecute"`` maps to a guilty vote with conviction 0.8; any other
        value maps to not guilty with conviction 0.2.
        """
        self.state.player_side = side
        self.state.phase = GamePhase.DELIBERATION

        prosecuting = side == "prosecute"
        self.state.votes[self.PLAYER_JUROR_ID] = "guilty" if prosecuting else "not_guilty"
        self.state.conviction_scores[self.PLAYER_JUROR_ID] = 0.8 if prosecuting else 0.2

    def check_should_end(self) -> bool:
        """Check if deliberation should end."""
        return self.state.should_end_deliberation()

    def get_verdict(self) -> dict:
        """Get the final verdict information.

        Returns:
            Dict with keys ``verdict`` ("GUILTY", "NOT GUILTY" or
            "HUNG JURY"), ``unanimous``, ``guilty_count``,
            ``not_guilty_count``, ``rounds`` and ``ended_by``.
        """
        guilty, not_guilty = self.state.get_vote_tally()

        unanimous = bool(self.state.is_unanimous())
        if unanimous:
            # Compare tallies rather than testing for exactly 12 so the
            # verdict is right for any jury size.
            verdict = "GUILTY" if guilty > not_guilty else "NOT GUILTY"
        else:
            verdict = "HUNG JURY"

        return {
            "verdict": verdict,
            "unanimous": unanimous,
            "guilty_count": guilty,
            "not_guilty_count": not_guilty,
            "rounds": self.state.round_number,
            "ended_by": self._get_end_reason()
        }

    def _get_end_reason(self) -> str:
        """Get the reason deliberation ended.

        Checked in order: unanimity, vote stability, round limit; returns
        ``"unknown"`` when none applies.
        """
        if self.state.is_unanimous():
            return "unanimous_verdict"
        elif self.state.rounds_without_change >= self.state.stability_threshold:
            return "votes_stabilized"
        elif self.state.round_number >= self.state.max_rounds:
            return "max_rounds_reached"
        return "unknown"

    def reset(self) -> None:
        """Reset for a new game.

        Clears the speaker history and replaces the game state with a fresh
        one for the same case.
        """
        # NOTE(review): unlike __init__, votes and conviction scores are not
        # re-seeded from the agents here - confirm callers handle that.
        self.turn_manager.reset()
        self.state = GameState(case_id=self.case.case_id if self.case else "")