"""
Screen 4: Trace Detail View
Shows detailed OpenTelemetry trace visualization
"""
import gradio as gr
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from datetime import datetime
import pandas as pd
from typing import Optional, Callable, Dict, Any, List
from components.thought_graph import create_thought_graph
def create_trace_detail_screen(
    trace_data: dict,
    on_back: Optional[Callable] = None,
    mcp_qa_enabled: bool = True
) -> gr.Blocks:
    """
    Create the trace detail screen UI.

    The screen consists of a header row (optional back button plus title),
    a metadata summary, three tabs (thought graph, execution timeline and
    span details) and, optionally, a placeholder Q&A accordion.

    Args:
        trace_data: OpenTelemetry trace data. The keys read here are
            ``trace_id`` and ``spans``; ``None`` is treated as an empty trace.
        on_back: Callback for the back button. The button is only rendered
            when a callback is supplied.
        mcp_qa_enabled: Enable the MCP Q&A accordion (currently a placeholder
            that echoes the question back).

    Returns:
        Gradio Blocks for the trace detail screen.
    """
    # Tolerate a missing trace so the screen still renders with placeholders.
    if trace_data is None:
        trace_data = {}
    trace_id = trace_data.get('trace_id', 'Unknown')

    # Safely extract spans: accept lists, array-likes exposing ``tolist()``,
    # other iterables, or None.
    spans = trace_data.get('spans', [])
    if hasattr(spans, 'tolist'):
        spans = spans.tolist()
    elif not isinstance(spans, list):
        spans = list(spans) if spans is not None else []

    # Markdown treats a single newline as a soft break, so separate blocks
    # (headings, paragraphs, lists) are divided by blank lines to keep them
    # from collapsing into one paragraph when rendered.
    metadata_text = (
        "\n"
        f"**Trace ID:** `{trace_data.get('trace_id', 'N/A')}`\n"
        "\n"
        f"**Total Spans:** {len(spans)}\n"
    )
    thought_graph_help = (
        "\n"
        "### Agent Reasoning Flow\n"
        "\n"
        "This graph visualizes how your agent thinks - showing the flow of reasoning steps,\n"
        "tool calls, and LLM interactions as a network.\n"
        "\n"
        "**Node Colors:**\n"
        "\n"
        "- 🟣 Purple: LLM reasoning steps\n"
        "- 🟠 Orange: Tool calls\n"
        "- 🔵 Blue: Chains/Agents\n"
        "- 🔴 Red: Errors\n"
    )
    timeline_help = (
        "\n"
        "### Waterfall Chart\n"
        "\n"
        "Timeline view showing when each span executed and for how long.\n"
    )
    span_details_help = (
        "\n"
        "### Detailed Span Information\n"
        "\n"
        "Raw span data with attributes, status, and metadata.\n"
    )

    with gr.Blocks() as trace_detail:
        with gr.Row():
            if on_back:
                back_btn = gr.Button("⬅️ Back to Run Detail", variant="secondary", size="sm")
            gr.Markdown(f"# 🔍 Trace Detail: {trace_id}")

        # Trace metadata
        with gr.Row():
            gr.Markdown(metadata_text)

        # Tabs for different visualizations
        with gr.Tabs():
            # Tab 1: Thought Graph (STAR FEATURE!)
            with gr.Tab("🧠 Thought Graph"):
                gr.Markdown(thought_graph_help)
                # Create and display thought graph
                gr.Plot(
                    value=create_thought_graph(spans, trace_id),
                    label=""
                )

            # Tab 2: Execution Timeline (Waterfall)
            with gr.Tab("⏱️ Execution Timeline"):
                gr.Markdown(timeline_help)
                # Span visualization
                gr.Plot(
                    value=create_span_visualization(spans, trace_id),
                    label=""
                )

            # Tab 3: Span Details
            with gr.Tab("📋 Span Details"):
                gr.Markdown(span_details_help)
                # Span details table (the helper builds its own components)
                create_span_table(spans)

        # MCP Q&A Tool (below tabs)
        gr.Markdown("---")
        if mcp_qa_enabled:
            with gr.Accordion("🤖 Ask About This Trace", open=False):
                question_input = gr.Textbox(
                    label="Question",
                    placeholder="e.g., Why was the tool called twice? What tool did the agent use first?",
                    lines=2,
                    info="Ask questions about this trace execution, tool usage, or agent behavior"
                )
                ask_btn = gr.Button("Ask", variant="primary")
                answer_output = gr.Markdown("*Ask a question to get AI-powered insights*")

                # Wire up MCP Q&A (placeholder for now)
                ask_btn.click(
                    fn=lambda q: f"**Answer:** This is a placeholder. MCP integration coming soon.\n\n**Your question:** {q}",
                    inputs=[question_input],
                    outputs=[answer_output]
                )

        # Wire up events; back_btn only exists when a callback was supplied.
        if on_back:
            back_btn.click(fn=on_back, inputs=[], outputs=[])

    return trace_detail
