")
+ if last_div > start:
+ return content[start : last_div + 6] # Include the closing tag
+
+ # Look for code blocks
+ if "```html" in content and "```" in content:
+ start = content.find("```html") + 7
+ end = content.find("```", start)
+ if end > start:
+ return content[start:end].strip()
+
+ # Look for JSON with an "html" field
+ try:
+ parsed = json.loads(content)
+ if isinstance(parsed, dict) and "html" in parsed:
+ return parsed["html"]
+ except (JSONDecodeError, TypeError):
+ pass
+
+ # If all extraction attempts fail, return the original content
+ return content
+
+
+def safe_json_loads(json_str: Optional[str]) -> Dict[str, Any]:
+ """Safely parse JSON string.
+
+ Args:
+ json_str: JSON string to parse, can be None.
+
+ Returns:
+ Dict[str, Any]: Parsed JSON as dictionary or empty dict if parsing fails or input is None.
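+
+ Example (illustrative):
+ safe_json_loads('{"key": "value"}')  # -> {"key": "value"}
+ safe_json_loads("not valid json")    # -> {}
+ safe_json_loads(None)                # -> {}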
+ """
+ if json_str is None:
+ # Optionally, log a warning here if None input is unexpected for certain call sites
+ # logger.warning("safe_json_loads received None input.")
+ return {}
+ try:
+ return json.loads(json_str)
+ except (
+ JSONDecodeError,
+ TypeError,
+ ): # Catch TypeError if json_str is not a valid type for json.loads
+ # Optionally, log the error and the problematic string (or its beginning)
+ # logger.warning(f"Failed to decode JSON string: '{str(json_str)[:200]}...'", exc_info=True)
+ return {}
+
+
+def load_pipeline_config(config_path: str) -> Dict[str, Any]:
+ """Load pipeline configuration from YAML file.
+
+ This is used only for pipeline-level configuration, not for step parameters.
+ Step parameters should be defined directly in the step functions.
+
+ Args:
+ config_path: Path to the configuration YAML file
+
+ Returns:
+ Pipeline configuration dictionary
+ """
+ # Get absolute path if relative
+ if not os.path.isabs(config_path):
+ base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+ config_path = os.path.join(base_dir, config_path)
+
+ # Load YAML configuration
+ try:
+ with open(config_path, "r") as f:
+ config = yaml.safe_load(f)
+ return config
+ except Exception as e:
+ logger.error(f"Error loading pipeline configuration: {e}")
+ # Return a minimal default configuration in case of loading error
+ return {
+ "pipeline": {
+ "name": "deep_research_pipeline",
+ "enable_cache": True,
+ },
+ "environment": {
+ "docker": {
+ "requirements": [
+ "openai>=1.0.0",
+ "tavily-python>=0.2.8",
+ "PyYAML>=6.0",
+ "click>=8.0.0",
+ "pydantic>=2.0.0",
+ "typing_extensions>=4.0.0",
+ ]
+ }
+ },
+ "resources": {"cpu": 1, "memory": "4Gi"},
+ "timeout": 3600,
+ }
+
+
+def check_required_env_vars(env_vars: list[str]) -> list[str]:
+ """Check if required environment variables are set.
+
+ Args:
+ env_vars: List of environment variable names to check
+
+ Returns:
+ List of missing environment variables
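+
+ Example (illustrative):
+ missing = check_required_env_vars(["TAVILY_API_KEY", "EXA_API_KEY"])
+ # -> e.g. ["EXA_API_KEY"] if only TAVILY_API_KEY is set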
+ """
+ missing_vars = []
+ for var in env_vars:
+ if not os.environ.get(var):
+ missing_vars.append(var)
+ return missing_vars
diff --git a/deep_research/utils/llm_utils.py b/deep_research/utils/llm_utils.py
new file mode 100644
index 00000000..1f4dc194
--- /dev/null
+++ b/deep_research/utils/llm_utils.py
@@ -0,0 +1,387 @@
+import contextlib
+import json
+import logging
+from typing import Any, Dict, List, Optional
+
+import litellm
+from litellm import completion
+from utils.helper_functions import (
+ clean_json_tags,
+ remove_reasoning_from_output,
+ safe_json_loads,
+)
+from utils.prompts import SYNTHESIS_PROMPT
+from zenml import get_step_context
+
+logger = logging.getLogger(__name__)
+
+# This module uses litellm for all LLM interactions
+# Models are specified with a provider prefix (e.g., "sambanova/DeepSeek-R1-Distill-Llama-70B")
+# ALL model names require a provider prefix (e.g., "sambanova/", "openai/", "anthropic/")
+
+litellm.callbacks = ["langfuse"]
+
+
+def run_llm_completion(
+ prompt: str,
+ system_prompt: str,
+ model: str = "sambanova/Llama-4-Maverick-17B-128E-Instruct",
+ clean_output: bool = True,
+ max_tokens: int = 2000, # Increased default token limit
+ temperature: float = 0.2,
+ top_p: float = 0.9,
+ project: str = "deep-research",
+ tags: Optional[List[str]] = None,
+) -> str:
+ """Run an LLM completion with standard error handling and output cleaning.
+
+ Uses litellm for model inference.
+
+ Args:
+ prompt: User prompt for the LLM
+ system_prompt: System prompt for the LLM
+ model: Model to use for completion (with provider prefix)
+ clean_output: Whether to clean reasoning and JSON tags from output. When True,
+ this removes any reasoning sections marked with tags and strips JSON
+ code block markers.
+ max_tokens: Maximum tokens to generate
+ temperature: Sampling temperature
+ top_p: Top-p sampling value
+ project: Langfuse project name for LLM tracking
+ tags: Optional list of tags for Langfuse tracking. If provided, also converted to trace_metadata format.
+
+ Returns:
+ str: Processed LLM output with optional cleaning applied
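+
+ Example (illustrative call; prompt strings are shortened placeholders):
+ answer = run_llm_completion(
+ prompt="Summarize the key points of the following text: ...",
+ system_prompt="You are a research assistant.",
+ model="sambanova/Llama-4-Maverick-17B-128E-Instruct",
+ tags=["example"],
+ )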
+ """
+ try:
+ # Ensure model name has provider prefix
+ if not any(
+ model.startswith(prefix + "/")
+ for prefix in [
+ "sambanova",
+ "openai",
+ "anthropic",
+ "meta",
+ "google",
+ "aws",
+ "openrouter",
+ ]
+ ):
+ # Raise an error if no provider prefix is specified
+ error_msg = f"Model '{model}' does not have a provider prefix. Please specify provider (e.g., 'sambanova/{model}')"
+ logger.error(error_msg)
+ raise ValueError(error_msg)
+
+ # Get pipeline run name and id for trace_name and trace_id if running in a step
+ trace_name = None
+ trace_id = None
+ with contextlib.suppress(RuntimeError):
+ context = get_step_context()
+ trace_name = context.pipeline_run.name
+ trace_id = str(context.pipeline_run.id)
+ # Build metadata dict
+ metadata = {"project": project}
+ if tags is not None:
+ metadata["tags"] = tags
+ # Convert tags to trace_metadata format
+ metadata["trace_metadata"] = {tag: True for tag in tags}
+ if trace_name:
+ metadata["trace_name"] = trace_name
+ if trace_id:
+ metadata["trace_id"] = trace_id
+
+ response = completion(
+ model=model,
+ messages=[
+ {"role": "system", "content": system_prompt},
+ {"role": "user", "content": prompt},
+ ],
+ max_tokens=max_tokens,
+ temperature=temperature,
+ top_p=top_p,
+ metadata=metadata,
+ )
+
+ # Defensive access to content
+ content = None
+ if response and response.choices and len(response.choices) > 0:
+ choice = response.choices[0]
+ if choice and choice.message:
+ content = choice.message.content
+
+ if content is None:
+ logger.warning("LLM response content is missing or empty.")
+ return ""
+
+ if clean_output:
+ content = remove_reasoning_from_output(content)
+ content = clean_json_tags(content)
+
+ return content
+ except Exception as e:
+ logger.error(f"Error in LLM completion: {e}")
+ return ""
+
+
+def get_structured_llm_output(
+ prompt: str,
+ system_prompt: str,
+ model: str = "sambanova/Llama-4-Maverick-17B-128E-Instruct",
+ fallback_response: Optional[Dict[str, Any]] = None,
+ max_tokens: int = 2000, # Increased default token limit for structured outputs
+ temperature: float = 0.2,
+ top_p: float = 0.9,
+ project: str = "deep-research",
+ tags: Optional[List[str]] = None,
+) -> Dict[str, Any]:
+ """Get structured JSON output from an LLM with error handling.
+
+ Uses litellm for model inference.
+
+ Args:
+ prompt: User prompt for the LLM
+ system_prompt: System prompt for the LLM
+ model: Model to use for completion (with provider prefix)
+ fallback_response: Fallback response if parsing fails
+ max_tokens: Maximum tokens to generate
+ temperature: Sampling temperature
+ top_p: Top-p sampling value
+ project: Langfuse project name for LLM tracking
+ tags: Optional list of tags for Langfuse tracking. Defaults to ["structured_llm_output"] if None.
+
+ Returns:
+ Parsed JSON response or fallback
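+
+ Example (illustrative; prompt strings are placeholders):
+ result = get_structured_llm_output(
+ prompt="Summarize: ...",
+ system_prompt="Respond with a JSON object containing an 'answer' field.",
+ fallback_response={"answer": ""},
+ )
+ # Always returns a dict: the parsed JSON on success, fallback_response otherwise.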
+ """
+ try:
+ # Use provided tags or default to ["structured_llm_output"]
+ if tags is None:
+ tags = ["structured_llm_output"]
+
+ content = run_llm_completion(
+ prompt=prompt,
+ system_prompt=system_prompt,
+ model=model,
+ clean_output=True,
+ max_tokens=max_tokens,
+ temperature=temperature,
+ top_p=top_p,
+ project=project,
+ tags=tags,
+ )
+
+ if not content:
+ logger.warning("Empty content returned from LLM")
+ return fallback_response if fallback_response is not None else {}
+
+ result = safe_json_loads(content)
+
+ if not result and fallback_response is not None:
+ return fallback_response
+
+ return result
+ except Exception as e:
+ logger.error(f"Error processing structured LLM output: {e}")
+ return fallback_response if fallback_response is not None else {}
+
+
+def is_text_relevant(text1: str, text2: str, min_word_length: int = 4) -> bool:
+ """Determine if two pieces of text are relevant to each other.
+
+ This is a simple heuristic that checks for:
+ 1. Complete containment (one lowercased text inside the other)
+ 2. Shared significant words (words longer than min_word_length)
+
+ Args:
+ text1: First text to compare
+ text2: Second text to compare
+ min_word_length: Minimum length of words to check for shared content
+
+ Returns:
+ bool: True if the texts are deemed relevant to each other based on the criteria
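+
+ Example (illustrative):
+ is_text_relevant("LLMOps tooling", "A survey of LLMOps platforms")  # -> True (shares "llmops")
+ is_text_relevant("apple", "orange")  # -> False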
+ """
+ if not text1 or not text2:
+ return False
+
+ return (
+ text1.lower() in text2.lower()
+ or text2.lower() in text1.lower()
+ or any(
+ word
+ for word in text1.lower().split()
+ if len(word) > min_word_length and word in text2.lower()
+ )
+ )
+
+
+def find_most_relevant_string(
+ target: str,
+ options: List[str],
+ model: Optional[str] = "sambanova/Llama-4-Maverick-17B-128E-Instruct",
+ project: str = "deep-research",
+ tags: Optional[List[str]] = None,
+) -> Optional[str]:
+ """Find the most relevant string from a list of options using simple text matching.
+
+ If model is provided, uses litellm to determine relevance.
+
+ Args:
+ target: The target string to find relevance for
+ options: List of string options to check against
+ model: Model to use for matching (with provider prefix)
+ project: Langfuse project name for LLM tracking
+ tags: Optional list of tags for Langfuse tracking. Defaults to ["find_most_relevant_string"] if None.
+
+ Returns:
+ The most relevant string, or None if no relevant options
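+
+ Example (illustrative; with model=None only the heuristic matching is used):
+ find_most_relevant_string("LLMOps", ["MLOps basics", "LLMOps tooling"], model=None)
+ # -> "LLMOps tooling"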
+ """
+ if not options:
+ return None
+
+ if len(options) == 1:
+ return options[0]
+
+ # If model is provided, use litellm for more accurate matching
+ if model:
+ try:
+ # Ensure model name has provider prefix
+ if not any(
+ model.startswith(prefix + "/")
+ for prefix in [
+ "sambanova",
+ "openai",
+ "anthropic",
+ "meta",
+ "google",
+ "aws",
+ "openrouter",
+ ]
+ ):
+ # Raise an error if no provider prefix is specified
+ error_msg = f"Model '{model}' does not have a provider prefix. Please specify provider (e.g., 'sambanova/{model}')"
+ logger.error(error_msg)
+ raise ValueError(error_msg)
+
+ system_prompt = "You are a research assistant."
+ prompt = f"""Given the text: "{target}"
+Which of the following options is most relevant to this text?
+{options}
+
+Respond with only the exact text of the most relevant option."""
+
+ # Get pipeline run name and id for trace_name and trace_id if running in a step
+ trace_name = None
+ trace_id = None
+ try:
+ context = get_step_context()
+ trace_name = context.pipeline_run.name
+ trace_id = str(context.pipeline_run.id)
+ except RuntimeError:
+ # Not running in a step context
+ pass
+
+ # Use provided tags or default to ["find_most_relevant_string"]
+ if tags is None:
+ tags = ["find_most_relevant_string"]
+
+ # Build metadata dict
+ metadata = {"project": project, "tags": tags}
+ # Convert tags to trace_metadata format
+ metadata["trace_metadata"] = {tag: True for tag in tags}
+ if trace_name:
+ metadata["trace_name"] = trace_name
+ if trace_id:
+ metadata["trace_id"] = trace_id
+
+ response = completion(
+ model=model,
+ messages=[
+ {"role": "system", "content": system_prompt},
+ {"role": "user", "content": prompt},
+ ],
+ max_tokens=100,
+ temperature=0.2,
+ metadata=metadata,
+ )
+
+ answer = response.choices[0].message.content.strip()
+
+ # Check if the answer is one of the options
+ if answer in options:
+ return answer
+
+ # If not an exact match, find the closest one
+ for option in options:
+ if option in answer or answer in option:
+ return option
+
+ except Exception as e:
+ logger.error(f"Error finding relevant string with LLM: {e}")
+
+ # Simple relevance check - find exact matches first
+ for option in options:
+ if target.lower() == option.lower():
+ return option
+
+ # Then check partial matches
+ for option in options:
+ if is_text_relevant(target, option):
+ return option
+
+ # Return the first option as a fallback
+ return options[0]
+
+
+def synthesize_information(
+ synthesis_input: Dict[str, Any],
+ model: str = "sambanova/Llama-4-Maverick-17B-128E-Instruct",
+ system_prompt: Optional[str] = None,
+ project: str = "deep-research",
+ tags: Optional[List[str]] = None,
+) -> Dict[str, Any]:
+ """Synthesize information from search results for a sub-question.
+
+ Uses litellm for model inference.
+
+ Args:
+ synthesis_input: Dictionary with sub-question, search results, and sources
+ model: Model to use (with provider prefix)
+ system_prompt: System prompt for the LLM
+ project: Langfuse project name for LLM tracking
+ tags: Optional list of tags for Langfuse tracking. Defaults to ["information_synthesis"] if None.
+
+ Returns:
+ Dictionary with synthesized information
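+
+ Example (illustrative; the exact keys expected in synthesis_input are
+ determined by the synthesis prompt, so the "search_results" key below is
+ only an assumption):
+ result = synthesize_information(
+ {
+ "sub_question": "What is LLMOps?",
+ "search_results": ["...search result text..."],
+ "sources": ["https://example.com/article"],
+ }
+ )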
+ """
+ if system_prompt is None:
+ system_prompt = SYNTHESIS_PROMPT
+
+ sub_question_for_log = synthesis_input.get(
+ "sub_question", "unknown question"
+ )
+
+ # Define the fallback response
+ fallback_response = {
+ "synthesized_answer": f"Synthesis failed for '{sub_question_for_log}'.",
+ "key_sources": synthesis_input.get("sources", [])[:1],
+ "confidence_level": "low",
+ "information_gaps": "An error occurred during the synthesis process.",
+ }
+
+ # Use provided tags or default to ["information_synthesis"]
+ if tags is None:
+ tags = ["information_synthesis"]
+
+ # Use the utility function to get structured output
+ result = get_structured_llm_output(
+ prompt=json.dumps(synthesis_input),
+ system_prompt=system_prompt,
+ model=model,
+ fallback_response=fallback_response,
+ max_tokens=3000, # Increased for more detailed synthesis
+ project=project,
+ tags=tags,
+ )
+
+ return result
diff --git a/deep_research/utils/prompt_loader.py b/deep_research/utils/prompt_loader.py
new file mode 100644
index 00000000..c2b3d2f7
--- /dev/null
+++ b/deep_research/utils/prompt_loader.py
@@ -0,0 +1,136 @@
+"""Utility functions for loading prompts into the PromptsBundle model.
+
+This module provides functions to create PromptsBundle instances from
+the existing prompt definitions in prompts.py.
+"""
+
+from utils import prompts
+from utils.prompt_models import PromptsBundle, PromptTemplate
+
+
+def load_prompts_bundle(pipeline_version: str = "1.2.0") -> PromptsBundle:
+ """Load all prompts from prompts.py into a PromptsBundle.
+
+ Args:
+ pipeline_version: Version of the pipeline using these prompts
+
+ Returns:
+ PromptsBundle containing all prompts
+ """
+ # Create PromptTemplate instances for each prompt
+ search_query_prompt = PromptTemplate(
+ name="search_query_prompt",
+ content=prompts.DEFAULT_SEARCH_QUERY_PROMPT,
+ description="Generates effective search queries from sub-questions",
+ version="1.0.0",
+ tags=["search", "query", "information-gathering"],
+ )
+
+ query_decomposition_prompt = PromptTemplate(
+ name="query_decomposition_prompt",
+ content=prompts.QUERY_DECOMPOSITION_PROMPT,
+ description="Breaks down complex research queries into specific sub-questions",
+ version="1.0.0",
+ tags=["analysis", "decomposition", "planning"],
+ )
+
+ synthesis_prompt = PromptTemplate(
+ name="synthesis_prompt",
+ content=prompts.SYNTHESIS_PROMPT,
+ description="Synthesizes search results into comprehensive answers for sub-questions",
+ version="1.1.0",
+ tags=["synthesis", "integration", "analysis"],
+ )
+
+ viewpoint_analysis_prompt = PromptTemplate(
+ name="viewpoint_analysis_prompt",
+ content=prompts.VIEWPOINT_ANALYSIS_PROMPT,
+ description="Analyzes synthesized answers across different perspectives and viewpoints",
+ version="1.1.0",
+ tags=["analysis", "viewpoint", "perspective"],
+ )
+
+ reflection_prompt = PromptTemplate(
+ name="reflection_prompt",
+ content=prompts.REFLECTION_PROMPT,
+ description="Evaluates research and identifies gaps, biases, and areas for improvement",
+ version="1.0.0",
+ tags=["reflection", "critique", "improvement"],
+ )
+
+ additional_synthesis_prompt = PromptTemplate(
+ name="additional_synthesis_prompt",
+ content=prompts.ADDITIONAL_SYNTHESIS_PROMPT,
+ description="Enhances original synthesis with new information and addresses critique points",
+ version="1.1.0",
+ tags=["synthesis", "enhancement", "integration"],
+ )
+
+ conclusion_generation_prompt = PromptTemplate(
+ name="conclusion_generation_prompt",
+ content=prompts.CONCLUSION_GENERATION_PROMPT,
+ description="Synthesizes all research findings into a comprehensive conclusion",
+ version="1.0.0",
+ tags=["report", "conclusion", "synthesis"],
+ )
+
+ executive_summary_prompt = PromptTemplate(
+ name="executive_summary_prompt",
+ content=prompts.EXECUTIVE_SUMMARY_GENERATION_PROMPT,
+ description="Creates a compelling, insight-driven executive summary",
+ version="1.1.0",
+ tags=["report", "summary", "insights"],
+ )
+
+ introduction_prompt = PromptTemplate(
+ name="introduction_prompt",
+ content=prompts.INTRODUCTION_GENERATION_PROMPT,
+ description="Creates a contextual, engaging introduction",
+ version="1.1.0",
+ tags=["report", "introduction", "context"],
+ )
+
+ # Create and return the bundle
+ return PromptsBundle(
+ search_query_prompt=search_query_prompt,
+ query_decomposition_prompt=query_decomposition_prompt,
+ synthesis_prompt=synthesis_prompt,
+ viewpoint_analysis_prompt=viewpoint_analysis_prompt,
+ reflection_prompt=reflection_prompt,
+ additional_synthesis_prompt=additional_synthesis_prompt,
+ conclusion_generation_prompt=conclusion_generation_prompt,
+ executive_summary_prompt=executive_summary_prompt,
+ introduction_prompt=introduction_prompt,
+ pipeline_version=pipeline_version,
+ )
+
+
+def get_prompt_for_step(bundle: PromptsBundle, step_name: str) -> str:
+ """Get the appropriate prompt content for a specific step.
+
+ Args:
+ bundle: The PromptsBundle containing all prompts
+ step_name: Name of the step requesting the prompt
+
+ Returns:
+ The prompt content string
+
+ Raises:
+ ValueError: If no prompt mapping exists for the step
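+
+ Example (illustrative):
+ bundle = load_prompts_bundle(pipeline_version="1.2.0")
+ synthesis_prompt = get_prompt_for_step(bundle, "synthesis")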
+ """
+ # Map step names to prompt attributes
+ step_to_prompt_mapping = {
+ "query_decomposition": "query_decomposition_prompt",
+ "search_query_generation": "search_query_prompt",
+ "synthesis": "synthesis_prompt",
+ "viewpoint_analysis": "viewpoint_analysis_prompt",
+ "reflection": "reflection_prompt",
+ "additional_synthesis": "additional_synthesis_prompt",
+ "conclusion_generation": "conclusion_generation_prompt",
+ }
+
+ prompt_attr = step_to_prompt_mapping.get(step_name)
+ if not prompt_attr:
+ raise ValueError(f"No prompt mapping found for step: {step_name}")
+
+ return bundle.get_prompt_content(prompt_attr)
diff --git a/deep_research/utils/prompt_models.py b/deep_research/utils/prompt_models.py
new file mode 100644
index 00000000..c96a7bd0
--- /dev/null
+++ b/deep_research/utils/prompt_models.py
@@ -0,0 +1,123 @@
+"""Pydantic models for prompt tracking and management.
+
+This module contains models for bundling prompts as trackable artifacts
+in the ZenML pipeline, enabling better observability and version control.
+"""
+
+from datetime import datetime
+from typing import Dict, Optional
+
+from pydantic import BaseModel, Field
+
+
+class PromptTemplate(BaseModel):
+ """Represents a single prompt template with metadata."""
+
+ name: str = Field(..., description="Unique identifier for the prompt")
+ content: str = Field(..., description="The actual prompt template content")
+ description: str = Field(
+ "", description="Human-readable description of what this prompt does"
+ )
+ version: str = Field("1.0.0", description="Version of the prompt template")
+ tags: list[str] = Field(
+ default_factory=list, description="Tags for categorizing prompts"
+ )
+
+ model_config = {
+ "extra": "ignore",
+ "frozen": False,
+ "validate_assignment": True,
+ }
+
+
+class PromptsBundle(BaseModel):
+ """Bundle of all prompts used in the research pipeline.
+
+ This model serves as a single artifact that contains all prompts,
+ making them trackable, versionable, and visualizable in the ZenML dashboard.
+ """
+
+ # Core prompts used in the pipeline
+ search_query_prompt: PromptTemplate
+ query_decomposition_prompt: PromptTemplate
+ synthesis_prompt: PromptTemplate
+ viewpoint_analysis_prompt: PromptTemplate
+ reflection_prompt: PromptTemplate
+ additional_synthesis_prompt: PromptTemplate
+ conclusion_generation_prompt: PromptTemplate
+ executive_summary_prompt: PromptTemplate
+ introduction_prompt: PromptTemplate
+
+ # Metadata
+ pipeline_version: str = Field(
+ "1.0.0", description="Version of the pipeline using these prompts"
+ )
+ created_at: str = Field(
+ default_factory=lambda: datetime.now().isoformat(),
+ description="Timestamp when this bundle was created",
+ )
+
+ # Additional prompts can be stored here
+ custom_prompts: Dict[str, PromptTemplate] = Field(
+ default_factory=dict,
+ description="Additional custom prompts not part of the core set",
+ )
+
+ model_config = {
+ "extra": "ignore",
+ "frozen": False,
+ "validate_assignment": True,
+ }
+
+ def get_prompt_by_name(self, name: str) -> Optional[PromptTemplate]:
+ """Retrieve a prompt by its name.
+
+ Args:
+ name: Name of the prompt to retrieve
+
+ Returns:
+ PromptTemplate if found, None otherwise
+ """
+ # Check core prompts
+ for field_name, field_value in self.__dict__.items():
+ if (
+ isinstance(field_value, PromptTemplate)
+ and field_value.name == name
+ ):
+ return field_value
+
+ # Check custom prompts
+ return self.custom_prompts.get(name)
+
+ def list_all_prompts(self) -> Dict[str, PromptTemplate]:
+ """Get all prompts as a dictionary.
+
+ Returns:
+ Dictionary mapping prompt names to PromptTemplate objects
+ """
+ all_prompts = {}
+
+ # Add core prompts
+ for field_name, field_value in self.__dict__.items():
+ if isinstance(field_value, PromptTemplate):
+ all_prompts[field_value.name] = field_value
+
+ # Add custom prompts
+ all_prompts.update(self.custom_prompts)
+
+ return all_prompts
+
+ def get_prompt_content(self, prompt_type: str) -> str:
+ """Get the content of a specific prompt by its type.
+
+ Args:
+ prompt_type: Type of prompt (e.g., 'search_query_prompt', 'synthesis_prompt')
+
+ Returns:
+ The prompt content string
+
+ Raises:
+ AttributeError: If prompt type doesn't exist
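+
+ Example (illustrative; bundle is a PromptsBundle instance):
+ bundle.get_prompt_content("synthesis_prompt")  # returns the synthesis prompt text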
+ """
+ prompt = getattr(self, prompt_type)
+ return prompt.content
diff --git a/deep_research/utils/prompts.py b/deep_research/utils/prompts.py
new file mode 100644
index 00000000..e072b238
--- /dev/null
+++ b/deep_research/utils/prompts.py
@@ -0,0 +1,1605 @@
+"""
+Centralized collection of prompts used throughout the deep research pipeline.
+
+This module contains all system prompts used by LLM calls in various steps of the
+research pipeline to ensure consistency and make prompt management easier.
+"""
+
+# Search query generation prompt
+# Used to generate effective search queries from sub-questions
+DEFAULT_SEARCH_QUERY_PROMPT = """
+You are a Deep Research assistant. Given a specific research sub-question, your task is to formulate an effective search
+query that will help find relevant information to answer the question.
+
+A good search query should:
+1. Extract the key concepts from the sub-question
+2. Use precise, specific terminology
+3. Exclude unnecessary words or context
+4. Include alternative terms or synonyms when helpful
+5. Be concise yet comprehensive enough to find relevant results
+
+Format the output in json with the following json schema definition:
+
+
+
+Make sure that the output is a json object with an output json schema defined above.
+Only return the json object, no explanation or additional text.
+"""
+
+# Query decomposition prompt
+# Used to break down complex research queries into specific sub-questions
+QUERY_DECOMPOSITION_PROMPT = """
+You are a Deep Research assistant specializing in research design. You will be given a MAIN RESEARCH QUERY that needs to be explored comprehensively. Your task is to create diverse, insightful sub-questions that explore different dimensions of the topic.
+
+IMPORTANT: The main query should be interpreted as a single research question, not as a noun phrase. For example:
+- If the query is "Is LLMOps a subset of MLOps?", create questions ABOUT LLMOps and MLOps, not questions like "What is 'Is LLMOps a subset of MLOps?'"
+- Focus on the concepts, relationships, and implications within the query
+
+Create sub-questions that explore these DIFFERENT DIMENSIONS:
+
+1. **Definitional/Conceptual**: Define key terms and establish conceptual boundaries
+ Example: "What are the core components and characteristics of LLMOps?"
+
+2. **Comparative/Relational**: Compare and contrast the concepts mentioned
+ Example: "How do the workflows and tooling of LLMOps differ from traditional MLOps?"
+
+3. **Historical/Evolutionary**: Trace development and emergence
+ Example: "How did LLMOps emerge from MLOps practices?"
+
+4. **Structural/Technical**: Examine technical architecture and implementation
+ Example: "What specific tools and platforms are unique to LLMOps?"
+
+5. **Practical/Use Cases**: Explore real-world applications
+ Example: "What are the key use cases that require LLMOps but not traditional MLOps?"
+
+6. **Stakeholder/Industry**: Consider different perspectives and adoption
+ Example: "How are different industries adopting LLMOps vs MLOps?"
+
+7. **Challenges/Limitations**: Identify problems and constraints
+ Example: "What unique challenges does LLMOps face that MLOps doesn't?"
+
+8. **Future/Trends**: Look at emerging developments
+ Example: "How is the relationship between LLMOps and MLOps expected to evolve?"
+
+QUALITY GUIDELINES:
+- Each sub-question must explore a DIFFERENT dimension - no repetitive variations
+- Questions should be specific, concrete, and investigable
+- Mix descriptive ("what/who") with analytical ("why/how") questions
+- Ensure questions build toward answering the main query comprehensively
+- Frame questions to elicit detailed, nuanced responses
+- Consider technical, business, organizational, and strategic aspects
+
+Format the output in json with the following json schema definition:
+
+
+
+Make sure that the output is a json object with an output json schema defined above.
+Only return the json object, no explanation or additional text.
+"""
+
+# Synthesis prompt for individual sub-questions
+# Used to synthesize search results into comprehensive answers for sub-questions
+SYNTHESIS_PROMPT = """
+You are a Deep Research assistant specializing in information synthesis. Given a sub-question and search results, your task is to synthesize the information
+into a comprehensive, accurate, and well-structured answer.
+
+Your synthesis should:
+1. Begin with a direct, concise answer to the sub-question in the first paragraph
+2. Provide detailed evidence and explanation in subsequent paragraphs (at least 3-5 paragraphs total)
+3. Integrate information from multiple sources, citing them within your answer
+4. Acknowledge any conflicting information or contrasting viewpoints you encounter
+5. Use data, statistics, examples, and quotations when available to strengthen your answer
+6. Organize information logically with a clear flow between concepts
+7. Identify key sources that provided the most valuable information (at least 2-3 sources)
+8. Explicitly acknowledge information gaps where the search results were incomplete
+9. Write in plain text format - do NOT use markdown formatting, bullet points, or special characters
+
+Confidence level criteria:
+- HIGH: Multiple high-quality sources provide consistent information, comprehensive coverage of the topic, and few information gaps
+- MEDIUM: Decent sources with some consistency, but notable information gaps or some conflicting information
+- LOW: Limited sources, major information gaps, significant contradictions, or only tangentially relevant information
+
+Information gaps should specifically identify:
+1. Aspects of the question that weren't addressed in the search results
+2. Areas where more detailed or up-to-date information would be valuable
+3. Perspectives or data sources that would complement the existing information
+
+Format the output in json with the following json schema definition:
+
+
+
+Make sure that the output is a json object with an output json schema defined above.
+Only return the json object, no explanation or additional text.
+"""
+
+# Viewpoint analysis prompt for cross-perspective examination
+# Used to analyze synthesized answers across different perspectives and viewpoints
+VIEWPOINT_ANALYSIS_PROMPT = """
+You are a Deep Research assistant specializing in multi-perspective analysis. You will be given a set of synthesized answers
+to sub-questions related to a main research query. Your task is to perform a thorough, nuanced analysis of how different
+perspectives would interpret this information.
+
+Think deeply about the following viewpoint categories and how they would approach the information differently:
+- Scientific: Evidence-based, empirical approach focused on data, research findings, and methodological rigor
+- Political: Power dynamics, governance structures, policy implications, and ideological frameworks
+- Economic: Resource allocation, financial impacts, market dynamics, and incentive structures
+- Social: Cultural norms, community impacts, group dynamics, and public welfare
+- Ethical: Moral principles, values considerations, rights and responsibilities, and normative judgments
+- Historical: Long-term patterns, precedents, contextual development, and evolutionary change
+
+For each synthesized answer, analyze how these different perspectives would interpret the information by:
+
+1. Identifying 5-8 main points of agreement where multiple perspectives align (with specific examples)
+2. Analyzing at least 3-5 areas of tension between perspectives with:
+ - A clear topic title for each tension point
+ - Contrasting interpretations from at least 2-3 different viewpoint categories per tension
+ - Specific examples or evidence showing why these perspectives differ
+ - The nuanced positions of each perspective, not just simplified oppositions
+
+3. Thoroughly examining perspective gaps by identifying:
+ - Which perspectives are underrepresented or missing in the current research
+ - How including these missing perspectives would enrich understanding
+ - Specific questions or dimensions that remain unexplored
+ - Write in plain text format - do NOT use markdown formatting, bullet points, or special characters
+
+4. Developing integrative insights that:
+ - Synthesize across multiple perspectives to form a more complete understanding
+ - Highlight how seemingly contradictory viewpoints can complement each other
+ - Suggest frameworks for reconciling tensions or finding middle-ground approaches
+ - Identify actionable takeaways that incorporate multiple perspectives
+ - Write in plain text format - do NOT use markdown formatting, bullet points, or special characters
+
+Format the output in json with the following json schema definition:
+
+
+
+Make sure that the output is a json object with an output json schema defined above.
+Only return the json object, no explanation or additional text.
+"""
+
+# Reflection prompt for self-critique and improvement
+# Used to evaluate the research and identify gaps, biases, and areas for improvement
+REFLECTION_PROMPT = """
+You are a Deep Research assistant with the ability to critique and improve your own research. You will be given:
+1. The main research query
+2. The sub-questions explored so far
+3. The synthesized information for each sub-question
+4. Any viewpoint analysis performed
+
+Your task is to critically evaluate this research and identify:
+1. Areas where the research is incomplete or has gaps
+2. Questions that are important but not yet answered
+3. Aspects where additional evidence or depth would significantly improve the research
+4. Potential biases or limitations in the current findings
+
+Be constructively critical and identify the most important improvements that would substantially enhance the research.
+
+Format the output in json with the following json schema definition:
+
+
+
+Make sure that the output is a json object with an output json schema defined above.
+Only return the json object, no explanation or additional text.
+"""
+
+# Additional synthesis prompt for incorporating new information
+# Used to enhance original synthesis with new information and address critique points
+ADDITIONAL_SYNTHESIS_PROMPT = """
+You are a Deep Research assistant. You will be given:
+1. The original synthesized information on a research topic
+2. New information from additional research
+3. A critique of the original synthesis
+
+Your task is to enhance the original synthesis by incorporating the new information and addressing the critique.
+The updated synthesis should:
+1. Integrate new information seamlessly
+2. Address gaps identified in the critique
+3. Maintain a balanced, comprehensive, and accurate representation
+4. Preserve the strengths of the original synthesis
+5. Write in plain text format - do NOT use markdown formatting, bullet points, or special characters
+
+Format the output in json with the following json schema definition:
+
+
+
+Make sure that the output is a json object with an output json schema defined above.
+Only return the json object, no explanation or additional text.
+"""
+
+# Final report generation prompt
+# Used to compile a comprehensive HTML research report from all synthesized information
+REPORT_GENERATION_PROMPT = """
+You are a Deep Research assistant responsible for compiling an in-depth, comprehensive research report. You will be given:
+1. The original research query
+2. The sub-questions that were explored
+3. Synthesized information for each sub-question
+4. Viewpoint analysis comparing different perspectives (if available)
+5. Reflection metadata highlighting improvements and limitations
+
+Your task is to create a well-structured, coherent, professional-quality research report with the following features:
+
+EXECUTIVE SUMMARY (250-400 words):
+- Begin with a compelling, substantive executive summary that provides genuine insight
+- Highlight 3-5 key findings or insights that represent the most important discoveries
+- Include brief mention of methodology and limitations
+- Make the summary self-contained so it can be read independently of the full report
+- End with 1-2 sentences on broader implications or applications of the research
+
+INTRODUCTION (200-300 words):
+- Provide relevant background context on the main research query
+- Explain why this topic is significant or worth investigating
+- Outline the methodological approach used (sub-questions, search strategy, synthesis)
+- Preview the overall structure of the report
+
+SUB-QUESTION SECTIONS:
+- For each sub-question, create a dedicated section with:
+ * A descriptive section title (not just repeating the sub-question)
+ * A brief (1 paragraph) overview of key findings for this sub-question
+ * A "Key Findings" box highlighting 3-4 important discoveries for scannable reading
+ * The detailed, synthesized answer with appropriate paragraph breaks, lists, and formatting
+ * Proper citation of sources within the text (e.g., "According to [Source Name]...")
+ * Clear confidence indicator with appropriate styling
+ * Information gaps clearly identified in their own subsection
+ * Complete list of key sources used
+
+VIEWPOINT ANALYSIS SECTION (if available):
+- Create a detailed section that:
+ * Explains the purpose and value of multi-perspective analysis
+ * Presents points of agreement as actionable insights, not just observations
+ * Structures tension areas with clear topic headings and balanced presentation of viewpoints
+ * Uses visual elements (different background colors, icons) to distinguish different perspectives
+ * Integrates perspective gaps and insights into a cohesive narrative
+
+CONCLUSION (300-400 words):
+- Synthesize the overall findings, not just summarizing each section
+- Connect insights from different sub-questions to form higher-level understanding
+- Address the main research query directly with evidence-based conclusions
+- Acknowledge remaining uncertainties and suggestions for further research
+- End with implications or applications of the research findings
+
+OVERALL QUALITY REQUIREMENTS:
+1. Create visually scannable content with clear headings, bullet points, and short paragraphs
+2. Use semantic HTML (h1, h2, h3, p, blockquote, etc.) to create proper document structure
+3. Include a comprehensive table of contents with anchor links to all major sections
+4. Format all sources consistently in the references section with proper linking when available
+5. Use tables, lists, and blockquotes to improve readability and highlight important information
+6. Apply appropriate styling for different confidence levels (high, medium, low)
+7. Ensure proper HTML nesting and structure throughout the document
+8. Balance sufficient detail with clarity and conciseness
+9. Make all text directly actionable and insight-driven, not just descriptive
+
+The report should be formatted in HTML with appropriate headings, paragraphs, citations, and formatting.
+Use semantic HTML (h1, h2, h3, p, blockquote, etc.) to create a structured document.
+Include a table of contents at the beginning with anchor links to each section.
+For citations, use a consistent format and collect them in a references section at the end.
+
+Include this exact CSS stylesheet in your HTML to ensure consistent styling (do not modify it):
+
+```css
+
+```
+
+The HTML structure should follow this pattern:
+
+```html
+<!DOCTYPE html>
+<html>
+<head>
+    <meta charset="UTF-8">
+    <title>Research Report: [Main Query]</title>
+    <style>
+        [CSS STYLESHEET GOES HERE]
+    </style>
+</head>
+<body>
+    <h1>Research Report: [Main Query]</h1>
+
+    <nav class="toc">
+        [TABLE OF CONTENTS WITH ANCHOR LINKS TO EACH SECTION]
+    </nav>
+
+    <div class="section" id="executive-summary">
+        <h2>Executive Summary</h2>
+        <p>[CONCISE SUMMARY OF KEY FINDINGS]</p>
+    </div>
+
+    <div class="section" id="introduction">
+        <h2>Introduction</h2>
+        <p>[INTRODUCTION TO THE RESEARCH QUERY]</p>
+        <p>[OVERVIEW OF THE APPROACH AND SUB-QUESTIONS]</p>
+    </div>
+
+    [FOR EACH SUB-QUESTION]:
+    <div class="section" id="sub-question-[INDEX]">
+        <h2>[INDEX]. [SUB-QUESTION TEXT]</h2>
+        <p class="confidence-[LEVEL]">Confidence Level: [LEVEL]</p>
+
+        <div class="key-findings">
+            <h3>Key Findings</h3>
+            <ul>
+                <li>[KEY FINDING 1]</li>
+                <li>[KEY FINDING 2]</li>
+                [...]
+            </ul>
+        </div>
+
+        <div class="answer">
+            [DETAILED ANSWER]
+        </div>
+
+        <div class="key-sources">
+            <h3>Key Sources</h3>
+            <ul>
+                <li>[SOURCE 1]</li>
+                <li>[SOURCE 2]</li>
+                [...]
+            </ul>
+        </div>
+    </div>
+
+    <div class="section" id="viewpoint-analysis">
+        <h2>Viewpoint Analysis</h2>
+
+        <h3>Points of Agreement</h3>
+        <ul>
+            <li>[AGREEMENT 1]</li>
+            <li>[AGREEMENT 2]</li>
+            [...]
+        </ul>
+
+        <h3>Areas of Tension</h3>
+        [FOR EACH TENSION]:
+        <div class="tension">
+            <h4>[TENSION TOPIC]</h4>
+            <dl>
+                <dt>[VIEWPOINT 1 TITLE]</dt>
+                <dd>[VIEWPOINT 1 CONTENT]</dd>
+                <dt>[VIEWPOINT 2 TITLE]</dt>
+                <dd>[VIEWPOINT 2 CONTENT]</dd>
+                [...]
+            </dl>
+        </div>
+
+        <h3>Perspective Gaps</h3>
+        <p>[PERSPECTIVE GAPS CONTENT]</p>
+
+        <h3>Integrative Insights</h3>
+        <p>[INTEGRATIVE INSIGHTS CONTENT]</p>
+    </div>
+
+    <div class="section" id="conclusion">
+        <h2>Conclusion</h2>
+        <p>[CONCLUSION TEXT]</p>
+    </div>
+
+    <div class="section" id="references">
+        <h2>References</h2>
+        <ul>
+            <li>[REFERENCE 1]</li>
+            <li>[REFERENCE 2]</li>
+            [...]
+        </ul>
+    </div>
+</body>
+</html>
+```
+
+Special instructions:
+1. For each sub-question, display the confidence level with appropriate styling (confidence-high, confidence-medium, or confidence-low)
+2. Extract 2-3 key findings from each answer to create the key-findings box
+3. Format all sources consistently in the references section
+4. Use tables, lists, and blockquotes where appropriate to improve readability
+5. Use the notice classes (info, warning) to highlight important information or limitations
+6. Ensure all sections have proper ID attributes for the table of contents links
+
+Return only the complete HTML code for the report, with no explanations or additional text.
+"""
+
+# Static HTML template for direct report generation without LLM
+STATIC_HTML_TEMPLATE = """
+<!DOCTYPE html>
+<html>
+<head>
+    <meta charset="UTF-8">
+    <title>Research Report: {main_query}</title>
+</head>
+<body>
+    <h1>Research Report: {main_query}</h1>
+
+    <div class="section" id="executive-summary">
+        <h2>Executive Summary</h2>
+        <p>{executive_summary}</p>
+    </div>
+
+    <div class="section" id="introduction">
+        <h2>Introduction</h2>
+        {introduction_html}
+    </div>
+
+    {sub_questions_html}
+
+    {viewpoint_analysis_html}
+
+    <div class="section" id="conclusion">
+        <h2>Conclusion</h2>
+        {conclusion_html}
+    </div>
+
+    <div class="section" id="references">
+        <h2>References</h2>
+        {references_html}
+    </div>
+</body>
+</html>
+"""
+
+# Template for sub-question section in the static HTML report
+SUB_QUESTION_TEMPLATE = """
+
+
+
+
+
+ {info_gaps_html}
+
+ {key_sources_html}
+
+"""
+
+# Template for viewpoint analysis section in the static HTML report
+VIEWPOINT_ANALYSIS_TEMPLATE = """
+
+
Viewpoint Analysis
+
+
+
🤝 Points of Agreement
+
+
+
+
+
⚖️ Areas of Tension
+
+ {tensions_html}
+
+
+
+
+
+
+
💡 Integrative Insights
+
+
{integrative_insights}
+
+
+
+"""
+
+# Executive Summary generation prompt
+# Used to create a compelling, insight-driven executive summary
+EXECUTIVE_SUMMARY_GENERATION_PROMPT = """
+You are a Deep Research assistant specializing in creating executive summaries. Given comprehensive research findings, your task is to create a compelling executive summary that captures the essence of the research and its key insights.
+
+Your executive summary should:
+
+1. **Opening Statement (1-2 sentences):**
+ - Start with a powerful, direct answer to the main research question
+ - Make it clear and definitive based on the evidence gathered
+
+2. **Key Findings (3-5 bullet points):**
+ - Extract the MOST IMPORTANT discoveries from across all sub-questions
+ - Focus on insights that are surprising, actionable, or paradigm-shifting
+ - Each finding should be specific and evidence-based, not generic
+ - Prioritize findings that directly address the main query
+
+3. **Critical Insights (2-3 sentences):**
+ - Synthesize patterns or themes that emerged across multiple sub-questions
+ - Highlight any unexpected discoveries or counter-intuitive findings
+ - Connect disparate findings to reveal higher-level understanding
+
+4. **Implications (2-3 sentences):**
+ - What do these findings mean for practitioners/stakeholders?
+ - What actions or decisions can be made based on this research?
+ - Why should the reader care about these findings?
+
+5. **Confidence and Limitations (1-2 sentences):**
+ - Briefly acknowledge the overall confidence level of the findings
+ - Note any significant gaps or areas requiring further investigation
+
+IMPORTANT GUIDELINES:
+- Be CONCISE but INSIGHTFUL - every sentence should add value
+- Use active voice and strong, definitive language where evidence supports it
+- Avoid generic statements - be specific to the actual research findings
+- Lead with the most important information
+- Make it self-contained - reader should understand key findings without reading the full report
+- Target length: 250-400 words
+
+Format as well-structured HTML paragraphs using <p> tags and <ul>/<li> for bullet points.
+"""
+
+# Introduction generation prompt
+# Used to create a contextual, engaging introduction
+INTRODUCTION_GENERATION_PROMPT = """
+You are a Deep Research assistant specializing in creating engaging introductions. Given a research query and the sub-questions explored, your task is to create an introduction that provides context and sets up the reader's expectations.
+
+Your introduction should:
+
+1. **Context and Relevance (2-3 sentences):**
+ - Why is this research question important NOW?
+ - What makes this topic significant or worth investigating?
+ - Connect to current trends, debates, or challenges in the field
+
+2. **Scope and Approach (2-3 sentences):**
+ - What specific aspects of the topic does this research explore?
+ - Briefly mention the key dimensions covered (based on sub-questions)
+ - Explain the systematic approach without being too technical
+
+3. **What to Expect (2-3 sentences):**
+ - Preview the structure of the report
+ - Hint at some of the interesting findings or tensions discovered
+ - Set expectations about the depth and breadth of analysis
+
+IMPORTANT GUIDELINES:
+- Make it engaging - hook the reader's interest from the start
+- Provide real context, not generic statements
+- Connect to why this matters for the reader
+- Keep it concise but informative (200-300 words)
+- Use active voice and clear language
+- Build anticipation for the findings without giving everything away
+
+Format as well-structured HTML paragraphs using <p> tags. Do NOT include any headings or section titles.
+"""
+
+# Conclusion generation prompt
+# Used to synthesize all research findings into a comprehensive conclusion
+CONCLUSION_GENERATION_PROMPT = """
+You are a Deep Research assistant specializing in synthesizing comprehensive research conclusions. Given all the research findings from a deep research study, your task is to create a thoughtful, evidence-based conclusion that ties together the overall findings.
+
+Your conclusion should:
+
+1. **Synthesis and Integration (150-200 words):**
+ - Connect insights from different sub-questions to form a higher-level understanding
+ - Identify overarching themes and patterns that emerge from the research
+ - Highlight how different findings relate to and support each other
+ - Avoid simply summarizing each section separately
+
+2. **Direct Response to Main Query (100-150 words):**
+ - Address the original research question directly with evidence-based conclusions
+ - State what the research definitively established vs. what remains uncertain
+ - Provide a clear, actionable answer based on the synthesized evidence
+
+3. **Limitations and Future Directions (100-120 words):**
+ - Acknowledge remaining uncertainties and information gaps across all sections
+ - Suggest specific areas where additional research would be most valuable
+ - Identify what types of evidence or perspectives would strengthen the findings
+
+4. **Implications and Applications (80-100 words):**
+ - Explain the practical significance of the research findings
+ - Suggest how the insights might be applied or what they mean for stakeholders
+ - Connect findings to broader contexts or implications
+
+Format your output as a well-structured conclusion section in HTML format with appropriate paragraph breaks and formatting. Use <p> tags for paragraphs and organize the content logically with clear transitions between the different aspects outlined above.
+
+IMPORTANT: Do NOT include any headings like "Conclusion", <h2>, or <h3> tags - the section already has a heading. Start directly with the conclusion content in paragraph form. Just create flowing, well-structured paragraphs that cover all four aspects naturally.
+
+Ensure the conclusion feels cohesive and draws meaningful connections between findings rather than just listing them sequentially.
+"""
diff --git a/deep_research/utils/pydantic_models.py b/deep_research/utils/pydantic_models.py
new file mode 100644
index 00000000..9fca23a3
--- /dev/null
+++ b/deep_research/utils/pydantic_models.py
@@ -0,0 +1,300 @@
+"""Pydantic model definitions for the research pipeline.
+
+This module contains all the Pydantic models that represent the state of the research
+pipeline. These models replace the previous dataclasses implementation and leverage
+Pydantic's validation, serialization, and integration with ZenML.
+"""
+
+import time
+from typing import Any, Dict, List, Optional
+
+from pydantic import BaseModel, Field
+from typing_extensions import Literal
+
+
+class SearchResult(BaseModel):
+ """Represents a search result for a sub-question."""
+
+ url: str = ""
+ content: str = ""
+ title: str = ""
+ snippet: str = ""
+ metadata: Optional[Dict[str, Any]] = Field(default_factory=dict)
+
+ model_config = {
+ "extra": "ignore", # Ignore extra fields during deserialization
+ "frozen": False, # Allow attribute updates
+ "validate_assignment": True, # Validate when attributes are set
+ }
+
+
+class ViewpointTension(BaseModel):
+ """Represents a tension between different viewpoints on a topic."""
+
+ topic: str = ""
+ viewpoints: Dict[str, str] = Field(default_factory=dict)
+
+ model_config = {
+ "extra": "ignore",
+ "frozen": False,
+ "validate_assignment": True,
+ }
+
+
+class SynthesizedInfo(BaseModel):
+ """Represents synthesized information for a sub-question."""
+
+ synthesized_answer: str = ""
+ key_sources: List[str] = Field(default_factory=list)
+ confidence_level: Literal["high", "medium", "low"] = "medium"
+ information_gaps: str = ""
+ improvements: List[str] = Field(default_factory=list)
+
+ model_config = {
+ "extra": "ignore",
+ "frozen": False,
+ "validate_assignment": True,
+ }
+
+
+class ViewpointAnalysis(BaseModel):
+ """Represents the analysis of different viewpoints on the research topic."""
+
+ main_points_of_agreement: List[str] = Field(default_factory=list)
+ areas_of_tension: List[ViewpointTension] = Field(default_factory=list)
+ perspective_gaps: str = ""
+ integrative_insights: str = ""
+
+ model_config = {
+ "extra": "ignore",
+ "frozen": False,
+ "validate_assignment": True,
+ }
+
+
+class ReflectionMetadata(BaseModel):
+ """Metadata about the reflection process."""
+
+ critique_summary: List[str] = Field(default_factory=list)
+ additional_questions_identified: List[str] = Field(default_factory=list)
+ searches_performed: List[str] = Field(default_factory=list)
+ improvements_made: float = Field(
+ default=0
+ ) # Changed from int to float to handle timestamp values
+ error: Optional[str] = None
+
+ model_config = {
+ "extra": "ignore",
+ "frozen": False,
+ "validate_assignment": True,
+ }
+
+
+class ResearchState(BaseModel):
+ """Comprehensive state object for the enhanced research pipeline."""
+
+ # Initial query information
+ main_query: str = ""
+ sub_questions: List[str] = Field(default_factory=list)
+
+ # Information gathering results
+ search_results: Dict[str, List[SearchResult]] = Field(default_factory=dict)
+
+ # Synthesized information
+ synthesized_info: Dict[str, SynthesizedInfo] = Field(default_factory=dict)
+
+ # Viewpoint analysis
+ viewpoint_analysis: Optional[ViewpointAnalysis] = None
+
+ # Reflection results
+ enhanced_info: Dict[str, SynthesizedInfo] = Field(default_factory=dict)
+ reflection_metadata: Optional[ReflectionMetadata] = None
+
+ # Final report
+ final_report_html: str = ""
+
+ # Search cost tracking
+ search_costs: Dict[str, float] = Field(
+ default_factory=dict,
+ description="Total costs by search provider (e.g., {'exa': 0.0, 'tavily': 0.0})",
+ )
+ search_cost_details: List[Dict[str, Any]] = Field(
+ default_factory=list,
+ description="Detailed log of each search with cost information",
+ )
+ # Format: [{"provider": "exa", "query": "...", "cost": 0.0, "timestamp": ..., "step": "...", "sub_question": "..."}]
+
+ model_config = {
+ "extra": "ignore",
+ "frozen": False,
+ "validate_assignment": True,
+ }
+
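+ # Example (illustrative): a state that only has a main query and sub-questions
+ # reports the stage "after_query_decomposition":
+ #
+ # state = ResearchState(
+ #     main_query="Is LLMOps a subset of MLOps?",
+ #     sub_questions=["What are the core components of LLMOps?"],
+ # )
+ # state.get_current_stage()  # -> "after_query_decomposition"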
+ def get_current_stage(self) -> str:
+ """Determine the current stage of research based on filled data."""
+ if self.final_report_html:
+ return "final_report"
+ elif self.enhanced_info:
+ return "after_reflection"
+ elif self.viewpoint_analysis:
+ return "after_viewpoint_analysis"
+ elif self.synthesized_info:
+ return "after_synthesis"
+ elif self.search_results:
+ return "after_search"
+ elif self.sub_questions:
+ return "after_query_decomposition"
+ elif self.main_query:
+ return "initial"
+ else:
+ return "empty"
+
+ def update_sub_questions(self, sub_questions: List[str]) -> None:
+ """Update the sub-questions list."""
+ self.sub_questions = sub_questions
+
+ def update_search_results(
+ self, search_results: Dict[str, List[SearchResult]]
+ ) -> None:
+ """Update the search results."""
+ self.search_results = search_results
+
+ def update_synthesized_info(
+ self, synthesized_info: Dict[str, SynthesizedInfo]
+ ) -> None:
+ """Update the synthesized information."""
+ self.synthesized_info = synthesized_info
+
+ def update_viewpoint_analysis(
+ self, viewpoint_analysis: ViewpointAnalysis
+ ) -> None:
+ """Update the viewpoint analysis."""
+ self.viewpoint_analysis = viewpoint_analysis
+
+ def update_after_reflection(
+ self,
+ enhanced_info: Dict[str, SynthesizedInfo],
+ metadata: ReflectionMetadata,
+ ) -> None:
+ """Update with reflection results."""
+ self.enhanced_info = enhanced_info
+ self.reflection_metadata = metadata
+
+ def set_final_report(self, html: str) -> None:
+ """Set the final report HTML."""
+ self.final_report_html = html
+
+
+class ReflectionOutput(BaseModel):
+ """Output from the reflection generation step."""
+
+ state: ResearchState
+ recommended_queries: List[str] = Field(default_factory=list)
+ critique_summary: List[Dict[str, Any]] = Field(default_factory=list)
+ additional_questions: List[str] = Field(default_factory=list)
+
+ model_config = {
+ "extra": "ignore",
+ "frozen": False,
+ "validate_assignment": True,
+ }
+
+
+class ApprovalDecision(BaseModel):
+ """Approval decision from human reviewer."""
+
+ approved: bool = False
+ selected_queries: List[str] = Field(default_factory=list)
+ approval_method: str = "" # "APPROVE_ALL", "SKIP", "SELECT_SPECIFIC"
+ reviewer_notes: str = ""
+ timestamp: float = Field(default_factory=lambda: time.time())
+
+ model_config = {
+ "extra": "ignore",
+ "frozen": False,
+ "validate_assignment": True,
+ }
+
+
+class PromptTypeMetrics(BaseModel):
+ """Metrics for a specific prompt type."""
+
+ prompt_type: str
+ total_cost: float
+ input_tokens: int
+ output_tokens: int
+ call_count: int
+ avg_cost_per_call: float
+ percentage_of_total_cost: float
+
+ model_config = {
+ "extra": "ignore",
+ "frozen": False,
+ "validate_assignment": True,
+ }
+
+
+class TracingMetadata(BaseModel):
+ """Metadata about token usage, costs, and performance for a pipeline run."""
+
+ # Pipeline information
+ pipeline_run_name: str = ""
+ pipeline_run_id: str = ""
+
+ # Token usage
+ total_input_tokens: int = 0
+ total_output_tokens: int = 0
+ total_tokens: int = 0
+
+ # Cost information
+ total_cost: float = 0.0
+ cost_breakdown_by_model: Dict[str, float] = Field(default_factory=dict)
+
+ # Performance metrics
+ total_latency_seconds: float = 0.0
+ formatted_latency: str = ""
+ observation_count: int = 0
+
+ # Model usage
+ models_used: List[str] = Field(default_factory=list)
+ model_token_breakdown: Dict[str, Dict[str, int]] = Field(
+ default_factory=dict
+ )
+ # Format: {"model_name": {"input_tokens": X, "output_tokens": Y, "total_tokens": Z}}
+
+ # Trace information
+ trace_id: str = ""
+ trace_name: str = ""
+ trace_tags: List[str] = Field(default_factory=list)
+ trace_metadata: Dict[str, Any] = Field(default_factory=dict)
+
+ # Step-by-step breakdown
+ step_costs: Dict[str, float] = Field(default_factory=dict)
+ step_tokens: Dict[str, Dict[str, int]] = Field(default_factory=dict)
+ # Format: {"step_name": {"input_tokens": X, "output_tokens": Y}}
+
+ # Prompt-level metrics
+ prompt_metrics: List[PromptTypeMetrics] = Field(
+ default_factory=list, description="Cost breakdown by prompt type"
+ )
+
+ # Search provider costs
+ search_costs: Dict[str, float] = Field(
+ default_factory=dict, description="Total costs by search provider"
+ )
+ search_queries_count: Dict[str, int] = Field(
+ default_factory=dict,
+ description="Number of queries by search provider",
+ )
+ search_cost_details: List[Dict[str, Any]] = Field(
+ default_factory=list, description="Detailed search cost information"
+ )
+
+ # Timestamp
+ collected_at: float = Field(default_factory=lambda: time.time())
+
+ model_config = {
+ "extra": "ignore",
+ "frozen": False,
+ "validate_assignment": True,
+ }
diff --git a/deep_research/utils/search_utils.py b/deep_research/utils/search_utils.py
new file mode 100644
index 00000000..a9b04d51
--- /dev/null
+++ b/deep_research/utils/search_utils.py
@@ -0,0 +1,721 @@
+import logging
+import os
+from enum import Enum
+from typing import Any, Dict, List, Optional, Union
+
+from tavily import TavilyClient
+
+try:
+ from exa_py import Exa
+
+ EXA_AVAILABLE = True
+except ImportError:
+ EXA_AVAILABLE = False
+ Exa = None
+
+from utils.llm_utils import get_structured_llm_output
+from utils.prompts import DEFAULT_SEARCH_QUERY_PROMPT
+from utils.pydantic_models import SearchResult
+
+logger = logging.getLogger(__name__)
+
+
+class SearchProvider(Enum):
+ TAVILY = "tavily"
+ EXA = "exa"
+ BOTH = "both"
+
+
+class SearchEngineConfig:
+ """Configuration for search engines"""
+
+ def __init__(self):
+ self.tavily_api_key = os.getenv("TAVILY_API_KEY")
+ self.exa_api_key = os.getenv("EXA_API_KEY")
+ self.default_provider = os.getenv("DEFAULT_SEARCH_PROVIDER", "tavily")
+ self.enable_parallel_search = (
+ os.getenv("ENABLE_PARALLEL_SEARCH", "false").lower() == "true"
+ )
+
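+# Example (illustrative values): provider selection is driven by the environment
+# variables read by SearchEngineConfig, e.g.:
+#
+# os.environ["TAVILY_API_KEY"] = "..."
+# os.environ["EXA_API_KEY"] = "..."
+# os.environ["DEFAULT_SEARCH_PROVIDER"] = "exa"    # defaults to "tavily"
+# os.environ["ENABLE_PARALLEL_SEARCH"] = "true"    # defaults to "false"
+# config = SearchEngineConfig()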
+
+def get_search_client(provider: Union[str, SearchProvider]) -> Optional[Any]:
+ """Get the appropriate search client based on provider."""
+ if isinstance(provider, str):
+ provider = SearchProvider(provider.lower())
+
+ config = SearchEngineConfig()
+
+ if provider == SearchProvider.TAVILY:
+ if not config.tavily_api_key:
+ raise ValueError("TAVILY_API_KEY environment variable not set")
+ return TavilyClient(api_key=config.tavily_api_key)
+
+ elif provider == SearchProvider.EXA:
+ if not EXA_AVAILABLE:
+ raise ImportError(
+ "exa-py is not installed. Please install it with: pip install exa-py"
+ )
+ if not config.exa_api_key:
+ raise ValueError("EXA_API_KEY environment variable not set")
+ return Exa(config.exa_api_key)
+
+ return None
+
+
+def tavily_search(
+ query: str,
+ include_raw_content: bool = True,
+ max_results: int = 3,
+ cap_content_length: int = 20000,
+) -> Dict[str, Any]:
+ """Perform a search using the Tavily API.
+
+ Args:
+ query: Search query
+ include_raw_content: Whether to include raw content in results
+ max_results: Maximum number of results to return
+ cap_content_length: Maximum length of content to return
+
+ Returns:
+ Dict[str, Any]: Search results from Tavily in the following format:
+ {
+ "query": str, # The original query
+ "results": List[Dict], # List of search result objects
+ "error": str, # Error message (if an error occurred, otherwise omitted)
+ }
+
+ Each result in "results" has the following structure:
+ {
+ "url": str, # URL of the search result
+ "raw_content": str, # Raw content of the page (if include_raw_content=True)
+ "title": str, # Title of the page
+ "snippet": str, # Snippet of the page content
+ }
+ """
+ try:
+ tavily_client = get_search_client(SearchProvider.TAVILY)
+
+ # First try with advanced search
+ results = tavily_client.search(
+ query=query,
+ include_raw_content=include_raw_content,
+ max_results=max_results,
+ search_depth="advanced", # Use advanced search for better results
+ include_domains=[], # No domain restrictions
+ exclude_domains=[], # No exclusions
+ include_answer=False, # We don't need the answer field
+ include_images=False, # We don't need images
+ # Note: 'include_snippets' is not a supported parameter
+ )
+
+ # Check if we got good results (with non-None and non-empty content)
+ if include_raw_content and "results" in results:
+ bad_content_count = sum(
+ 1
+ for r in results["results"]
+ if "raw_content" in r
+ and (
+ r["raw_content"] is None or r["raw_content"].strip() == ""
+ )
+ )
+
+ # If more than half of results have bad content, try a different approach
+ if bad_content_count > len(results["results"]) / 2:
+ logger.warning(
+ f"{bad_content_count}/{len(results['results'])} results have None or empty content. "
+ "Trying to use 'content' field instead of 'raw_content'..."
+ )
+
+ # Try to use the 'content' field which comes by default
+ for result in results["results"]:
+ if (
+ "raw_content" in result
+ and (
+ result["raw_content"] is None
+ or result["raw_content"].strip() == ""
+ )
+ ) and "content" in result:
+ result["raw_content"] = result["content"]
+ logger.info(
+ f"Using 'content' field as 'raw_content' for URL {result.get('url', 'unknown')}"
+ )
+
+ # Re-check after our fix
+ bad_content_count = sum(
+ 1
+ for r in results["results"]
+ if "raw_content" in r
+ and (
+ r["raw_content"] is None
+ or r["raw_content"].strip() == ""
+ )
+ )
+
+ if bad_content_count > 0:
+ logger.warning(
+ f"Still have {bad_content_count}/{len(results['results'])} results with bad content after fixes."
+ )
+
+ # Try alternative approach - search with 'include_answer=True'
+ try:
+ # Search with include_answer=True which may give us better content
+ logger.info(
+ "Trying alternative search with include_answer=True"
+ )
+ alt_results = tavily_client.search(
+ query=query,
+ include_raw_content=include_raw_content,
+ max_results=max_results,
+ search_depth="advanced",
+ include_domains=[],
+ exclude_domains=[],
+ include_answer=True, # Include answer this time
+ include_images=False,
+ )
+
+ # Check if we got any improved content
+ if "results" in alt_results:
+ # Create a merged results set taking the best content
+ for i, result in enumerate(alt_results["results"]):
+ if i < len(results["results"]):
+ if (
+ "raw_content" in result
+ and result["raw_content"]
+ and (
+ results["results"][i].get(
+ "raw_content"
+ )
+ is None
+ or results["results"][i]
+ .get("raw_content", "")
+ .strip()
+ == ""
+ )
+ ):
+ # Replace the bad content with better content from alt_results
+ results["results"][i]["raw_content"] = (
+ result["raw_content"]
+ )
+ logger.info(
+ f"Replaced bad content with better content from alternative search for URL {result.get('url', 'unknown')}"
+ )
+
+ # If answer is available, add it as a special result
+ if "answer" in alt_results and alt_results["answer"]:
+ answer_text = alt_results["answer"]
+ answer_result = {
+ "url": "tavily-generated-answer",
+ "title": "Generated Answer",
+ "raw_content": f"Generated Answer based on search results:\n\n{answer_text}",
+ "content": answer_text,
+ }
+ results["results"].append(answer_result)
+ logger.info(
+ "Added Tavily generated answer as additional search result"
+ )
+
+ except Exception as alt_error:
+ logger.warning(
+ f"Failed to get better results with alternative search: {alt_error}"
+ )
+
+ # Cap content length if specified
+ if cap_content_length > 0 and "results" in results:
+ for result in results["results"]:
+ if "raw_content" in result and result["raw_content"]:
+ result["raw_content"] = result["raw_content"][
+ :cap_content_length
+ ]
+
+ return results
+ except Exception as e:
+ logger.error(f"Error in Tavily search: {e}")
+ # Return an error structure that's compatible with our expected format
+ return {"query": query, "results": [], "error": str(e)}
+
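+
+# Illustrative usage sketch; the helper name and query are hypothetical. It shows
+# how a caller can detect the error-compatible shape documented above before
+# touching the result list.
+def _example_tavily_search() -> None:
+ results = tavily_search("history of reinforcement learning", max_results=2)
+ if results.get("error"):
+ logger.warning(f"Tavily search failed: {results['error']}")
+ return
+ for item in results.get("results", []):
+ # raw_content is capped at cap_content_length and may be empty
+ logger.info(f"{item.get('url')}: {len(item.get('raw_content') or '')} chars")
+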
+
+def exa_search(
+ query: str,
+ max_results: int = 3,
+ cap_content_length: int = 20000,
+ search_mode: str = "auto",
+ include_highlights: bool = False,
+) -> Dict[str, Any]:
+ """Perform a search using the Exa API.
+
+ Args:
+ query: Search query
+ max_results: Maximum number of results to return
+ cap_content_length: Maximum length of content to return
+ search_mode: Search mode ("neural", "keyword", or "auto")
+ include_highlights: Whether to include highlights in results
+
+ Returns:
+ Dict[str, Any]: Search results from Exa in a format compatible with Tavily
+ """
+ try:
+ exa_client = get_search_client(SearchProvider.EXA)
+
+ # Configure content options
+ text_options = {"max_characters": cap_content_length}
+
+ kwargs = {
+ "query": query,
+ "num_results": max_results,
+ "type": search_mode, # "neural", "keyword", or "auto"
+ "text": text_options,
+ }
+
+ if include_highlights:
+ kwargs["highlights"] = {
+ "highlights_per_url": 2,
+ "num_sentences": 3,
+ }
+
+ response = exa_client.search_and_contents(**kwargs)
+
+ # Extract cost information
+ exa_cost = 0.0
+ if hasattr(response, "cost_dollars") and hasattr(
+ response.cost_dollars, "total"
+ ):
+ exa_cost = response.cost_dollars.total
+ logger.info(
+ f"Exa search cost for query '{query}': ${exa_cost:.4f}"
+ )
+
+ # Convert to standardized format compatible with Tavily
+ results = {"query": query, "results": [], "exa_cost": exa_cost}
+
+ for r in response.results:
+ result_dict = {
+ "url": r.url,
+ "title": r.title or "",
+ "snippet": "",
+ "raw_content": getattr(r, "text", ""),
+ "content": getattr(r, "text", ""),
+ }
+
+ # Add highlights as snippet if available
+ if hasattr(r, "highlights") and r.highlights:
+ result_dict["snippet"] = " ".join(r.highlights[:1])
+
+ # Store additional metadata
+ result_dict["_metadata"] = {
+ "provider": "exa",
+ "score": getattr(r, "score", None),
+ "published_date": getattr(r, "published_date", None),
+ "author": getattr(r, "author", None),
+ }
+
+ results["results"].append(result_dict)
+
+ return results
+
+ except Exception as e:
+ logger.error(f"Error in Exa search: {e}")
+ return {"query": query, "results": [], "error": str(e)}
+
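+
+# Illustrative usage sketch; the helper name and query are hypothetical. Requires
+# EXA_API_KEY and the exa-py package, otherwise get_search_client raises before
+# any request is made.
+def _example_exa_search() -> None:
+ results = exa_search(
+ query="open source MLOps frameworks",
+ max_results=2,
+ search_mode="neural",
+ include_highlights=True,
+ )
+ # The Exa-reported cost (if any) travels alongside the results
+ logger.info(f"Exa cost: ${results.get('exa_cost', 0.0):.4f}")
+ for item in results.get("results", []):
+ logger.info(f"{item['url']} (score={item['_metadata'].get('score')})")
+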
+
+def unified_search(
+ query: str,
+ provider: Union[str, SearchProvider, None] = None,
+ max_results: int = 3,
+ cap_content_length: int = 20000,
+ search_mode: str = "auto",
+ include_highlights: bool = False,
+ compare_results: bool = False,
+ **kwargs,
+) -> Union[List[SearchResult], Dict[str, List[SearchResult]]]:
+ """Unified search interface supporting multiple providers.
+
+ Args:
+ query: Search query
+ provider: Search provider to use (tavily, exa, both)
+ max_results: Maximum number of results
+ cap_content_length: Maximum content length
+ search_mode: Search mode for Exa ("neural", "keyword", "auto")
+ include_highlights: Include highlights for Exa results
+ compare_results: Return results from both providers separately
+
+ Returns:
+ List[SearchResult], or a dict mapping provider name to its result list when compare_results=True
+ """
+ # Use default provider if not specified
+ if provider is None:
+ config = SearchEngineConfig()
+ provider = config.default_provider
+
+ # Convert string to enum if needed
+ if isinstance(provider, str):
+ provider = SearchProvider(provider.lower())
+
+ # Handle single provider case
+ if provider == SearchProvider.TAVILY:
+ results = tavily_search(
+ query,
+ max_results=max_results,
+ cap_content_length=cap_content_length,
+ )
+ extracted, cost = extract_search_results(results, provider="tavily")
+ return extracted if not compare_results else {"tavily": extracted}
+
+ elif provider == SearchProvider.EXA:
+ results = exa_search(
+ query=query,
+ max_results=max_results,
+ cap_content_length=cap_content_length,
+ search_mode=search_mode,
+ include_highlights=include_highlights,
+ )
+ extracted, cost = extract_search_results(results, provider="exa")
+ return extracted if not compare_results else {"exa": extracted}
+
+ elif provider == SearchProvider.BOTH:
+ # Run both searches
+ tavily_results = tavily_search(
+ query,
+ max_results=max_results,
+ cap_content_length=cap_content_length,
+ )
+ exa_results = exa_search(
+ query=query,
+ max_results=max_results,
+ cap_content_length=cap_content_length,
+ search_mode=search_mode,
+ include_highlights=include_highlights,
+ )
+
+ # Extract results from both
+ tavily_extracted, tavily_cost = extract_search_results(
+ tavily_results, provider="tavily"
+ )
+ exa_extracted, exa_cost = extract_search_results(
+ exa_results, provider="exa"
+ )
+
+ if compare_results:
+ return {"tavily": tavily_extracted, "exa": exa_extracted}
+ else:
+ # Merge results, interleaving them
+ merged = []
+ max_len = max(len(tavily_extracted), len(exa_extracted))
+ for i in range(max_len):
+ if i < len(tavily_extracted):
+ merged.append(tavily_extracted[i])
+ if i < len(exa_extracted):
+ merged.append(exa_extracted[i])
+ return merged[:max_results] # Limit to requested number
+
+ else:
+ raise ValueError(f"Unknown provider: {provider}")
+
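+
+# Illustrative usage sketch; the helper name and query are hypothetical and the
+# call assumes both TAVILY_API_KEY and EXA_API_KEY are set. With
+# compare_results=True the caller receives a dict keyed by provider name.
+def _example_unified_search() -> None:
+ per_provider = unified_search(
+ query="vector databases comparison",
+ provider="both",
+ max_results=3,
+ compare_results=True,
+ )
+ for provider_name, provider_results in per_provider.items():
+ logger.info(f"{provider_name}: {[r.url for r in provider_results]}")
+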
+
+def extract_search_results(
+ search_results: Dict[str, Any], provider: str = "tavily"
+) -> tuple[List[SearchResult], float]:
+ """Extract SearchResult objects from provider-specific API responses.
+
+ Args:
+ search_results: Results from search API
+ provider: Which provider the results came from
+
+ Returns:
+ Tuple of (List[SearchResult], float): the converted SearchResult objects with
+ standardized fields, and the search cost (0.0 if the provider does not report one).
+ SearchResult is a Pydantic model defined in pydantic_models.py that includes:
+ - url: The URL of the search result
+ - content: The raw content of the page
+ - title: The title of the page
+ - snippet: A brief snippet of the page content
+ """
+ results_list = []
+ search_cost = search_results.get(
+ "exa_cost", 0.0
+ ) # Extract cost if present
+
+ if "results" in search_results:
+ for result in search_results["results"]:
+ if "url" in result:
+ # Get fields with defaults
+ url = result["url"]
+ title = result.get("title", "")
+
+ # Try to extract the best content available:
+ # 1. First try raw_content (if we requested it)
+ # 2. Then try regular content (always available)
+ # 3. Then try to use snippet combined with title
+ # 4. Last resort: use just title
+
+ raw_content = result.get("raw_content", None)
+ regular_content = result.get("content", "")
+ snippet = result.get("snippet", "")
+
+ # Set our final content - prioritize raw_content if available and not None
+ if raw_content is not None and raw_content.strip():
+ content = raw_content
+ # Next best is the regular content field
+ elif regular_content and regular_content.strip():
+ content = regular_content
+ logger.info(
+ f"Using 'content' field for URL {url} because raw_content was not available"
+ )
+ # Try to create a usable content from snippet and title
+ elif snippet:
+ content = f"Title: {title}\n\nContent: {snippet}"
+ logger.warning(
+ f"Using title and snippet as content fallback for {url}"
+ )
+ # Last resort - just use the title
+ elif title:
+ content = (
+ f"Title: {title}\n\nNo content available for this URL."
+ )
+ logger.warning(
+ f"Using only title as content fallback for {url}"
+ )
+ # Nothing available
+ else:
+ content = ""
+ logger.warning(
+ f"No content available for URL {url}, using empty string"
+ )
+
+ # Create SearchResult with provider metadata
+ search_result = SearchResult(
+ url=url,
+ content=content,
+ title=title,
+ snippet=snippet,
+ )
+
+ # Add provider info to metadata if available
+ if "_metadata" in result:
+ search_result.metadata = result["_metadata"]
+ else:
+ search_result.metadata = {"provider": provider}
+
+ results_list.append(search_result)
+
+ # If we got the answer (Tavily specific), add it as a special result
+ if (
+ provider == "tavily"
+ and "answer" in search_results
+ and search_results["answer"]
+ ):
+ answer_text = search_results["answer"]
+ results_list.append(
+ SearchResult(
+ url="tavily-generated-answer",
+ content=f"Generated Answer based on search results:\n\n{answer_text}",
+ title="Tavily Generated Answer",
+ snippet=answer_text[:100] + "..."
+ if len(answer_text) > 100
+ else answer_text,
+ metadata={"provider": "tavily", "type": "generated_answer"},
+ )
+ )
+ logger.info("Added Tavily generated answer as a search result")
+
+ return results_list, search_cost
+
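+
+# Illustrative sketch of the raw-response to SearchResult conversion; the helper
+# name and query are hypothetical.
+def _example_extract_results() -> None:
+ raw = tavily_search("large language model evaluation", max_results=2)
+ extracted, cost = extract_search_results(raw, provider="tavily")
+ # Tavily does not report a per-query cost, so cost stays 0.0 here
+ logger.info(f"Extracted {len(extracted)} results (search cost ${cost:.4f})")
+ for result in extracted:
+ logger.info(f"{result.url} via {result.metadata.get('provider')}")
+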
+
+def generate_search_query(
+ sub_question: str,
+ model: str = "sambanova/Llama-4-Maverick-17B-128E-Instruct",
+ system_prompt: Optional[str] = None,
+ project: str = "deep-research",
+) -> Dict[str, Any]:
+ """Generate an optimized search query for a sub-question.
+
+ Uses litellm for model inference via get_structured_llm_output.
+
+ Args:
+ sub_question: The sub-question to generate a search query for
+ model: Model to use (with provider prefix)
+ system_prompt: System prompt for the LLM, defaults to DEFAULT_SEARCH_QUERY_PROMPT
+ project: Langfuse project name for LLM tracking
+
+ Returns:
+ Dictionary with search query and reasoning
+ """
+ if system_prompt is None:
+ system_prompt = DEFAULT_SEARCH_QUERY_PROMPT
+
+ fallback_response = {"search_query": sub_question, "reasoning": ""}
+
+ return get_structured_llm_output(
+ prompt=sub_question,
+ system_prompt=system_prompt,
+ model=model,
+ fallback_response=fallback_response,
+ project=project,
+ )
+
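+
+# Illustrative usage sketch; the helper name and sub-question are hypothetical
+# and the call assumes credentials for the configured LLM provider.
+def _example_generate_search_query() -> None:
+ query_info = generate_search_query(
+ sub_question="What are the main drivers of lithium battery costs?"
+ )
+ # Falls back to the sub-question itself if the LLM call fails
+ logger.info(f"Search query: {query_info.get('search_query')}")
+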
+
+def search_and_extract_results(
+ query: str,
+ max_results: int = 3,
+ cap_content_length: int = 20000,
+ max_retries: int = 2,
+ provider: Optional[Union[str, SearchProvider]] = None,
+ search_mode: str = "auto",
+ include_highlights: bool = False,
+) -> tuple[List[SearchResult], float]:
+ """Perform a search and extract results in one step.
+
+ Args:
+ query: Search query
+ max_results: Maximum number of results to return
+ cap_content_length: Maximum length of content to return
+ max_retries: Maximum number of retries in case of failure
+ provider: Search provider to use (tavily, exa, both)
+ search_mode: Search mode for Exa ("neural", "keyword", "auto")
+ include_highlights: Include highlights for Exa results
+
+ Returns:
+ Tuple of (List of SearchResult objects, search cost)
+ """
+ results = []
+ total_cost = 0.0
+ retry_count = 0
+
+ # List of alternative query formats to try if the original query fails
+ # to yield good results with non-None content
+ alternative_queries = [
+ query, # Original query first
+ f'"{query}"', # Try exact phrase matching
+ f"about {query}", # Try broader context
+ f"research on {query}", # Try research-oriented results
+ query.replace(" OR ", " "), # Try without OR operator
+ ]
+
+ while retry_count <= max_retries and retry_count < len(
+ alternative_queries
+ ):
+ try:
+ current_query = alternative_queries[retry_count]
+ logger.info(
+ f"Searching with query ({retry_count + 1}/{max_retries + 1}): {current_query}"
+ )
+
+ # Normalize the provider to a SearchProvider enum so the comparisons
+ # below behave identically for string and enum arguments, and
+ # determine whether Exa is involved (for cost tracking)
+ if isinstance(provider, str):
+ provider = SearchProvider(provider.lower())
+ elif provider is None:
+ config = SearchEngineConfig()
+ provider = SearchProvider(config.default_provider.lower())
+ using_exa = provider in (
+ SearchProvider.EXA,
+ SearchProvider.BOTH,
+ )
+
+ # Perform search based on provider
+ if using_exa and provider != SearchProvider.BOTH:
+ # Direct Exa search
+ search_results = exa_search(
+ query=current_query,
+ max_results=max_results,
+ cap_content_length=cap_content_length,
+ search_mode=search_mode,
+ include_highlights=include_highlights,
+ )
+ results, cost = extract_search_results(
+ search_results, provider="exa"
+ )
+ total_cost += cost
+ elif provider == SearchProvider.BOTH:
+ # Search with both providers
+ tavily_results = tavily_search(
+ current_query,
+ max_results=max_results,
+ cap_content_length=cap_content_length,
+ )
+ exa_results = exa_search(
+ query=current_query,
+ max_results=max_results,
+ cap_content_length=cap_content_length,
+ search_mode=search_mode,
+ include_highlights=include_highlights,
+ )
+
+ # Extract results from both
+ tavily_extracted, _ = extract_search_results(
+ tavily_results, provider="tavily"
+ )
+ exa_extracted, exa_cost = extract_search_results(
+ exa_results, provider="exa"
+ )
+ total_cost += exa_cost
+
+ # Merge results
+ results = []
+ max_len = max(len(tavily_extracted), len(exa_extracted))
+ for i in range(max_len):
+ if i < len(tavily_extracted):
+ results.append(tavily_extracted[i])
+ if i < len(exa_extracted):
+ results.append(exa_extracted[i])
+ results = results[:max_results]
+ else:
+ # Tavily search or unified search
+ results = unified_search(
+ query=current_query,
+ provider=provider,
+ max_results=max_results,
+ cap_content_length=cap_content_length,
+ search_mode=search_mode,
+ include_highlights=include_highlights,
+ )
+
+ # Handle case where unified_search returns a dict
+ if isinstance(results, dict):
+ all_results = []
+ for provider_results in results.values():
+ all_results.extend(provider_results)
+ results = all_results[:max_results]
+
+ # Check if we got results with actual content
+ if results:
+ # Count results with non-empty content
+ content_results = sum(1 for r in results if r.content.strip())
+
+ if content_results >= max(1, len(results) // 2):
+ logger.info(
+ f"Found {content_results}/{len(results)} results with content"
+ )
+ return results, total_cost
+ else:
+ logger.warning(
+ f"Only found {content_results}/{len(results)} results with content. "
+ f"Trying alternative query..."
+ )
+
+ # If we didn't get good results but haven't hit max retries yet, try again
+ if retry_count < max_retries:
+ logger.warning(
+ f"Inadequate search results. Retrying with alternative query... ({retry_count + 1}/{max_retries})"
+ )
+ retry_count += 1
+ else:
+ # If we're out of retries, return whatever we have
+ logger.warning(
+ f"Out of retries. Returning best results found ({len(results)} results)."
+ )
+ return results, total_cost
+
+ except Exception as e:
+ if retry_count < max_retries:
+ logger.warning(
+ f"Search failed with error: {e}. Retrying... ({retry_count + 1}/{max_retries})"
+ )
+ retry_count += 1
+ else:
+ logger.error(f"Search failed after {max_retries} retries: {e}")
+ return [], 0.0
+
+ # If we've exhausted all retries, return the best results we have
+ return results, total_cost
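+
+
+# Illustrative end-to-end sketch combining query generation and search; the
+# helper name and sub-question are hypothetical.
+def _example_search_step() -> None:
+ query_info = generate_search_query(
+ sub_question="How do heat pumps perform in cold climates?"
+ )
+ results, cost = search_and_extract_results(
+ query=query_info.get("search_query", ""),
+ max_results=3,
+ provider="tavily",
+ )
+ logger.info(f"{len(results)} results, search cost ${cost:.4f}")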
diff --git a/deep_research/utils/tracing_metadata_utils.py b/deep_research/utils/tracing_metadata_utils.py
new file mode 100644
index 00000000..59c7b37e
--- /dev/null
+++ b/deep_research/utils/tracing_metadata_utils.py
@@ -0,0 +1,745 @@
+"""Utilities for collecting and analyzing tracing metadata from Langfuse."""
+
+import time
+from datetime import datetime, timedelta, timezone
+from functools import wraps
+from typing import Any, Dict, List, Optional, Tuple
+
+from langfuse import Langfuse
+from langfuse.api.core import ApiError
+from langfuse.client import ObservationsView, TraceWithDetails
+from rich import print
+from rich.console import Console
+from rich.table import Table
+
+console = Console()
+
+langfuse = Langfuse()
+
+# Prompt type identification keywords
+PROMPT_IDENTIFIERS = {
+ "query_decomposition": [
+ "MAIN RESEARCH QUERY",
+ "DIFFERENT DIMENSIONS",
+ "sub-questions",
+ ],
+ "search_query": ["Deep Research assistant", "effective search query"],
+ "synthesis": [
+ "information synthesis",
+ "comprehensive answer",
+ "confidence level",
+ ],
+ "viewpoint_analysis": [
+ "multi-perspective analysis",
+ "viewpoint categories",
+ ],
+ "reflection": ["critique and improve", "information gaps"],
+ "additional_synthesis": ["enhance the original synthesis"],
+ "conclusion_generation": [
+ "Synthesis and Integration",
+ "Direct Response to Main Query",
+ ],
+ "executive_summary": [
+ "executive summaries",
+ "Key Findings",
+ "250-400 words",
+ ],
+ "introduction": ["engaging introductions", "Context and Relevance"],
+}
+
+# Rate limiting configuration
+# Adjust these based on your Langfuse tier:
+# - Hobby: 30 req/min for Other APIs -> ~2s between requests
+# - Core: 100 req/min -> ~0.6s between requests
+# - Pro: 1000 req/min -> ~0.06s between requests
+RATE_LIMIT_DELAY = 0.1 # 100ms between requests (suited to the Pro tier; raise towards 2s on lower tiers)
+MAX_RETRIES = 3
+INITIAL_BACKOFF = 1.0 # Initial backoff in seconds
+
+# Batch processing configuration
+BATCH_DELAY = 0.5 # Additional delay between batches of requests
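+
+
+# Illustrative sketch relating these delays to a tier's quota; the helper name
+# is hypothetical. Sixty seconds divided by the allowed requests per minute
+# gives the minimum safe spacing between calls.
+def _suggested_request_delay(requests_per_minute: int) -> float:
+ """Return the minimum delay in seconds between requests for a given quota."""
+ return 60.0 / max(requests_per_minute, 1)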
+
+
+def rate_limited(func):
+ """Decorator to add rate limiting between API calls."""
+
+ @wraps(func)
+ def wrapper(*args, **kwargs):
+ time.sleep(RATE_LIMIT_DELAY)
+ return func(*args, **kwargs)
+
+ return wrapper
+
+
+def retry_with_backoff(func):
+ """Decorator to retry functions with exponential backoff on rate limit errors."""
+
+ @wraps(func)
+ def wrapper(*args, **kwargs):
+ backoff = INITIAL_BACKOFF
+ last_exception = None
+
+ for attempt in range(MAX_RETRIES):
+ try:
+ return func(*args, **kwargs)
+ except ApiError as e:
+ if e.status_code == 429: # Rate limit error
+ last_exception = e
+ if attempt < MAX_RETRIES - 1:
+ wait_time = backoff * (2**attempt)
+ console.print(
+ f"[yellow]Rate limit hit. Retrying in {wait_time:.1f}s...[/yellow]"
+ )
+ time.sleep(wait_time)
+ continue
+ raise
+ except Exception:
+ # For non-rate limit errors, raise immediately
+ raise
+
+ # If we've exhausted all retries
+ if last_exception:
+ raise last_exception
+
+ return wrapper
+
+
+@rate_limited
+@retry_with_backoff
+def fetch_traces_safe(limit: Optional[int] = None) -> List[TraceWithDetails]:
+ """Safely fetch traces with rate limiting and retry logic."""
+ return langfuse.fetch_traces(limit=limit).data
+
+
+@rate_limited
+@retry_with_backoff
+def fetch_observations_safe(trace_id: str) -> List[ObservationsView]:
+ """Safely fetch observations with rate limiting and retry logic."""
+ return langfuse.fetch_observations(trace_id=trace_id).data
+
+
+def get_total_trace_cost(trace_id: str) -> float:
+ """Calculate the total cost for a single trace by summing all observation costs.
+
+ Args:
+ trace_id: The ID of the trace to calculate cost for
+
+ Returns:
+ Total cost across all observations in the trace
+ """
+ try:
+ observations = fetch_observations_safe(trace_id=trace_id)
+ total_cost = 0.0
+
+ for obs in observations:
+ # Check multiple possible cost fields
+ if (
+ hasattr(obs, "calculated_total_cost")
+ and obs.calculated_total_cost
+ ):
+ total_cost += obs.calculated_total_cost
+ elif hasattr(obs, "total_price") and obs.total_price:
+ total_cost += obs.total_price
+ elif hasattr(obs, "total_cost") and obs.total_cost:
+ total_cost += obs.total_cost
+ # If cost details are available, calculate from input/output costs
+ elif hasattr(obs, "calculated_input_cost") and hasattr(
+ obs, "calculated_output_cost"
+ ):
+ if obs.calculated_input_cost and obs.calculated_output_cost:
+ total_cost += (
+ obs.calculated_input_cost + obs.calculated_output_cost
+ )
+
+ return total_cost
+ except Exception as e:
+ print(f"[red]Error calculating trace cost: {e}[/red]")
+ return 0.0
+
+
+def get_total_tokens_used(trace_id: str) -> Tuple[int, int]:
+ """Calculate total input and output tokens used for a trace.
+
+ Args:
+ trace_id: The ID of the trace to calculate tokens for
+
+ Returns:
+ Tuple of (input_tokens, output_tokens)
+ """
+ try:
+ observations = fetch_observations_safe(trace_id=trace_id)
+ total_input_tokens = 0
+ total_output_tokens = 0
+
+ for obs in observations:
+ # Check for token fields in different possible locations
+ if hasattr(obs, "usage") and obs.usage:
+ if hasattr(obs.usage, "input") and obs.usage.input:
+ total_input_tokens += obs.usage.input
+ if hasattr(obs.usage, "output") and obs.usage.output:
+ total_output_tokens += obs.usage.output
+ # Also check for direct token fields
+ elif hasattr(obs, "promptTokens") and hasattr(
+ obs, "completionTokens"
+ ):
+ if obs.promptTokens:
+ total_input_tokens += obs.promptTokens
+ if obs.completionTokens:
+ total_output_tokens += obs.completionTokens
+
+ return total_input_tokens, total_output_tokens
+ except Exception as e:
+ print(f"[red]Error calculating tokens: {e}[/red]")
+ return 0, 0
+
+
+def get_trace_stats(trace: TraceWithDetails) -> Dict[str, Any]:
+ """Get comprehensive statistics for a trace.
+
+ Args:
+ trace: The trace object to analyze
+
+ Returns:
+ Dictionary containing trace statistics including cost, latency, tokens, and metadata
+ """
+ try:
+ # Get cost and token data
+ total_cost = get_total_trace_cost(trace.id)
+ input_tokens, output_tokens = get_total_tokens_used(trace.id)
+
+ # Get observation count
+ observations = fetch_observations_safe(trace_id=trace.id)
+ observation_count = len(observations)
+
+ # Extract model information from observations
+ models_used = set()
+ for obs in observations:
+ if hasattr(obs, "model") and obs.model:
+ models_used.add(obs.model)
+
+ stats = {
+ "trace_id": trace.id,
+ "timestamp": trace.timestamp,
+ "total_cost": total_cost,
+ "latency_seconds": trace.latency
+ if hasattr(trace, "latency")
+ else 0,
+ "input_tokens": input_tokens,
+ "output_tokens": output_tokens,
+ "total_tokens": input_tokens + output_tokens,
+ "observation_count": observation_count,
+ "models_used": list(models_used),
+ "metadata": trace.metadata if hasattr(trace, "metadata") else {},
+ "tags": trace.tags if hasattr(trace, "tags") else [],
+ "user_id": trace.user_id if hasattr(trace, "user_id") else None,
+ "session_id": trace.session_id
+ if hasattr(trace, "session_id")
+ else None,
+ }
+
+ # Add formatted latency
+ if stats["latency_seconds"]:
+ minutes = int(stats["latency_seconds"] // 60)
+ seconds = stats["latency_seconds"] % 60
+ stats["latency_formatted"] = f"{minutes}m {seconds:.1f}s"
+ else:
+ stats["latency_formatted"] = "0m 0.0s"
+
+ return stats
+ except Exception as e:
+ print(f"[red]Error getting trace stats: {e}[/red]")
+ return {}
+
+
+def get_traces_by_name(name: str, limit: int = 1) -> List[TraceWithDetails]:
+ """Get traces by name using Langfuse API.
+
+ Args:
+ name: The name of the trace to search for
+ limit: Maximum number of traces to return (default: 1)
+
+ Returns:
+ List of traces matching the name
+ """
+ try:
+ # Use the Langfuse API to get traces by name
+ traces_response = langfuse.get_traces(name=name, limit=limit)
+ return traces_response.data
+ except Exception as e:
+ print(f"[red]Error fetching traces by name: {e}[/red]")
+ return []
+
+
+def get_observations_for_trace(trace_id: str) -> List[ObservationsView]:
+ """Get all observations for a specific trace.
+
+ Args:
+ trace_id: The ID of the trace
+
+ Returns:
+ List of observations for the trace
+ """
+ try:
+ observations_response = langfuse.get_observations(trace_id=trace_id)
+ return observations_response.data
+ except Exception as e:
+ print(f"[red]Error fetching observations: {e}[/red]")
+ return []
+
+
+def filter_traces_by_date_range(
+ start_date: datetime, end_date: datetime, limit: Optional[int] = None
+) -> List[TraceWithDetails]:
+ """Filter traces within a specific date range.
+
+ Args:
+ start_date: Start of the date range (inclusive)
+ end_date: End of the date range (inclusive)
+ limit: Maximum number of traces to return
+
+ Returns:
+ List of traces within the date range
+ """
+ try:
+ # Ensure dates are timezone-aware
+ if start_date.tzinfo is None:
+ start_date = start_date.replace(tzinfo=timezone.utc)
+ if end_date.tzinfo is None:
+ end_date = end_date.replace(tzinfo=timezone.utc)
+
+ # Fetch recent traces (up to the requested limit, defaulting to 100)
+ all_traces = fetch_traces_safe(limit=limit or 100)
+
+ # Filter by date range
+ filtered_traces = [
+ trace
+ for trace in all_traces
+ if start_date <= trace.timestamp <= end_date
+ ]
+
+ # Sort by timestamp (most recent first)
+ filtered_traces.sort(key=lambda x: x.timestamp, reverse=True)
+
+ # Apply limit if specified
+ if limit:
+ filtered_traces = filtered_traces[:limit]
+
+ return filtered_traces
+ except Exception as e:
+ print(f"[red]Error filtering traces by date range: {e}[/red]")
+ return []
+
+
+def get_traces_last_n_days(
+ days: int, limit: Optional[int] = None
+) -> List[TraceWithDetails]:
+ """Get traces from the last N days.
+
+ Args:
+ days: Number of days to look back
+ limit: Maximum number of traces to return
+
+ Returns:
+ List of traces from the last N days
+ """
+ end_date = datetime.now(timezone.utc)
+ start_date = end_date - timedelta(days=days)
+
+ return filter_traces_by_date_range(start_date, end_date, limit)
+
+
+def get_trace_stats_batch(
+ traces: List[TraceWithDetails], show_progress: bool = True
+) -> List[Dict[str, Any]]:
+ """Get statistics for multiple traces efficiently with progress tracking.
+
+ Args:
+ traces: List of traces to analyze
+ show_progress: Whether to show progress bar
+
+ Returns:
+ List of dictionaries containing trace statistics
+ """
+ stats_list = []
+
+ for i, trace in enumerate(traces):
+ if show_progress and i % 5 == 0:
+ console.print(
+ f"[dim]Processing trace {i + 1}/{len(traces)}...[/dim]"
+ )
+
+ stats = get_trace_stats(trace)
+ stats_list.append(stats)
+
+ return stats_list
+
+
+def get_aggregate_stats_for_traces(
+ traces: List[TraceWithDetails],
+) -> Dict[str, Any]:
+ """Calculate aggregate statistics for a list of traces.
+
+ Args:
+ traces: List of traces to analyze
+
+ Returns:
+ Dictionary containing aggregate statistics
+ """
+ if not traces:
+ return {
+ "trace_count": 0,
+ "total_cost": 0.0,
+ "total_input_tokens": 0,
+ "total_output_tokens": 0,
+ "total_tokens": 0,
+ "average_cost_per_trace": 0.0,
+ "average_latency_seconds": 0.0,
+ "total_observations": 0,
+ }
+
+ total_cost = 0.0
+ total_input_tokens = 0
+ total_output_tokens = 0
+ total_latency = 0.0
+ total_observations = 0
+ all_models = set()
+
+ for trace in traces:
+ stats = get_trace_stats(trace)
+ total_cost += stats.get("total_cost", 0)
+ total_input_tokens += stats.get("input_tokens", 0)
+ total_output_tokens += stats.get("output_tokens", 0)
+ total_latency += stats.get("latency_seconds", 0)
+ total_observations += stats.get("observation_count", 0)
+ all_models.update(stats.get("models_used", []))
+
+ return {
+ "trace_count": len(traces),
+ "total_cost": total_cost,
+ "total_input_tokens": total_input_tokens,
+ "total_output_tokens": total_output_tokens,
+ "total_tokens": total_input_tokens + total_output_tokens,
+ "average_cost_per_trace": total_cost / len(traces) if traces else 0,
+ "average_latency_seconds": total_latency / len(traces)
+ if traces
+ else 0,
+ "total_observations": total_observations,
+ "models_used": list(all_models),
+ }
+
+
+def display_trace_stats_table(
+ traces: List[TraceWithDetails], title: str = "Trace Statistics"
+):
+ """Display trace statistics in a formatted table.
+
+ Args:
+ traces: List of traces to display
+ title: Title for the table
+ """
+ table = Table(title=title, show_header=True, header_style="bold magenta")
+ table.add_column("Trace ID", style="cyan", no_wrap=True)
+ table.add_column("Timestamp", style="yellow")
+ table.add_column("Cost ($)", justify="right", style="green")
+ table.add_column("Tokens (In/Out)", justify="right")
+ table.add_column("Latency", justify="right")
+ table.add_column("Observations", justify="right")
+
+ for trace in traces[:10]: # Limit to 10 for display
+ stats = get_trace_stats(trace)
+ table.add_row(
+ stats["trace_id"][:12] + "...",
+ stats["timestamp"].strftime("%Y-%m-%d %H:%M"),
+ f"${stats['total_cost']:.4f}",
+ f"{stats['input_tokens']:,}/{stats['output_tokens']:,}",
+ stats["latency_formatted"],
+ str(stats["observation_count"]),
+ )
+
+ console.print(table)
+
+
+def identify_prompt_type(observation: ObservationsView) -> str:
+ """Identify the prompt type based on keywords in the observation's input.
+
+ Examines the system prompt in observation.input['messages'][0]['content']
+ for unique keywords that identify each prompt type.
+
+ Args:
+ observation: The observation to analyze
+
+ Returns:
+ str: The prompt type name, or "unknown" if not identified
+ """
+ try:
+ # Access the system prompt from the messages
+ if hasattr(observation, "input") and observation.input:
+ messages = observation.input.get("messages", [])
+ if messages and len(messages) > 0:
+ system_content = messages[0].get("content", "")
+
+ # Check each prompt type's keywords
+ for prompt_type, keywords in PROMPT_IDENTIFIERS.items():
+ # Check if any keyword is in the system prompt
+ for keyword in keywords:
+ if keyword in system_content:
+ return prompt_type
+
+ return "unknown"
+ except Exception as e:
+ console.print(
+ f"[yellow]Warning: Could not identify prompt type: {e}[/yellow]"
+ )
+ return "unknown"
+
+
+def get_costs_by_prompt_type(trace_id: str) -> Dict[str, Dict[str, float]]:
+ """Get cost breakdown by prompt type for a given trace.
+
+ Uses observation.usage.input/output for token counts and
+ observation.calculated_total_cost for costs.
+
+ Args:
+ trace_id: The ID of the trace to analyze
+
+ Returns:
+ Dict mapping prompt_type to {
+ 'cost': float,
+ 'input_tokens': int,
+ 'output_tokens': int,
+ 'count': int # number of calls
+ }
+ """
+ try:
+ observations = fetch_observations_safe(trace_id=trace_id)
+ prompt_metrics = {}
+
+ for obs in observations:
+ # Identify prompt type
+ prompt_type = identify_prompt_type(obs)
+
+ # Initialize metrics for this prompt type if needed
+ if prompt_type not in prompt_metrics:
+ prompt_metrics[prompt_type] = {
+ "cost": 0.0,
+ "input_tokens": 0,
+ "output_tokens": 0,
+ "count": 0,
+ }
+
+ # Add cost
+ cost = 0.0
+ if (
+ hasattr(obs, "calculated_total_cost")
+ and obs.calculated_total_cost
+ ):
+ cost = obs.calculated_total_cost
+ prompt_metrics[prompt_type]["cost"] += cost
+
+ # Add tokens
+ if hasattr(obs, "usage") and obs.usage:
+ if hasattr(obs.usage, "input") and obs.usage.input:
+ prompt_metrics[prompt_type]["input_tokens"] += (
+ obs.usage.input
+ )
+ if hasattr(obs.usage, "output") and obs.usage.output:
+ prompt_metrics[prompt_type]["output_tokens"] += (
+ obs.usage.output
+ )
+
+ # Increment count
+ prompt_metrics[prompt_type]["count"] += 1
+
+ return prompt_metrics
+ except Exception as e:
+ print(f"[red]Error getting costs by prompt type: {e}[/red]")
+ return {}
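+
+# Illustrative usage sketch; the trace id argument is hypothetical. Prints one
+# line per identified prompt type with its aggregate cost and token counts.
+def _example_print_prompt_costs(trace_id: str) -> None:
+ for prompt_type, metrics in get_costs_by_prompt_type(trace_id).items():
+ console.print(
+ f"{prompt_type}: ${metrics['cost']:.4f} "
+ f"({metrics['input_tokens']}/{metrics['output_tokens']} tokens, "
+ f"{metrics['count']} calls)"
+ )
+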
+
+
+def get_prompt_type_statistics(trace_id: str) -> Dict[str, Dict[str, Any]]:
+ """Get detailed statistics for each prompt type.
+
+ Args:
+ trace_id: The ID of the trace to analyze
+
+ Returns:
+ Dict mapping prompt_type to {
+ 'cost': float,
+ 'input_tokens': int,
+ 'output_tokens': int,
+ 'count': int,
+ 'avg_cost_per_call': float,
+ 'avg_input_tokens': float,
+ 'avg_output_tokens': float,
+ 'percentage_of_total_cost': float
+ }
+ """
+ try:
+ # Get basic metrics
+ prompt_metrics = get_costs_by_prompt_type(trace_id)
+
+ # Calculate total cost for percentage calculation
+ total_cost = sum(
+ metrics["cost"] for metrics in prompt_metrics.values()
+ )
+
+ # Enhance with statistics
+ enhanced_metrics = {}
+ for prompt_type, metrics in prompt_metrics.items():
+ count = metrics["count"]
+ enhanced_metrics[prompt_type] = {
+ "cost": metrics["cost"],
+ "input_tokens": metrics["input_tokens"],
+ "output_tokens": metrics["output_tokens"],
+ "count": count,
+ "avg_cost_per_call": metrics["cost"] / count
+ if count > 0
+ else 0,
+ "avg_input_tokens": metrics["input_tokens"] / count
+ if count > 0
+ else 0,
+ "avg_output_tokens": metrics["output_tokens"] / count
+ if count > 0
+ else 0,
+ "percentage_of_total_cost": (
+ metrics["cost"] / total_cost * 100
+ )
+ if total_cost > 0
+ else 0,
+ }
+
+ return enhanced_metrics
+ except Exception as e:
+ print(f"[red]Error getting prompt type statistics: {e}[/red]")
+ return {}
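+
+# Illustrative sketch ranking prompt types by their share of total cost; the
+# helper name is hypothetical.
+def _example_rank_prompt_types(trace_id: str) -> None:
+ stats = get_prompt_type_statistics(trace_id)
+ ranked = sorted(
+ stats.items(),
+ key=lambda item: item[1]["percentage_of_total_cost"],
+ reverse=True,
+ )
+ for prompt_type, metrics in ranked:
+ console.print(
+ f"{prompt_type}: {metrics['percentage_of_total_cost']:.1f}% of total cost"
+ )
+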
+
+
+if __name__ == "__main__":
+ print(
+ "[bold cyan]ZenML Deep Research - Tracing Metadata Utilities Demo[/bold cyan]\n"
+ )
+
+ try:
+ # Fetch recent traces
+ print("[yellow]Fetching recent traces...[/yellow]")
+ traces = fetch_traces_safe(limit=5)
+
+ if not traces:
+ print("[red]No traces found![/red]")
+ exit(1)
+ except ApiError as e:
+ if e.status_code == 429:
+ print("[red]Rate limit exceeded. Please try again later.[/red]")
+ print(
+ "[yellow]Tip: Consider upgrading your Langfuse tier for higher rate limits.[/yellow]"
+ )
+ else:
+ print(f"[red]API Error: {e}[/red]")
+ exit(1)
+ except Exception as e:
+ print(f"[red]Error fetching traces: {e}[/red]")
+ exit(1)
+
+ # Demo 1: Get stats for a single trace
+ print("\n[bold]1. Single Trace Statistics:[/bold]")
+ first_trace = traces[0]
+ stats = get_trace_stats(first_trace)
+
+ console.print(f"Trace ID: [cyan]{stats['trace_id']}[/cyan]")
+ console.print(f"Timestamp: [yellow]{stats['timestamp']}[/yellow]")
+ console.print(f"Total Cost: [green]${stats['total_cost']:.4f}[/green]")
+ console.print(
+ f"Tokens - Input: [blue]{stats['input_tokens']:,}[/blue], Output: [blue]{stats['output_tokens']:,}[/blue]"
+ )
+ console.print(f"Latency: [magenta]{stats['latency_formatted']}[/magenta]")
+ console.print(f"Observations: [white]{stats['observation_count']}[/white]")
+ console.print(
+ f"Models Used: [cyan]{', '.join(stats['models_used'])}[/cyan]"
+ )
+
+ # Demo 2: Get traces from last 7 days
+ print("\n[bold]2. Traces from Last 7 Days:[/bold]")
+ recent_traces = get_traces_last_n_days(7, limit=10)
+ print(
+ f"Found [green]{len(recent_traces)}[/green] traces in the last 7 days"
+ )
+
+ if recent_traces:
+ display_trace_stats_table(recent_traces, "Last 7 Days Traces")
+
+ # Demo 3: Filter traces by date range
+ print("\n[bold]3. Filter Traces by Date Range:[/bold]")
+ end_date = datetime.now(timezone.utc)
+ start_date = end_date - timedelta(days=3)
+
+ filtered_traces = filter_traces_by_date_range(start_date, end_date)
+ print(
+ f"Found [green]{len(filtered_traces)}[/green] traces between {start_date.strftime('%Y-%m-%d')} and {end_date.strftime('%Y-%m-%d')}"
+ )
+
+ # Demo 4: Aggregate statistics
+ print("\n[bold]4. Aggregate Statistics for All Recent Traces:[/bold]")
+ agg_stats = get_aggregate_stats_for_traces(traces)
+
+ table = Table(
+ title="Aggregate Statistics",
+ show_header=True,
+ header_style="bold magenta",
+ )
+ table.add_column("Metric", style="cyan")
+ table.add_column("Value", justify="right", style="yellow")
+
+ table.add_row("Total Traces", str(agg_stats["trace_count"]))
+ table.add_row("Total Cost", f"${agg_stats['total_cost']:.4f}")
+ table.add_row(
+ "Average Cost per Trace", f"${agg_stats['average_cost_per_trace']:.4f}"
+ )
+ table.add_row("Total Input Tokens", f"{agg_stats['total_input_tokens']:,}")
+ table.add_row(
+ "Total Output Tokens", f"{agg_stats['total_output_tokens']:,}"
+ )
+ table.add_row("Total Tokens", f"{agg_stats['total_tokens']:,}")
+ table.add_row(
+ "Average Latency", f"{agg_stats['average_latency_seconds']:.1f}s"
+ )
+ table.add_row("Total Observations", str(agg_stats["total_observations"]))
+
+ console.print(table)
+
+ # Demo 5: Cost breakdown by observation
+ print("\n[bold]5. Cost Breakdown for First Trace:[/bold]")
+ observations = fetch_observations_safe(trace_id=first_trace.id)
+
+ if observations:
+ table = Table(
+ title="Observation Cost Breakdown",
+ show_header=True,
+ header_style="bold magenta",
+ )
+ table.add_column("Observation", style="cyan", no_wrap=True)
+ table.add_column("Model", style="yellow")
+ table.add_column("Tokens (In/Out)", justify="right")
+ table.add_column("Cost", justify="right", style="green")
+
+ for i, obs in enumerate(observations[:5]): # Show first 5
+ cost = 0.0
+ if hasattr(obs, "calculated_total_cost"):
+ cost = obs.calculated_total_cost or 0.0
+
+ in_tokens = 0
+ out_tokens = 0
+ if hasattr(obs, "usage") and obs.usage:
+ in_tokens = obs.usage.input or 0
+ out_tokens = obs.usage.output or 0
+ elif hasattr(obs, "promptTokens"):
+ in_tokens = obs.promptTokens or 0
+ out_tokens = obs.completionTokens or 0
+
+ table.add_row(
+ f"Obs {i + 1}",
+ obs.model if hasattr(obs, "model") else "Unknown",
+ f"{in_tokens:,}/{out_tokens:,}",
+ f"${cost:.4f}",
+ )
+
+ console.print(table)