This guide covers patterns and strategies for deploying ML and LLM systems to production.
flowchart TD
subgraph Strategies["Deployment Strategies"]
Direct["Direct Deployment"]
BG["Blue-Green"]
Canary["Canary"]
Shadow["Shadow"]
AB["A/B Testing"]
end
subgraph Risk["Risk Level"]
Direct --> |"High Risk"| R1["Fast, no safety net"]
BG --> |"Medium Risk"| R2["Instant rollback"]
Canary --> |"Low Risk"| R3["Gradual rollout"]
Shadow --> |"No Risk"| R4["No user impact"]
AB --> |"Controlled"| R5["Statistical validation"]
end
style Direct fill:#ffcdd2
style BG fill:#fff9c4
style Canary fill:#c8e6c9
style Shadow fill:#e3f2fd
style AB fill:#f3e5f5
Deploy the new version directly to production.
flowchart LR
subgraph Before
V1["Version 1 (100%)"]
end
subgraph After
V2["Version 2 (100%)"]
end
Before --> |"Deploy"| After
style V1 fill:#ffcdd2
style V2 fill:#c8e6c9
When to use:
- Low-risk changes
- Bug fixes
- Small improvements
Implementation:
# Simple deployment script
def deploy_direct(model_version: str, previous_version: str):
    # Stop old service
    stop_service("ml-service")
    # Update model
    update_model_symlink(model_version)
    # Start new service
    start_service("ml-service")
    # Verify health; roll back to the known-good version on failure
    if not health_check("ml-service"):
        rollback(previous_version)

Run two identical environments and switch traffic between them instantly.
flowchart TD
subgraph Before["Before Switch"]
Traffic1["Traffic 100%"] --> Blue1["Blue (v1) - Active"]
Green1["Green (v2) - Standby"]
end
subgraph After["After Switch"]
Traffic2["Traffic 100%"] --> Green2["Green (v2) - Active"]
Blue2["Blue (v1) - Standby"]
end
Before --> |"Instant Switch"| After
style Blue1 fill:#2196f3
style Green1 fill:#4caf50
style Blue2 fill:#2196f3
style Green2 fill:#4caf50
When to use:
- Need instant rollback capability
- Zero-downtime deployments
- Major version changes
Implementation:
class BlueGreenDeployer:
def __init__(self, load_balancer):
self.lb = load_balancer
def deploy(self, new_version: str):
# Determine current and target environments
current = self.lb.get_active_environment() # "blue" or "green"
target = "green" if current == "blue" else "blue"
# Deploy to inactive environment
self.deploy_to_environment(target, new_version)
# Run health checks
if not self.health_check(target):
raise DeploymentError(f"Health check failed for {target}")
# Run smoke tests
if not self.smoke_test(target):
raise DeploymentError(f"Smoke test failed for {target}")
# Switch traffic
self.lb.switch_to(target)
# Keep old environment for rollback
return {"active": target, "standby": current}
def rollback(self):
current = self.lb.get_active_environment()
standby = "green" if current == "blue" else "blue"
        self.lb.switch_to(standby)
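
A minimal usage sketch, assuming `lb` is any load-balancer client exposing the `get_active_environment()` and `switch_to()` calls used above (the version string is a placeholder):

```python
# Hypothetical wiring for the class above
deployer = BlueGreenDeployer(load_balancer=lb)

try:
    result = deployer.deploy("model-v2.1.0")
    print(f"Active: {result['active']}, standby kept for rollback: {result['standby']}")
except DeploymentError as err:
    # Health or smoke tests failed before the switch, so users never saw the new version
    print(f"Deployment aborted: {err}")
```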

Gradually roll out to a subset of users.
flowchart LR
Traffic["Traffic"] --> Split{"Load Balancer"}
Split --> |"95%"| Stable["Stable v1"]
Split --> |"5%"| Canary["Canary v2"]
subgraph Progression["Canary Progression"]
P1["5%"] --> P2["25%"]
P2 --> P3["50%"]
P3 --> P4["100%"]
end
style Stable fill:#2196f3
style Canary fill:#ff9800
When to use:
- High-risk changes
- New model versions
- Testing in production
Implementation:
import time

class CanaryDeployer:
def __init__(self, load_balancer, metrics):
self.lb = load_balancer
self.metrics = metrics
def deploy_canary(self, new_version: str, stages: list = None):
stages = stages or [5, 25, 50, 100] # Percentage stages
# Deploy canary
self.deploy_to_environment("canary", new_version)
for percentage in stages:
# Update traffic split
self.lb.set_canary_percentage(percentage)
# Wait and observe
time.sleep(300) # 5 minutes
# Check metrics
if not self.check_canary_health():
self.rollback_canary()
raise DeploymentError(f"Canary failed at {percentage}%")
print(f"Canary at {percentage}% - metrics healthy")
# Promote canary to stable
self.promote_canary()
def check_canary_health(self) -> bool:
stable_metrics = self.metrics.get("stable")
canary_metrics = self.metrics.get("canary")
# Compare error rates
if canary_metrics["error_rate"] > stable_metrics["error_rate"] * 1.5:
return False
# Compare latency
if canary_metrics["p95_latency"] > stable_metrics["p95_latency"] * 1.2:
return False
        return True
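
The `metrics` dependency above is left abstract. One hedged sketch of how `error_rate` and `p95_latency` could be computed from raw request records (the record fields are assumptions, not an existing schema):

```python
import numpy as np

def summarize_requests(requests: list[dict]) -> dict:
    """Aggregate records like {"ok": bool, "latency_ms": float} into canary-check metrics."""
    latencies = [r["latency_ms"] for r in requests]
    errors = sum(1 for r in requests if not r["ok"])
    return {
        "error_rate": errors / max(len(requests), 1),
        "p95_latency": float(np.percentile(latencies, 95)) if latencies else 0.0,
    }
```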

Run the new version in parallel without affecting users.
flowchart LR
Traffic["Traffic"] --> Fork{"Fork Request"}
Fork --> |"Primary"| Stable["Stable v1"]
Fork --> |"Copy"| Shadow["Shadow v2"]
Stable --> Response["Response to User"]
Shadow --> Log["Log Only (Compare)"]
style Stable fill:#2196f3
style Shadow fill:#9e9e9e
style Response fill:#c8e6c9
style Log fill:#fff9c4
When to use:
- Testing new models safely
- Comparing model performance
- Validating before canary
Implementation:
import asyncio

class ShadowDeployer:
def __init__(self, stable_service, shadow_service, logger):
self.stable = stable_service
self.shadow = shadow_service
self.logger = logger
async def handle_request(self, request):
# Run both in parallel
stable_task = asyncio.create_task(
self.stable.process(request)
)
shadow_task = asyncio.create_task(
self.shadow.process(request)
)
# Wait for stable (this is what we return)
stable_response = await stable_task
# Log shadow result (don't wait if slow)
try:
shadow_response = await asyncio.wait_for(
shadow_task, timeout=5.0
)
self.compare_and_log(request, stable_response, shadow_response)
except asyncio.TimeoutError:
self.logger.warning("Shadow request timed out")
return stable_response
def compare_and_log(self, request, stable, shadow):
self.logger.info({
"request": request,
"stable_response": stable,
"shadow_response": shadow,
"match": stable == shadow,
"stable_latency": stable.latency,
"shadow_latency": shadow.latency
        })
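
Before promoting a shadow model to a canary, the comparison logs can be rolled up into a simple summary; a sketch assuming entries shaped like the dict logged by `compare_and_log`:

```python
def summarize_shadow_run(entries: list[dict]) -> dict:
    """Aggregate shadow-vs-stable comparisons into a go/no-go summary."""
    total = max(len(entries), 1)
    matches = sum(1 for e in entries if e["match"])
    latency_deltas = [e["shadow_latency"] - e["stable_latency"] for e in entries]
    return {
        "sample_size": len(entries),
        "match_rate": matches / total,
        "avg_latency_delta": sum(latency_deltas) / total,
    }
```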

Split traffic to compare versions with statistical rigor.
flowchart TD
Traffic["Traffic"] --> Hash{"User Hash"}
Hash --> |"50%"| Control["Control A"]
Hash --> |"50%"| Treatment["Treatment B"]
Control --> MetricsA["Collect Metrics"]
Treatment --> MetricsB["Collect Metrics"]
MetricsA --> Analysis["Statistical Analysis"]
MetricsB --> Analysis
Analysis --> Decision{"Significant Difference?"}
Decision --> |"Yes"| Winner["Deploy Winner"]
Decision --> |"No"| Continue["Continue Test"]
style Control fill:#2196f3
style Treatment fill:#ff9800
style Winner fill:#c8e6c9
When to use:
- Measuring business impact
- Comparing different approaches
- Data-driven decisions
Implementation:
import hashlib
from datetime import datetime

import numpy as np
from scipy import stats

class ABTestManager:
def __init__(self, config_store, metrics_store):
self.config = config_store
self.metrics = metrics_store
def create_experiment(self, name: str, variants: list,
traffic_split: dict):
experiment = {
"name": name,
"variants": variants,
"traffic_split": traffic_split,
"status": "running",
"created_at": datetime.utcnow().isoformat()
}
self.config.save(f"experiments/{name}", experiment)
return experiment
def get_variant(self, experiment_name: str, user_id: str) -> str:
experiment = self.config.get(f"experiments/{experiment_name}")
        # Stable hashing for user assignment (Python's built-in hash() is salted per process)
        digest = hashlib.sha256(f"{experiment_name}:{user_id}".encode()).hexdigest()
        hash_value = int(digest, 16) % 100
cumulative = 0
for variant, percentage in experiment["traffic_split"].items():
cumulative += percentage
if hash_value < cumulative:
return variant
return list(experiment["variants"])[0] # Default
def record_metric(self, experiment_name: str, variant: str,
metric_name: str, value: float):
self.metrics.record({
"experiment": experiment_name,
"variant": variant,
"metric": metric_name,
"value": value,
"timestamp": datetime.utcnow().isoformat()
})
def analyze_results(self, experiment_name: str) -> dict:
data = self.metrics.get_experiment_data(experiment_name)
# Statistical analysis
results = {}
for metric in data["metrics"]:
control = data["control"][metric]
treatment = data["treatment"][metric]
# T-test
t_stat, p_value = stats.ttest_ind(control, treatment)
results[metric] = {
"control_mean": np.mean(control),
"treatment_mean": np.mean(treatment),
"lift": (np.mean(treatment) - np.mean(control)) / np.mean(control),
"p_value": p_value,
"significant": p_value < 0.05
}
        return results
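
A hedged end-to-end sketch of how these pieces fit together (the experiment name, conversion metric, and store objects are assumptions):

```python
# Hypothetical stores injected into the manager
ab = ABTestManager(config_store, metrics_store)

ab.create_experiment(
    name="summarizer-v2",
    variants=["control", "treatment"],
    traffic_split={"control": 50, "treatment": 50},
)

# At request time: deterministic assignment, then record an outcome metric
variant = ab.get_variant("summarizer-v2", user_id="user-123")
ab.record_metric("summarizer-v2", variant, "conversion", 1.0)

# Later: check for a significant lift before deploying the winner
results = ab.analyze_results("summarizer-v2")
if results["conversion"]["significant"]:
    print(f"Lift: {results['conversion']['lift']:.1%}")
```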

A gateway in front of multiple LLM providers centralizes routing, rate limiting, caching, and retries.
import asyncio

from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI()
class LLMGateway:
def __init__(self):
self.providers = {
"openai": OpenAIProvider(),
"anthropic": AnthropicProvider(),
"local": LocalModelProvider()
}
self.router = RequestRouter()
self.rate_limiter = RateLimiter()
self.cache = ResponseCache()
async def handle_request(self, request: LLMRequest) -> LLMResponse:
# Rate limiting
if not await self.rate_limiter.allow(request.user_id):
raise RateLimitExceeded()
# Check cache
cached = await self.cache.get(request)
if cached:
return cached
# Route to appropriate provider
provider = self.router.select_provider(request)
# Make request with retry
response = await self.make_request_with_retry(provider, request)
# Cache response
await self.cache.set(request, response)
return response
async def make_request_with_retry(self, provider, request, max_retries=3):
for attempt in range(max_retries):
try:
return await provider.generate(request)
except RateLimitError:
# Try fallback provider
provider = self.router.get_fallback(provider)
except Exception as e:
if attempt == max_retries - 1:
raise
                await asyncio.sleep(2 ** attempt)
        # If every attempt was rate limited, fail loudly instead of returning None
        raise RuntimeError("All providers exhausted for this request")
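
The `app` object above is never connected to the gateway in this snippet. A minimal sketch of one way to wire them together (the route path and the idea that `LLMRequest` accepts the raw payload fields are assumptions):

```python
gateway = LLMGateway()

@app.post("/v1/generate")
async def generate(request: Request):
    payload = await request.json()
    llm_request = LLMRequest(**payload)  # assumes LLMRequest takes the payload fields
    return await gateway.handle_request(llm_request)
```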

Prompts are deployed artifacts too: version them, gate promotion on evaluation, and cache the production copy.
class ProductionPromptManager:
    def __init__(self, storage, cache):
self.storage = storage
self.cache = cache
def get_prompt(self, name: str, version: str = "production") -> str:
# Check cache
cache_key = f"prompt:{name}:{version}"
cached = self.cache.get(cache_key)
if cached:
return cached
# Load from storage
if version == "production":
prompt = self.storage.get_production_prompt(name)
else:
prompt = self.storage.get_prompt(name, version)
# Cache for 5 minutes
self.cache.set(cache_key, prompt, ttl=300)
return prompt
def deploy_prompt(self, name: str, version: str):
# Validate prompt exists
prompt = self.storage.get_prompt(name, version)
if not prompt:
raise ValueError(f"Prompt {name}:{version} not found")
# Run evaluation
eval_results = self.evaluate_prompt(name, version)
if eval_results["score"] < 0.8:
raise ValueError(f"Prompt failed evaluation: {eval_results}")
# Deploy
self.storage.set_production_prompt(name, version)
# Invalidate cache
self.cache.delete(f"prompt:{name}:production")
return {"deployed": True, "evaluation": eval_results}class AutoRollback:
def __init__(self, deployer, metrics, thresholds):
self.deployer = deployer
self.metrics = metrics
self.thresholds = thresholds
async def monitor_deployment(self, deployment_id: str):
start_time = time.time()
observation_period = 600 # 10 minutes
while time.time() - start_time < observation_period:
current_metrics = await self.metrics.get_current()
# Check error rate
if current_metrics["error_rate"] > self.thresholds["max_error_rate"]:
await self.trigger_rollback(deployment_id, "High error rate")
return
# Check latency
if current_metrics["p95_latency"] > self.thresholds["max_latency"]:
await self.trigger_rollback(deployment_id, "High latency")
return
# Check quality (for LLM)
if current_metrics.get("quality_score", 1) < self.thresholds.get("min_quality", 0):
await self.trigger_rollback(deployment_id, "Low quality")
return
await asyncio.sleep(30)
print(f"Deployment {deployment_id} stable")
async def trigger_rollback(self, deployment_id: str, reason: str):
print(f"Rolling back {deployment_id}: {reason}")
await self.deployer.rollback(deployment_id)
        await self.alert(f"Auto-rollback triggered: {reason}")
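
One hedged way to attach the monitor to a fresh deployment without blocking the deploy path; the threshold values and `deployment_id` are illustrative, not recommendations:

```python
import asyncio

async def deploy_and_watch(deployer, metrics, deployment_id: str):
    auto_rollback = AutoRollback(
        deployer,
        metrics,
        thresholds={"max_error_rate": 0.02, "max_latency": 1.5, "min_quality": 0.8},
    )
    # Observe in the background; keep a reference so the task is not garbage collected
    watch_task = asyncio.create_task(auto_rollback.monitor_deployment(deployment_id))
    return watch_task
```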

For planned rollbacks, keep a record of recent stable versions and where the service currently points.
class RollbackManager:
    def __init__(self, deployer, version_store):
self.deployer = deployer
self.versions = version_store
def get_rollback_candidates(self, service: str, limit: int = 5) -> list:
"""Get recent stable versions for rollback."""
return self.versions.get_recent_stable(service, limit)
def rollback(self, service: str, target_version: str = None):
"""Rollback to specific version or previous stable."""
if target_version is None:
target_version = self.versions.get_previous_stable(service)
# Validate target version exists
if not self.versions.exists(service, target_version):
raise ValueError(f"Version {target_version} not found")
# Perform rollback
self.deployer.deploy(service, target_version)
# Mark current as unstable
current = self.versions.get_current(service)
self.versions.mark_unstable(service, current)
return {
"rolled_back_from": current,
"rolled_back_to": target_version
        }

Before deploying (see the gate sketch after this checklist):
- All tests passing
- Evaluation metrics meet thresholds
- Rollback plan documented
- Monitoring dashboards ready
- Alerts configured
- Team notified
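
A minimal sketch of turning the checklist above into an automated release gate; every helper and threshold here (`run_test_suite`, `get_eval_metrics`, `alerting_rules_exist`, the 0.8 score) is hypothetical:

```python
def pre_deployment_gate() -> bool:
    """Return True only when the automatable checklist items pass."""
    checks = {
        "tests_pass": run_test_suite(),                           # hypothetical helper
        "eval_ok": get_eval_metrics()["score"] >= 0.8,            # hypothetical helper/threshold
        "alerts_configured": alerting_rules_exist("ml-service"),  # hypothetical helper
    }
    for name, ok in checks.items():
        print(f"{name}: {'PASS' if ok else 'FAIL'}")
    return all(checks.values())
```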
During deployment:
- Monitor error rates
- Monitor latency
- Monitor quality metrics
- Watch for anomalies
- Be ready to roll back
After deployment:
- Verify all health checks pass
- Confirm metrics are stable
- Document any issues
- Update runbooks if needed
- Implement blue-green: Set up blue-green deployment for your service
- Add canary releases: Implement gradual rollout with metrics checks
- Build shadow testing: Run new models in shadow mode
- Create rollback automation: Implement automatic rollback on metric degradation