def _finite_or_none(value: Any) -> Any:
    """Return *value* unchanged when it is a finite real number, else ``None``."""
    import math  # local import keeps the helper self-contained

    try:
        return value if math.isfinite(value) else None
    except (TypeError, ValueError, OverflowError):
        return None


def _coerce_number(value: Any) -> Any:
    """Convert *value* to a finite number; return ``None`` when that is impossible.

    ``int``/``float`` inputs are returned as-is, everything else goes through
    ``float()``. NaN and infinities are rejected so callers can safely call
    ``int()`` on the result.
    """
    if not isinstance(value, (int, float)):
        try:
            value = float(value)
        except (TypeError, ValueError, OverflowError):
            return None
    return _finite_or_none(value)


def _parse_timestamp(value: Any) -> Any:
    """Parse a raw timestamp value (number or numeric string) or return ``None``."""
    if isinstance(value, str):
        text = value.strip()
        try:
            # Prefer int so nanosecond-resolution strings keep full precision.
            return int(text)
        except ValueError:
            return _coerce_number(text)
    return _finite_or_none(value)


def _read_span_timestamp(span: Dict[str, Any], field_name: str) -> Any:
    """Look up a timestamp on *span*, trying common field-name spellings.

    For ``'startTime'`` the keys tried are, in order: ``startTime``,
    ``starttime``, ``startTimeUnixNano``, ``start_time`` and
    ``start_time_unix_nano``. The first key holding a parseable value wins;
    ``None`` is returned when no usable value exists.
    """
    candidates = (
        field_name,                                             # e.g. 'startTime'
        field_name.lower(),                                     # e.g. 'starttime'
        field_name.replace('Time', 'TimeUnixNano'),             # e.g. 'startTimeUnixNano'
        field_name[0].lower() + field_name[1:],                 # e.g. 'startTime'
        field_name.replace('Time', '_time').lower(),            # e.g. 'start_time'
        field_name.replace('Time', '_time_unix_nano').lower(),  # e.g. 'start_time_unix_nano'
    )
    for key in candidates:
        if key in span:
            parsed = _parse_timestamp(span[key])
            if parsed is not None:
                return parsed
    return None


def _detect_time_divisor(reference: Any) -> tuple:
    """Guess the unit of an epoch timestamp and return ``(divisor_to_ms, unit_name)``.

    Assumes *reference* is a Unix-epoch timestamp. Present-day values are
    roughly 1.7e9 s, 1.7e12 ms, 1.7e15 us and 1.7e18 ns, so the cut-offs sit
    one order of magnitude below each of the three larger values.
    """
    if reference > 1e17:
        return 1000000, "nanoseconds"
    if reference > 1e14:
        return 1000, "microseconds"
    if reference > 1e11:
        return 1, "milliseconds"
    return 0.001, "seconds"


def _first_number(attributes: Dict[str, Any], keys: Any) -> Any:
    """Return the first finite numeric value found under *keys*, else ``None``."""
    for key in keys:
        if key in attributes:
            number = _coerce_number(attributes[key])
            if number is not None:
                return number
    return None


def _extract_token_usage(attributes: Dict[str, Any]) -> Dict[str, int]:
    """Collect prompt/completion/total token counts from span attributes."""
    usage = {}
    prompt_tokens = _first_number(
        attributes, ('gen_ai.usage.prompt_tokens', 'llm.token_count.prompt')
    )
    completion_tokens = _first_number(
        attributes, ('gen_ai.usage.completion_tokens', 'llm.token_count.completion')
    )
    if prompt_tokens is not None:
        usage['prompt_tokens'] = int(prompt_tokens)
    if completion_tokens is not None:
        usage['completion_tokens'] = int(completion_tokens)

    if 'prompt_tokens' in usage and 'completion_tokens' in usage:
        usage['total_tokens'] = usage['prompt_tokens'] + usage['completion_tokens']
    else:
        # Only trust a reported total when it cannot be derived from the parts.
        total = _first_number(attributes, ('llm.usage.total_tokens',))
        if total is not None:
            usage['total_tokens'] = int(total)
    return usage


