Author: Mykhailo Levytskyi
Project: agent-systems-eval
Last Updated: January 10, 2026
Purpose: Comprehensive private documentation with all technical details
- Project Overview
- Architecture Deep Dive
- Implementation Details
- Configuration & Setup
- Usage Patterns
- Troubleshooting & Debugging
- Performance Optimization
- Technical Notes
- Future Enhancements
Empirical comparison of two agent architectures for document synthesis:
- Monolithic Agent: Single LLM approach (simple, fast)
- Ensemble Agent: Multi-agent system with recursive orchestration (complex, higher quality)
- Quantify performance differences (latency, cost, quality)
- Demonstrate MLflow experiment tracking capabilities
- Implement LLM-as-a-judge evaluation
- Build production-ready agent systems with proper metrics
- Ensemble shows ~15-25% higher quality scores
- Ensemble takes ~2-3x longer (multiple iterations)
- Recursive orchestration enables adaptive quality control
- Map-reduce pattern essential for handling large documents
Design Philosophy: Simplicity and efficiency
Flow:
1. Map Phase: Sanitize + chunk + summarize each document independently
2. Reduce Phase: Synthesize summaries into final output
Key Components:
- _summarize_document_chunk(): Isolated API calls per chunk
- _map_phase(): Parallel document processing with caching
- _reduce_phase(): Final synthesis from summaries
- Cache location: data/cache/summaries/
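The sanitize → chunk → summarize → reduce pipeline can be sketched end to end. This is an illustrative reduction, not the repo code: `sanitize`, `chunk`, and `map_reduce` are simplified stand-ins for `sanitize_document`, `chunk_document`, and the map/reduce phases, and the `summarize` callable stands in for the LLM call.

```python
import re
from typing import Callable, List

def sanitize(doc: str) -> str:
    # Drop everything from a References/Bibliography heading onward (simplified)
    return re.split(r"\n(?:References|Bibliography)\b", doc)[0]

def chunk(doc: str, max_chars: int = 2000) -> List[str]:
    # Split at paragraph boundaries, packing paragraphs up to max_chars
    chunks, current = [], ""
    for para in doc.split("\n\n"):
        if current and len(current) + len(para) > max_chars:
            chunks.append(current)
            current = para
        else:
            current = f"{current}\n\n{para}" if current else para
    if current:
        chunks.append(current)
    return chunks

def map_reduce(docs: List[str], summarize: Callable[[str], str]) -> str:
    # Map: summarize each chunk of each sanitized document independently
    summaries = [summarize(c) for d in docs for c in chunk(sanitize(d))]
    # Reduce: synthesize the per-chunk summaries into one output
    return summarize("\n".join(summaries))
```

The real map phase also caches each chunk summary to disk before the reduce step.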
Advantages:
- Fast (single pass)
- Low token usage
- Predictable behavior
- Easy to debug
Limitations:
- No iterative refinement
- No specialized processing stages
- Quality depends on single LLM call
Design Philosophy: Quality through specialization and iteration
Architecture: CrewAI Flows with 4 specialized agents
Agents:
- Archivist (runs once):
  - Role: Document analysis and organization
  - Map: Summarizes each document independently
  - Reduce: Consolidates into organized structure
  - Output: organized_info for downstream agents
- Drafter (iterative):
  - Role: Create synthesis draft
  - Input: Organized info + task description + previous critique
  - Output: Draft synthesis
- Critic (iterative):
  - Role: Quality assurance and feedback
  - Input: Current draft + task requirements
  - Output: Detailed critique with improvement suggestions
- Orchestrator (recursive control):
  - Role: Decision-making and iteration control
  - Input: Draft + critique
  - Output: "continue" or the final draft
  - Logic: Evaluates whether the draft is production-ready or needs refinement
Flow Pattern:
Archivist (once)
↓
Drafter → Critic → Orchestrator
↑ ↓
└──── continue ─────┘
OR
final draft (terminate)
CrewAI Flow Implementation Details:
Key Methods:
- start_archivist(): Initial kickoff, runs once
- run_drafter(): Creates/refines draft
- run_critic(): Provides feedback
- run_orchestrator(): Decides continue/finalize
- route_after_orchestrator(): Router that controls flow
State Management:
class EnsembleState:
    organized_info: str        # From archivist (immutable)
    current_draft: str         # Latest draft
    current_critique: str      # Latest feedback
    iteration_count: int       # Current iteration
    is_production_ready: bool  # Orchestrator decision
    task_description: str      # Original task
    run_id: int                # Timestamp for caching

Critical Flow Control Pattern:
# Orchestrator returns final draft when ready
if is_production_ready:
    return state.current_draft  # Flow ends

# Router returns None to terminate
@router(run_orchestrator)
def route_after_orchestrator(self):
    if state.is_production_ready:
        return None  # No more routing = flow ends
    return "run_drafter"  # Continue iteration

Why This Works (from CrewAI Flows docs):
- "The final output is determined by the last method that completes"
kickoff()returns output of final method- Returning
Nonefrom router = no next method = flow terminates - When orchestrator returns final draft + router returns None = clean termination
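Stripped of CrewAI specifics, the drafter → critic → orchestrator loop reduces to a plain control pattern. This is a behavioral sketch with stub agents — `draft_fn`, `critique_fn`, and `is_ready_fn` are hypothetical stand-ins — plus the iteration cap that `MAX_ITERATIONS` provides in the real flow:

```python
from dataclasses import dataclass

@dataclass
class EnsembleState:
    current_draft: str = ""
    current_critique: str = ""
    iteration_count: int = 0
    is_production_ready: bool = False

def run_loop(state, draft_fn, critique_fn, is_ready_fn, max_iterations=5):
    # Drafter -> Critic -> Orchestrator, repeated until approval or the cap
    while state.iteration_count < max_iterations:
        state.iteration_count += 1
        state.current_draft = draft_fn(state.current_draft, state.current_critique)
        state.current_critique = critique_fn(state.current_draft)
        if is_ready_fn(state.current_draft, state.current_critique):
            state.is_production_ready = True
            break
    return state.current_draft  # terminal output, as kickoff() would return it
```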
Advantages:
- Higher quality through iteration
- Adaptive behavior (orchestrator decides when ready)
- Clear separation of concerns
- Full iteration history logged
Limitations:
- Higher latency (multiple iterations)
- More token usage
- More complex to debug
- Requires CrewAI Flows
Both agents use map-reduce for document processing:
Map Phase (_map_phase or _preprocess_documents_for_archivist):
1. Sanitize document (remove references, bibliographies, appendices)
2. Chunk if > max_tokens (default: 16000 tokens)
3. For each chunk:
   - Generate comprehensive summary
   - Cache to disk (JSON)
4. Return: (summaries, metadata, metrics)

Reduce Phase (_reduce_phase or _reduce_summaries):
1. Combine all document summaries
2. Synthesize into coherent organization
3. Return: final organized output

Sanitization (utils.sanitize_document):
# Remove ~20% of tokens with zero semantic value
- References/Bibliography sections
- Appendices
- Standalone reference entries [1], [2], etc.

Chunking (utils.chunk_document):
# Split at paragraph boundaries
# Target: max_tokens per chunk (default 16000)
# Validation: ensure chunks don't exceed 1.2x max_tokens

Caching (utils.process_documents_with_cache):
# Cache location: data/cache/summaries/ or ensemble_summaries/
# Cache format: JSON with summary + metrics + metadata
# Cache key: MD5 hash of sanitized document
# Resume: load from cache if hash matches

Factory Pattern (llm/factory.py):
def create_llm_client(provider: str) -> LLMClient:
    if provider == "ollama":
        return OllamaClient(...)
    elif provider == "gemini":
        return GeminiClient(...)

Base Interface (llm/base.py):
class LLMClient(ABC):
    @abstractmethod
    def generate(self, system_prompt: str, user_prompt: str) -> dict:
        """Returns: {"text": str, "usage": {...}}"""

Implementations:
- llm/ollama.py: Ollama client with OpenAI-compatible API
- llm/gemini.py: Google Gemini client
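The factory-plus-interface pattern is self-contained enough to sketch; `EchoClient` and the `registry` dict are illustrative stand-ins (the real factory branches on `ollama`/`gemini` as shown above):

```python
from abc import ABC, abstractmethod

class LLMClient(ABC):
    @abstractmethod
    def generate(self, system_prompt: str, user_prompt: str) -> dict:
        """Returns {"text": str, "usage": {...}}."""

class EchoClient(LLMClient):
    # Stand-in backend for exercising the interface without a server
    def generate(self, system_prompt, user_prompt):
        return {"text": user_prompt,
                "usage": {"total_tokens": len(user_prompt.split())}}

def create_llm_client(provider: str) -> LLMClient:
    registry = {"echo": EchoClient}
    try:
        return registry[provider]()
    except KeyError:
        raise ValueError(f"Unknown provider: {provider}") from None
```

A dict-based registry makes adding providers a one-line change, and the factory fails fast on a typo'd provider name.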
Usage:
client = create_llm_client("ollama")
result = client.generate(system_prompt, user_prompt)
text = result["text"]
tokens = result["usage"]["total_tokens"]Purpose: Avoid hitting API quotas (primarily for Gemini free tier)
Implementation:
class RequestRateLimiter:
    def __init__(self, max_per_minute=10, max_per_day=20):
        self._recent_calls: deque[float] = deque()
        self._day_count = 0

    def acquire(self):
        # Blocks until slot available
        # Raises if daily limit reached

Usage:
rate_limiter = RequestRateLimiter(max_per_minute=10, max_per_day=50)
# Before each API call:
rate_limiter.acquire()

Note: Only needed for remote providers with strict limits. Ollama has no limits.
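One way to complete `acquire()` under the semantics sketched above (sliding 60-second window, hard daily cap). This is a sketch; the repo's `rate_limits.py` may differ in detail:

```python
import time
from collections import deque

class RequestRateLimiter:
    """Sliding-window per-minute limit plus a hard daily cap."""

    def __init__(self, max_per_minute=10, max_per_day=20):
        self.max_per_minute = max_per_minute
        self.max_per_day = max_per_day
        self._recent_calls: deque[float] = deque()
        self._day_count = 0

    def acquire(self):
        # Hard stop once the daily budget is spent
        if self.max_per_day and self._day_count >= self.max_per_day:
            raise RuntimeError("Daily request limit reached")
        while True:
            now = time.monotonic()
            # Drop timestamps that have left the 60-second window
            while self._recent_calls and now - self._recent_calls[0] >= 60:
                self._recent_calls.popleft()
            if not self.max_per_minute or len(self._recent_calls) < self.max_per_minute:
                break
            # Block until the oldest call expires from the window
            time.sleep(60 - (now - self._recent_calls[0]))
        self._recent_calls.append(time.monotonic())
        self._day_count += 1
```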
Experiments:
- document_synthesis_monolithic: Monolithic agent runs
- document_synthesis_ensemble: Ensemble agent runs
Logged Metrics:
# Process metrics
mlflow.log_metric("latency_seconds", ...)
mlflow.log_metric("total_tokens", ...)
mlflow.log_metric("estimated_cost_usd", ...)
mlflow.log_metric("num_iterations", ...) # Ensemble only
# Quality metrics (LLM-as-a-judge)
mlflow.log_metric("groundedness_score", ...)
mlflow.log_metric("instruction_adherence_score", ...)
mlflow.log_metric("completeness_score", ...)
# NLP metrics
mlflow.log_metric("bertscore_f1", ...)
mlflow.log_metric("rouge1_fmeasure", ...)Logged Artifacts:
mlflow.log_text(final_synthesis, "synthesis.md")
mlflow.log_dict(agent_metrics, "metrics.json")
# Ensemble only:
mlflow.log_text(iteration_history, "iterations.md")

Judge Configuration (evaluate.py):
# Uses MLflow GenAI make_judge
groundedness_judge = make_judge(
    name="groundedness",
    model="openai:/qwen2.5:7b",  # Via Ollama OpenAI-compat
    instructions="...",          # Detailed grading criteria
)

# Evaluation
results = evaluate_with_mlflow_judges(
    task_description=task,
    synthesis=output,
    context=context,
    judge_model="openai:/qwen2.5:7b",
)

NLP Metrics (evaluate.py):
def compute_nlp_metrics(reference: str, hypothesis: str):
    # BERTScore
    bert_scores = bert_score.score([hypothesis], [reference], ...)
    # ROUGE
    rouge_scorer = RougeScorer(['rouge1', 'rougeL'], ...)
    rouge_scores = rouge_scorer.score(reference, hypothesis)
    return {
        "bertscore_precision": ...,
        "bertscore_recall": ...,
        "bertscore_f1": ...,
        "rouge1_fmeasure": ...,
        "rougeL_fmeasure": ...,
    }

Key Functions:
setup_logging(name: str) -> Logger
sanitize_document(doc: str) -> str
estimate_tokens(text: str) -> int # tiktoken cl100k_base
chunk_document(doc: str, max_tokens: int) -> List[str]
load_source_documents(doc_dir: str) -> List[str] # PDF + txt
process_documents_with_cache(...) -> (summaries, metadata, metrics)

Document Loading:
- Supports PDF (via PyPDF2) and text files
- Extracts text from all pages
- Returns list of document strings
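A sketch of the loader described above. The `.txt` path is concrete enough to run; the PDF branch assumes PyPDF2's `PdfReader` API and is skipped if the package is absent:

```python
from pathlib import Path
from typing import List

def load_source_documents(doc_dir: str) -> List[str]:
    """Load .txt files directly; extract text from every PDF page via PyPDF2."""
    docs = []
    for path in sorted(Path(doc_dir).iterdir()):
        if path.suffix == ".txt":
            docs.append(path.read_text(encoding="utf-8"))
        elif path.suffix == ".pdf":
            try:
                from PyPDF2 import PdfReader  # optional dependency
            except ImportError:
                continue
            reader = PdfReader(str(path))
            # Concatenate the text extracted from all pages
            docs.append("\n".join(page.extract_text() or "" for page in reader.pages))
    return docs
```

Sorting the directory listing keeps document order (and therefore cache indices) stable across runs.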
Caching Strategy:
- Hash sanitized document → MD5
- Cache file: {cache_dir}/doc_{idx}_summary.json
- Resume: If hash matches, load from cache
- Invalidation: Manual (delete cache files)
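The resume logic follows directly from the strategy above. Helper names (`cache_key`, `load_cached_summary`, `save_summary`) are illustrative, not repo functions; the filename follows the `doc_{idx}_summary.json` convention:

```python
import hashlib
import json
from pathlib import Path

def cache_key(sanitized_doc: str) -> str:
    # MD5 hash of the sanitized document text
    return hashlib.md5(sanitized_doc.encode("utf-8")).hexdigest()

def load_cached_summary(cache_dir: Path, idx: int, key: str):
    # Resume: return the cached summary only if the stored hash matches
    path = cache_dir / f"doc_{idx}_summary.json"
    if path.exists():
        entry = json.loads(path.read_text(encoding="utf-8"))
        if entry.get("hash") == key:
            return entry["summary"]
    return None  # cache miss or stale entry

def save_summary(cache_dir: Path, idx: int, key: str, summary: str) -> None:
    cache_dir.mkdir(parents=True, exist_ok=True)
    entry = {"hash": key, "summary": summary}
    (cache_dir / f"doc_{idx}_summary.json").write_text(json.dumps(entry), encoding="utf-8")
```

Storing the hash inside the JSON entry means a changed source document naturally invalidates its own cache entry.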
Required:
# LLM Provider Selection
LLM_PROVIDER=ollama # or gemini
# Ollama Configuration
OLLAMA_BASE_URL=http://localhost:11434
OLLAMA_MODEL=qwen2.5:7b
OLLAMA_NUM_CTX=32768 # Context window
MAX_OUTPUT_TOKENS=4000
# MLflow Judges (via Ollama OpenAI-compat)
JUDGE_MODEL=openai:/qwen2.5:7b
OPENAI_BASE_URL=http://localhost:11434/v1
OPENAI_API_KEY=ollama # Dummy value for compat
# CrewAI Ensemble
CREWAI_MODEL=openai/qwen2.5:7b
MAX_ITERATIONS=5
TIMEOUT_SECONDS=1800  # 30 minutes

Optional:
# Rate limiting (for remote providers)
MAX_RPM=10 # Max requests per minute (0 = disabled)
MAX_RPD=50 # Max requests per day (0 = disabled)
# Google Gemini (if LLM_PROVIDER=gemini)
GEMINI_API_KEY=your_key_here
GEMINI_MODEL=gemini-2.5-pro

Installation:
# Download from ollama.com
curl -fsSL https://ollama.com/install.sh | sh
# Pull model
ollama pull qwen2.5:7b

Context Window Configuration:
# Critical: Set context window to 32k tokens
export OLLAMA_NUM_CTX=32768
# Start Ollama
ollama serve

Verify:
curl http://localhost:11434/api/generate -d '{
  "model": "qwen2.5:7b",
  "prompt": "Test",
  "options": {"num_ctx": 32768}
}'

Note: The implementation passes num_ctx: 32768 in every API call to override server defaults.
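The per-call override mentioned in the note can be made concrete with a small payload builder; `build_generate_payload` is an illustrative helper, not a function in the repo:

```python
import json

def build_generate_payload(model: str, prompt: str, num_ctx: int = 32768) -> str:
    """Request body for Ollama's /api/generate with an explicit context window."""
    payload = {
        "model": model,
        "prompt": prompt,
        "stream": False,
        # Per-call override of the server's default num_ctx, as in the curl test above
        "options": {"num_ctx": num_ctx},
    }
    return json.dumps(payload)
```

It would be sent with something like `requests.post(f"{base_url}/api/generate", data=build_generate_payload(model, prompt))`.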
Core:
python >= 3.10
crewai >= 0.20.0 # Flows support
mlflow >= 2.10.0
google-generativeai >= 0.3.0 # If using Gemini
python-dotenv
NLP Metrics:
bert-score
rouge-score
tiktoken
Utilities:
PyPDF2 # PDF parsing
requests
Install:
pip install -r requirements.txt

Purpose: Verify setup works with minimal API usage
python evaluate.py --test

What it does:
- Uses only 1 paper (paper_1.pdf)
- Runs only first task
- Completes in ~5-10 minutes
- Tests both agents
- Logs to MLflow
Expected output:
Processing 1 documents...
Running task 1/1: Write executive summary...
Monolithic agent: 8.3s, 2,451 tokens
Ensemble agent: 23.7s, 6,892 tokens, 3 iterations
EVALUATION COMPLETE
Purpose: Complete comparison across all tasks
python evaluate.py

What it does:
- Processes all 10 papers
- Runs all 3 tasks
- Takes 1-2 hours
- Generates 6 MLflow runs (2 agents × 3 tasks)
Expected output:
Processing 10 documents...
Running task 1/3...
Monolithic: 45.2s, 18,234 tokens
Ensemble: 127.8s, 52,891 tokens, 4 iterations
Running task 2/3...
...
EVALUATION COMPLETE
Monolithic:
python monolithic.py

Ensemble:
python ensemble.py

Output: Demonstrates the agent on a sample task without MLflow tracking
1. Add Documents:
cp your_paper.pdf data/source_documents/

2. Define Tasks (data/tasks/synthesis_tasks.json):
{
  "task_id": "custom_1",
  "task_description": "Synthesize methodology sections...",
  "expected_elements": [
    "Research design overview",
    "Data collection methods",
    "Analysis approach"
  ]
}

3. Run:
python evaluate.py

Start MLflow UI:
mlflow ui
# Open http://localhost:5000

Navigate:
- Experiments → Select experiment
- Compare runs across agents
- View metrics table
- Download artifacts
Useful Views:
- Parallel Coordinates: Compare metrics across runs
- Scatter Plot: latency vs quality
- Table View: Sort by metric
Symptom: Ensemble runs forever, logs show repeated iterations
Root Cause: Router pattern issue (fix documented in FLOW_FIX_SUMMARY.md)
Fix Applied:
# Orchestrator returns final draft when ready
if is_production_ready:
    return state.current_draft  # Not "finalize"

# Router returns None to terminate
@router(run_orchestrator)
def route_after_orchestrator(self):
    if state.is_production_ready:
        return None  # Not a "finalize" label
    return "run_drafter"

Verify:
grep "production-ready\|Max iterations" test_output.log
# Should see termination after approval

Symptom: context length exceeded errors
Fix:
# Set in .env
OLLAMA_NUM_CTX=32768
# Restart Ollama
pkill ollama
ollama serve

Verify:
# Implementation passes this in every call:
options = {"num_ctx": 32768}Symptom: Re-processing documents on every run
Debug:
ls -la data/cache/summaries/
# Should see doc_N_summary.json files
# Check hash calculation
python -c "
import hashlib
from utils import sanitize_document
doc = open('data/source_documents/paper_1.pdf', 'rb').read()
# ... (hash calculation)
"Fix: Ensure cache directory exists and is writable
Symptom: Invalid API key or judge evaluation fails
Fix:
# Ensure OpenAI-compat endpoint is configured
OPENAI_BASE_URL=http://localhost:11434/v1
OPENAI_API_KEY=ollama # Any non-empty value works
# Verify Ollama is running
curl http://localhost:11434/api/tags

Symptom: No module named 'crewai' or version mismatch
Fix:
pip install --upgrade "crewai>=0.20.0"
# CrewAI Flows requires v0.20.0+

Enable Debug Logging:
import logging
logging.basicConfig(level=logging.DEBUG)

Monitor MLflow:
# Watch MLflow runs directory
watch -n 1 "ls -la mlruns/*/meta.yaml"

Check CrewAI Flow State:
# In ensemble.py, add:
logger.info(f"State: {state.__dict__}")

Validate Documents:
from utils import load_source_documents, estimate_tokens
docs = load_source_documents("data/source_documents")
for i, doc in enumerate(docs, 1):
    print(f"Doc {i}: {estimate_tokens(doc)} tokens")

1. Document Sanitization (saves ~20% tokens):
sanitized = sanitize_document(raw_doc)
# Removes: references, bibliographies, appendices

2. Chunking Strategy:
# Default: 16000 tokens per chunk
# Reduces memory usage, enables parallel processing
chunks = chunk_document(doc, max_tokens=16000)

3. Cache Summaries:
# Avoid re-summarizing same documents
# Cache hit = instant load
# Cache miss = summarize + save

1. Parallel Document Processing:
# Map phase processes each document independently
# Can be parallelized with ThreadPoolExecutor
# (Not currently implemented - future enhancement)

2. Reduce Iterations:
# Ensemble: Set lower max_iterations
MAX_ITERATIONS=3  # Instead of 5

3. Skip Ensemble for Simple Tasks:
# Use monolithic for straightforward tasks
# Reserve ensemble for complex synthesis

1. Use Rate Limiter:
rate_limiter = RequestRateLimiter(
    max_per_minute=10,  # Free tier limit
    max_per_day=50,
)

2. Enable Caching:
# Never re-process same documents
# Saves ~80% of API calls on reruns

3. Test Mode First:
python evaluate.py --test
# Test with 1 paper before full run

1. GPU Acceleration:
# Ensure CUDA available
nvidia-smi
# Ollama will auto-detect and use GPU

2. Model Selection:
# Faster models for development:
ollama pull qwen2.5:3b # Smaller, faster
# Higher quality for production:
ollama pull qwen2.5:14b  # Larger, slower

3. Concurrent Requests:
# Ollama can handle multiple requests
# Limited by available VRAM

1. Terminal Methods Return Final Output:
# When ready to end flow, return the final result
if ready_to_finish:
    return final_output  # Not a label/string

2. Routers Signal Termination with None:
@router(some_method)
def route_next(self):
    if should_stop:
        return None  # No next method = flow ends
    return "next_method_name"

3. State Management:
# Use shared state object, not instance variables
state.field = value # Not self.field
# State persists across all methods in flow

4. No Explicit Finalize Needed:
# DON'T do this:
@listen("finalize")
def finalize_output(self):
    return state.result

# Instead: Return final output from terminal method

5. Last Method Determines Output:
# CrewAI Flow.kickoff() returns output of last method
# Plan your flow so terminal method has final output

1. Isolated Map Operations:
# Each document processed independently
# No shared state between map calls
# Enables caching and parallel processing

2. Reduce Consolidation:
# Combine map results into coherent output
# This is where synthesis happens

3. Chunk Validation:
# Always validate chunk size
# Chunks can exceed max_tokens due to paragraph boundaries
# Add 20% buffer for safety
if estimate_tokens(chunk) > max_tokens * 1.2:
    ...  # Re-chunk more aggressively

1. Detailed Instructions:
# Provide clear grading criteria
# Include examples of each score level
# Define edge cases

2. Reference vs Reference-Free:
# Reference-free: Judge quality without ground truth
# Reference-based: Compare to reference output
# This project uses reference-free

3. Score Normalization:
def _score_value_to_float(value):
    # Handle various judge response formats
    if isinstance(value, (int, float)):
        return float(value)
    # "yes"/"no" -> 1.0/0.0; "fully"/"partially"/"not" -> 1.0/0.5/0.0
    mapping = {"yes": 1.0, "fully": 1.0, "partially": 0.5, "no": 0.0, "not": 0.0}
    return mapping.get(str(value).strip().lower(), 0.0)

1. Experiment Organization:
# Separate experiments per agent type
# Enables clean comparison
mlflow.set_experiment("document_synthesis_monolithic")

2. Run Naming:
# Descriptive run names
mlflow.start_run(run_name=f"{agent_type}_{task_id}")

3. Artifact Logging:
# Log all intermediate outputs
# Enables debugging and analysis
mlflow.log_text(draft, "iteration_N_draft.md")

4. Metric Consistency:
# Use same metric names across runs
# Enables comparison in UI
mlflow.log_metric("latency_seconds", ...)1. Parallel Document Processing:
# Use ThreadPoolExecutor for map phase
# 5-10x speedup for multi-document synthesis
from concurrent.futures import ThreadPoolExecutor

2. Adaptive Chunking:
# Smart chunking based on document structure
# Respect section boundaries, not just paragraphs

3. Streaming Output:
# Stream ensemble iterations to user
# Provide real-time feedback during synthesis

4. Custom Judge Models:
# Train domain-specific judges
# Fine-tune on expert evaluations

5. Multi-Modal Support:
# Support images, tables, charts in PDFs
# Extract and describe visual elements

6. Agent Comparison Dashboard:
# Custom Streamlit/Gradio UI
# Interactive comparison and visualization

7. A/B Testing Framework:
# Automated A/B tests for prompt variations
# Statistical significance testing

8. Cost Prediction:
# Estimate cost before running
# Token usage prediction based on document size

9. Export to Production Formats:
# Generate LaTeX, DOCX, HTML from synthesis
# Professional formatting templates

agent-systems-eval/
├── README.md # Public documentation (streamlined)
├── MASTER_README.md # This file (private, comprehensive)
├── requirements.txt # Python dependencies
├── .env.example # Environment template
├── .env # Your config (gitignored)
├── .gitignore # Git ignore rules
│
├── monolithic.py # Monolithic agent
├── ensemble.py # Ensemble agent (CrewAI Flows)
├── evaluate.py # MLflow evaluation framework
├── utils.py # Shared utilities
├── rate_limits.py # Rate limiter for API calls
│
├── llm/ # LLM client abstraction
│ ├── __init__.py
│ ├── base.py # Abstract interface
│ ├── factory.py # Client factory
│ ├── ollama.py # Ollama implementation
│ ├── gemini.py # Gemini implementation
│ └── types.py # Type definitions
│
├── data/
│ ├── source_documents/ # Input PDFs
│ ├── tasks/ # Task definitions (JSON)
│ ├── cache/ # Cached summaries
│ │ ├── summaries/ # Monolithic cache
│ │ └── ensemble_summaries/ # Ensemble cache
│ └── drafts/ # Iteration history (ensemble)
│
└── mlruns/ # MLflow tracking data
├── 0/ # Default experiment
├── {experiment_id}/ # Per-experiment runs
└── models/ # Registered models (unused)
| Variable | Default | Purpose |
|---|---|---|
| `LLM_PROVIDER` | `ollama` | LLM provider selection |
| `OLLAMA_BASE_URL` | `http://localhost:11434` | Ollama API endpoint |
| `OLLAMA_MODEL` | `qwen2.5:7b` | Ollama model name |
| `OLLAMA_NUM_CTX` | `32768` | Context window (tokens) |
| `MAX_OUTPUT_TOKENS` | `4000` | Max output length |
| `JUDGE_MODEL` | `openai:/qwen2.5:7b` | MLflow judge model |
| `OPENAI_BASE_URL` | `http://localhost:11434/v1` | OpenAI-compat endpoint |
| `OPENAI_API_KEY` | `ollama` | Dummy key for compat |
| `CREWAI_MODEL` | `openai/qwen2.5:7b` | CrewAI model ID |
| `MAX_ITERATIONS` | `5` | Max ensemble iterations |
| `TIMEOUT_SECONDS` | `1800` | Max synthesis time (30m) |
| `MAX_RPM` | `0` | Rate limit (req/min) |
| `MAX_RPD` | `0` | Rate limit (req/day) |
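Settings like these are typically read once at startup with the documented defaults as fallbacks. A sketch, where the `env_int` helper is illustrative rather than repo code:

```python
import os

def env_int(name: str, default: int) -> int:
    # Read an integer setting, falling back to the documented default
    return int(os.environ.get(name, default))

LLM_PROVIDER = os.environ.get("LLM_PROVIDER", "ollama")
OLLAMA_NUM_CTX = env_int("OLLAMA_NUM_CTX", 32768)
MAX_ITERATIONS = env_int("MAX_ITERATIONS", 5)
MAX_RPM = env_int("MAX_RPM", 0)  # 0 = rate limiting disabled
```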
Process Metrics:
- latency_seconds: Total wall-clock time
- total_tokens: Sum of prompt + completion tokens
- prompt_tokens: Input tokens
- completion_tokens: Output tokens
- num_api_calls: Count of LLM API calls
- estimated_cost_usd: 0.0 for Ollama, estimated for Gemini
- num_iterations: Ensemble only, iteration count
Quality Metrics (LLM Judge, 0-5 scale):
- groundedness_score: Claims traceable to context
- instruction_adherence_score: Follows task requirements
- completeness_score: Addresses all expected elements
NLP Metrics (0-1 scale):
- bertscore_precision: Semantic precision
- bertscore_recall: Semantic recall
- bertscore_f1: Harmonic mean of precision and recall
- rouge1_fmeasure: Unigram overlap
- rougeL_fmeasure: Longest common subsequence
Setup:
pip install -r requirements.txt
cp .env.example .env
ollama pull qwen2.5:7b

Run:
python evaluate.py --test # Quick test
python evaluate.py # Full evaluation
mlflow ui                # View results

Debug:
python -m pytest test_system.py # Run tests
python monolithic.py # Test monolithic
python ensemble.py       # Test ensemble

Clean:
rm -rf data/cache/summaries/*
rm -rf data/cache/ensemble_summaries/*
rm -rf data/drafts/*
rm -rf mlruns/*

Use Monolithic When:
- ✅ Simple synthesis task
- ✅ Need fast results
- ✅ Limited API budget
- ✅ Prototype/testing
Use Ensemble When:
- ✅ Complex synthesis requiring refinement
- ✅ Quality is top priority
- ✅ Need iteration transparency
- ✅ Can afford higher latency/cost
Use Test Mode When:
- ✅ First run / setup verification
- ✅ Testing prompts
- ✅ Debugging
- ✅ Quick experiments
Use Full Evaluation When:
- ✅ Production comparison
- ✅ Complete metrics needed
- ✅ Publication/reporting
- ✅ Final validation
2026-01-10:
- Created comprehensive master documentation
- Consolidated QUICKSTART.md, USAGE.md, IMPLEMENTATION.md
- Added CrewAI Flow technical details
- Documented infinite loop fix
- Added troubleshooting section
- Added performance optimization notes
2026-01-08:
- Fixed CrewAI Flow infinite loop issue
- Documented fix in FLOW_FIX_SUMMARY.md
Previous:
- Initial implementation of monolithic and ensemble agents
- MLflow integration
- LLM-as-a-judge evaluation
- Map-reduce pattern with caching
End of Master Documentation