def _extract_cost(attributes: Dict[str, Any]) -> Dict[str, Any]:
    """Collect the total cost from span attributes (empty dict when absent)."""
    cost = _first_number(attributes, ('gen_ai.usage.cost.total', 'llm.usage.cost'))
    return {} if cost is None else {'total_cost': cost}


def _span_status_code(span: Dict[str, Any]) -> Any:
    """Return the span's status code, tolerating missing or non-dict status values."""
    status = span.get('status')
    if isinstance(status, dict):
        return status.get('code', 'UNKNOWN')
    if isinstance(status, str) and status:
        # Some exporters flatten the status to a bare code string.
        return status
    return 'UNKNOWN'


def _print_first_span_debug(first_span: Dict[str, Any], has_timing_data: bool) -> None:
    """Print the raw timing fields and a sample of attributes of the first span."""
    print(f"[DEBUG] First span raw data sample:")
    print(f" startTime field: {first_span.get('startTime', 'NOT FOUND')}")
    print(f" endTime field: {first_span.get('endTime', 'NOT FOUND')}")
    print(f" startTimeUnixNano field: {first_span.get('startTimeUnixNano', 'NOT FOUND')}")
    print(f" endTimeUnixNano field: {first_span.get('endTimeUnixNano', 'NOT FOUND')}")
    print(f" HAS_TIMING_DATA: {has_timing_data}")
    if 'attributes' in first_span:
        attrs = first_span['attributes']
        print(f" Sample attributes: {list(attrs.keys())[:5] if isinstance(attrs, dict) else 'N/A'}")
        if isinstance(attrs, dict):
            # Check for cost fields
            cost_fields = [k for k in attrs if 'cost' in k.lower() or 'price' in k.lower()]
            if cost_fields:
                print(f" Cost-related fields found: {cost_fields}")


def _print_processing_summary(total_input: int, processed_spans: List[Dict[str, Any]]) -> None:
    """Print counts per span kind/status, the duration range and enrichment totals."""
    print(f"[DEBUG] Total spans in input: {total_input}")
    print(f"[DEBUG] Processed spans: {len(processed_spans)}")
    span_kinds = {}
    span_statuses = {}
    for entry in processed_spans:
        span_kinds[entry['kind']] = span_kinds.get(entry['kind'], 0) + 1
        span_statuses[entry['status']] = span_statuses.get(entry['status'], 0) + 1
    durations = [entry['actual_duration'] for entry in processed_spans]
    spans_with_tokens = sum(1 for entry in processed_spans if entry['tokens'])
    spans_with_cost = sum(1 for entry in processed_spans if entry['cost'])
    print(f"[DEBUG] Span kinds detected: {span_kinds}")
    print(f"[DEBUG] Span statuses detected: {span_statuses}")
    if durations:
        print(f"[DEBUG] Duration range: {min(durations):.3f}ms - {max(durations):.3f}ms")
    print(f"[DEBUG] Spans with token info: {spans_with_tokens}/{len(processed_spans)}")
    print(f"[DEBUG] Spans with cost info: {spans_with_cost}/{len(processed_spans)}")


def process_trace_data(spans: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Process trace spans for waterfall visualization.

    Args:
        spans: Span dicts. Lists, array-likes exposing ``tolist()``, other
            iterables and ``None`` are accepted.

    Returns:
        One dict per input span, in input order, with the keys ``span_id``,
        ``parent_id``, ``name``, ``kind``, ``start_time`` (ms relative to the
        earliest known start), ``duration`` (ms, at least 0.1 so the bar stays
        visible), ``actual_duration`` (ms), ``end_time``, ``attributes``,
        ``status``, ``tokens`` and ``cost``. An empty list is returned when
        there are no spans.

    Notes:
        * The timestamp unit is auto-detected from the magnitude of the
          earliest start time, which is assumed to be a Unix-epoch value.
        * A usable ``duration_ms`` field takes precedence over the duration
          computed from start/end timestamps.
        * Spans with a missing start time are placed at offset 0; a missing
          or earlier-than-start end time yields a duration of 0 rather than a
          negative value.
        * Diagnostic ``[DEBUG]`` lines are printed to stdout.
    """
    # Ensure spans is a list
    if hasattr(spans, 'tolist'):
        spans = spans.tolist()
    elif not isinstance(spans, list):
        spans = list(spans) if spans is not None else []
    if not spans:
        return []

    # Missing timestamps are kept as None so they cannot drag the baseline to 0.
    start_times = [_read_span_timestamp(span, 'startTime') for span in spans]
    known_starts = [value for value in start_times if value is not None]
    min_start = min(known_starts) if known_starts else 0
    has_timing_data = any(value > 0 for value in known_starts)

    _print_first_span_debug(spans[0], has_timing_data)

    # Default: assume nanoseconds, convert to milliseconds
    time_divisor = 1000000
    if min_start > 0:
        time_divisor, time_unit = _detect_time_divisor(min_start)
        print(f"[DEBUG] Auto-detected timestamp unit: {time_unit} (min_start={min_start}, divisor={time_divisor})")

    processed_spans = []
    for idx, span in enumerate(spans):
        start_time = start_times[idx]
        end_time = _read_span_timestamp(span, 'endTime')

        # Calculate relative start
        if has_timing_data and start_time is not None:
            relative_start = (start_time - min_start) / time_divisor
        else:
            relative_start = 0

        # Calculate duration - prefer duration_ms if available
        reported_duration = _coerce_number(span.get('duration_ms'))
        if reported_duration is not None:
            actual_duration = float(reported_duration)
            duration_source = 'duration_ms'
        else:
            duration_source = 'calculated'
            if start_time is not None and end_time is not None:
                # Clamp so a missing/skewed end time never gives a negative bar.
                actual_duration = max((end_time - start_time) / time_divisor, 0.0)
            else:
                actual_duration = 0.0

        # Debug: Print first few durations
        if idx < 3:
            print(f"[DEBUG] Span {idx}: start={start_time}, end={end_time}, duration={actual_duration:.3f}ms ({duration_source})")

        # Handle span ID variations
        span_id = span.get('spanId') or span.get('span_id') or span.get('spanID') or f'span_{idx}'
        parent_id = span.get('parentSpanId') or span.get('parent_span_id') or span.get('parentSpanID')

        # Get span kind - an OpenInference kind in the attributes
        # (CHAIN, TOOL, LLM, RETRIEVER, EMBEDDING, AGENT, ...) overrides the
        # top-level OpenTelemetry kind.
        span_kind = span.get('kind', 'INTERNAL')
        attributes = span.get('attributes', {})
        token_info = {}
        cost_info = {}
        if isinstance(attributes, dict):
            openinference_kind = attributes.get('openinference.span.kind')
            if openinference_kind:
                span_kind = str(openinference_kind).upper()

            token_info = _extract_token_usage(attributes)
            cost_info = _extract_cost(attributes)

            # Debug: Print cost info for LLM spans
            if idx < 2 and span_kind == 'LLM':
                print(f"[DEBUG] LLM Span {idx} cost extraction:")
                print(f" gen_ai.usage.cost.total: {attributes.get('gen_ai.usage.cost.total', 'NOT FOUND')}")
                print(f" llm.usage.cost: {attributes.get('llm.usage.cost', 'NOT FOUND')}")
                print(f" cost_info: {cost_info}")

        # Store actual duration for tooltip, use minimum for visualization
        display_duration = max(actual_duration, 0.1)  # Minimum width for visibility
        processed_spans.append({
            'span_id': span_id,
            'parent_id': parent_id,
            'name': span.get('name', 'Unknown'),
            'kind': span_kind,
            'start_time': relative_start,
            'duration': display_duration,  # For bar width
            'actual_duration': actual_duration,  # For tooltip
            'end_time': relative_start + actual_duration,  # Use actual for end time
            'attributes': attributes,
            'status': _span_status_code(span),
            'tokens': token_info,
            'cost': cost_info
        })

    _print_processing_summary(len(spans), processed_spans)
    return processed_spans
def create_span_visualization(spans: List[Dict[str, Any]], trace_id: str = "Unknown") -> go.Figure:
"""Create an interactive Plotly waterfall visualization of spans"""
processed_spans = process_trace_data(spans)
print(f"[DEBUG] create_span_visualization - Received {len(spans)} spans")
print(f"[DEBUG] create_span_visualization - Processed {len(processed_spans)} spans")
if not processed_spans:
# Return empty figure with message
fig = go.Figure()
fig.add_annotation(
text="No spans to display",
xref="paper", yref="paper",
x=0.5, y=0.5, xanchor='center', yanchor='middle',
showarrow=False,
font=dict(size=20)
)
return fig
# Sort spans by start time for better visualization
processed_spans.sort(key=lambda x: x['start_time'])
# Create unique labels for each span (include index to ensure uniqueness)
for idx, span in enumerate(processed_spans):
# Add span index to make labels unique
span['display_name'] = f"{span['name']} [{idx}]"
# Create colors based on span status and kind
colors = []
color_map = {} # Track which colors are assigned to which kinds
for span in processed_spans:
status = span['status']
kind = span['kind']
# Only show red for actual errors (ERROR status)
if status == 'ERROR':
color = '#DC143C' # Crimson for errors
else:
# Color by span kind (supports both OpenTelemetry and OpenInference)
if kind == 'SERVER':
color = '#2E8B57' # Sea Green
elif kind == 'CLIENT':
color = '#4169E1' # Royal Blue
elif kind == 'LLM':
color = '#9B59B6' # Purple for LLM calls
elif kind == 'TOOL':
color = '#E67E22' # Orange for Tool calls
elif kind == 'CHAIN':
color = '#3498DB' # Light Blue for Chains
elif kind == 'AGENT':
color = '#1ABC9C' # Turquoise for Agents
elif kind == 'RETRIEVER':
color = '#F39C12' # Yellow-Orange for Retrievers
elif kind == 'EMBEDDING':
color = '#8E44AD' # Dark Purple for Embeddings
else:
color = '#4682B4' # Steel Blue for INTERNAL/unknown
colors.append(color)
if kind not in color_map:
color_map[kind] = color
print(f"[DEBUG] Color assignments: {color_map}")
# Create the waterfall chart
fig = go.Figure()
# Prepare custom data for hover tooltips
customdata = []
for span in processed_spans:
# Build token info string
token_str = ""
if span['tokens']:
tokens = span['tokens']
if 'total_tokens' in tokens:
token_str = f"
Tokens: {tokens['total_tokens']}"
if 'prompt_tokens' in tokens and 'completion_tokens' in tokens:
token_str += f" (prompt: {tokens['prompt_tokens']}, completion: {tokens['completion_tokens']})"
elif 'prompt_tokens' in tokens or 'completion_tokens' in tokens:
parts = []
if 'prompt_tokens' in tokens:
parts.append(f"prompt: {tokens['prompt_tokens']}")
if 'completion_tokens' in tokens:
parts.append(f"completion: {tokens['completion_tokens']}")
token_str = f"
Tokens: {', '.join(parts)}"
# Build cost info string
cost_str = ""
if span['cost'] and 'total_cost' in span['cost']:
cost_str = f"
Cost: ${span['cost']['total_cost']:.6f}"
customdata.append([
span['name'],
span['kind'],
span['span_id'],
span['end_time'],
span['actual_duration'], # Show actual duration, not display duration
token_str,
cost_str
])
# Add bars for each span (use display_name for unique y-axis labels)
fig.add_trace(go.Bar(
y=[span['display_name'] for span in processed_spans],
x=[span['duration'] for span in processed_spans], # Display duration (min 0.1ms)
base=[span['start_time'] for span in processed_spans],
orientation='h',
marker_color=colors,
hovertemplate=(
"%{customdata[0]}
" +
"Type: %{customdata[1]}
" +
"Span ID: %{customdata[2]}
" +
"Duration: %{customdata[4]:.3f} ms
" + # Actual duration with 3 decimal places
"Start: %{base:.2f} ms
" +
"End: %{customdata[3]:.2f} ms" +
"%{customdata[5]}" + # Token info (already formatted)
"%{customdata[6]}" + # Cost info (already formatted)
"
{memory_percent:.1f}% of {memory_total:.0f} MiB
Power: {power:.1f} W