EventFlow is a production-grade event-driven backend system built with Python, providing real-time visibility into user actions and system events. It demonstrates modern event streaming architecture with support for high-throughput message processing, reliable delivery, and comprehensive observability.
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β EventFlow System β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β
βββββββββββββββββββββββββββββββββββββββββββΌββββββββββββββββββββββββββββββββββββββββββ
β β β
βΌ βΌ βΌ
βββββββββββββββββββββββ βββββββββββββββββββββββ βββββββββββββββββββββββ
β Event Producers β β Message Broker β β Event Consumers β
β ββββββββββββββββ β β ββββββββββββββ β β ββββββββββββββββ β
β β’ REST API β β β’ Redis Streams β β β’ Event Workers β
β β’ Batch Processing β ββββββββββΆ β β’ Consumer Groups β ββββββββββΆ β β’ Retry Handler β
β β’ Event Factory β β β’ Message Ordering β β β’ DLQ Handler β
β β’ CloudEvents Spec β β β’ Persistence β β β’ Webhook Delivery β
βββββββββββββββββββββββ βββββββββββββββββββββββ βββββββββββββββββββββββ
β β β
β β β
ββββββββββββββββββββ¬βββββββββββββββββββββββ΄βββββββββββββββββββ¬βββββββββββββββββββββββ
β β
βΌ βΌ
βββββββββββββββββββββββ βββββββββββββββββββββββ
β Persistence Layer β β Observability β
β βββββββββββββββββ β β ββββββββββββ β
β β’ PostgreSQL β β β’ Structured Logs β
β β’ Event Store β β β’ Prometheus Metricsβ
β β’ Query Patterns β β β’ Health Checks β
β β’ Data Retention β β β’ Grafana Dashboardsβ
βββββββββββββββββββββββ βββββββββββββββββββββββ
- CloudEvents-compliant schema design for interoperability
- Multiple event types: User, System, Business, Analytics events
- Event batching with automatic flushing for optimized throughput
- Producer pool for high-concurrency scenarios
- Redis Streams for durable, ordered message delivery
- Consumer groups for horizontal scaling
- Message acknowledgment with automatic retry
- Dead Letter Queue (DLQ) for failed message handling
- Exponential backoff retry mechanism with jitter
- Circuit breaker pattern for fault tolerance
- Graceful shutdown with in-flight message completion
- Idempotent processing via unique event IDs
- Structured logging with JSON output (structlog)
- Prometheus metrics for real-time monitoring
- Health check endpoints for container orchestration
- Request tracing with correlation IDs
- RESTful endpoints built with FastAPI
- Auto-generated OpenAPI documentation
- Event replay capabilities
- Subscription management for webhooks
EventFlow/
βββ src/eventflow/
β βββ __init__.py # Package initialization
β βββ app.py # FastAPI application entry
β βββ cli.py # Command-line interface
β βββ config.py # Configuration management
β β
β βββ api/ # REST API layer
β β βββ dependencies.py # FastAPI dependencies
β β βββ schemas.py # Request/response models
β β βββ routes/
β β βββ events.py # Event CRUD endpoints
β β βββ health.py # Health check endpoints
β β βββ dlq.py # Dead letter queue management
β β βββ subscriptions.py # Webhook subscriptions
β β βββ metrics.py # Prometheus metrics endpoint
β β
β βββ core/ # Core business logic
β β βββ broker.py # Redis Streams broker
β β βββ producer.py # Event producer with batching
β β
β βββ consumers/ # Event processing
β β βββ consumer.py # Event consumer with retry
β β βββ worker.py # Background workers
β β
β βββ models/ # Database models
β β βββ database.py # SQLAlchemy models
β β
β βββ persistence/ # Data access layer
β β βββ database.py # Database connection
β β βββ repository.py # Repository pattern
β β
β βββ schemas/ # Domain schemas
β β βββ events.py # CloudEvents schemas
β β
β βββ observability/ # Monitoring & logging
β βββ logging.py # Structured logging
β βββ metrics.py # Prometheus metrics
β βββ retry.py # Retry & circuit breaker
β
βββ tests/ # Test suite
β βββ conftest.py # Pytest fixtures
β βββ test_api.py # API integration tests
β βββ test_schemas.py # Schema validation tests
β βββ test_retry.py # Retry mechanism tests
β
βββ scripts/
β βββ generate_demo_data.py # Demo data generator
β
βββ docker/ # Docker configuration
β βββ prometheus.yml # Prometheus config
β βββ grafana/ # Grafana provisioning
β
βββ Dockerfile # API server image
βββ Dockerfile.worker # Worker image
βββ docker-compose.yml # Full stack orchestration
βββ pyproject.toml # Project configuration
βββ .env.example # Environment template
βββ README.md # This file
- Python 3.11+
- Docker & Docker Compose
- Redis 7.0+
- PostgreSQL 15+
git clone https://github.com/yourusername/eventflow.git
cd eventflow
# Create environment file
cp .env.example .env# Start all services (API, Worker, PostgreSQL, Redis, Monitoring)
docker-compose up -d
# View logs
docker-compose logs -f eventflow-api
# Check health
curl http://localhost:8000/health| Service | URL | Credentials |
|---|---|---|
| EventFlow API | http://localhost:8000 | - |
| API Documentation | http://localhost:8000/docs | - |
| Prometheus | http://localhost:9090 | - |
| Grafana | http://localhost:3000 | admin/admin |
| pgAdmin | http://localhost:5050 | [email protected]/admin |
| Redis Commander | http://localhost:8081 | - |
# Create a user event
curl -X POST http://localhost:8000/api/v1/events \
-H "Content-Type: application/json" \
-d '{
"type": "user.created",
"source": "api",
"data": {
"user_id": "usr_12345",
"email": "[email protected]",
"username": "johndoe"
}
}'# Create and activate virtual environment
python -m venv .venv
.venv\Scripts\activate # Windows
source .venv/bin/activate # Linux/Mac
# Install dependencies
pip install -e ".[dev]"# Start dependencies
docker-compose up -d postgres redis
# Run database migrations
python -m eventflow.cli db upgrade
# Start API server
uvicorn eventflow.app:create_app --factory --reload
# In another terminal - Start worker
python -m eventflow.cli worker start# Generate 500 realistic events
python scripts/generate_demo_data.pyPOST /api/v1/events
Content-Type: application/json
{
"type": "user.login",
"source": "web-app",
"data": {
"user_id": "usr_123",
"ip_address": "192.168.1.1"
},
"metadata": {
"priority": "high",
"partition_key": "usr_123"
}
}GET /api/v1/events?event_type=user.login&limit=50&offset=0GET /api/v1/events/{event_id}POST /api/v1/events/replay
Content-Type: application/json
{
"stream": "events:user",
"start_time": "2024-01-01T00:00:00Z",
"end_time": "2024-01-02T00:00:00Z"
}GET /api/v1/dlq?limit=100POST /api/v1/dlq/{message_id}/retryDELETE /api/v1/dlq/{message_id}POST /api/v1/subscriptions
Content-Type: application/json
{
"name": "order-notifications",
"event_types": ["order.created", "order.completed"],
"webhook_url": "https://your-service.com/webhook",
"secret": "your-webhook-secret"
}GET /health # Basic health
GET /health/ready # Readiness probe
GET /health/live # Liveness probeGET /metrics # Prometheus metrics| Event Type | Description |
|---|---|
user.created |
New user registration |
user.updated |
User profile update |
user.deleted |
User account deletion |
user.login |
User login |
user.logout |
User logout |
| Event Type | Description |
|---|---|
system.health_check |
Service health report |
system.error |
System error occurred |
system.config_changed |
Configuration update |
| Event Type | Description |
|---|---|
order.created |
New order placed |
order.updated |
Order status change |
payment.completed |
Payment processed |
notification.sent |
Notification delivered |
| Event Type | Description |
|---|---|
page.view |
Page view tracked |
feature.used |
Feature usage logged |
conversion.tracked |
Conversion recorded |
| Variable | Default | Description |
|---|---|---|
APP_NAME |
eventflow | Application name |
APP_ENV |
development | Environment (development/staging/production) |
DEBUG |
false | Enable debug mode |
API_HOST |
0.0.0.0 | API server host |
API_PORT |
8000 | API server port |
DATABASE_URL |
postgresql+asyncpg://... | Database connection URL |
REDIS_URL |
redis://localhost:6379 | Redis connection URL |
LOG_LEVEL |
INFO | Logging level |
LOG_FORMAT |
json | Log format (json/console) |
| Variable | Default | Description |
|---|---|---|
BATCH_SIZE |
100 | Events per batch |
BATCH_TIMEOUT_MS |
1000 | Batch flush timeout |
CONSUMER_GROUP |
eventflow-workers | Redis consumer group |
MAX_RETRIES |
3 | Max retry attempts |
RETRY_BASE_DELAY |
1.0 | Base retry delay (seconds) |
# Run all tests
pytest
# Run with coverage
pytest --cov=eventflow --cov-report=html
# Run specific test file
pytest tests/test_api.py -v
# Run tests matching pattern
pytest -k "test_event" -v- Unit Tests: Schema validation, utility functions
- Integration Tests: API endpoints, database operations
- Reliability Tests: Retry mechanisms, circuit breaker
# Build images
docker-compose build
# Start in production mode
docker-compose -f docker-compose.yml up -d
# Scale workers
docker-compose up -d --scale eventflow-worker=3The containers include health checks for orchestration:
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 10s
retries: 3# View resource usage
docker stats
# View logs
docker-compose logs -f --tail=100
# Restart services
docker-compose restart eventflow-api eventflow-workerKey metrics exported:
| Metric | Type | Description |
|---|---|---|
eventflow_events_published_total |
Counter | Total events published |
eventflow_events_consumed_total |
Counter | Total events consumed |
eventflow_events_failed_total |
Counter | Failed event processing |
eventflow_event_processing_seconds |
Histogram | Processing latency |
eventflow_dlq_size |
Gauge | Dead letter queue size |
eventflow_circuit_breaker_state |
Gauge | Circuit breaker status |
Pre-configured dashboards for:
- Event throughput and latency
- Error rates and DLQ growth
- Consumer lag and processing time
- System health metrics
from eventflow.observability import retry_async, RetryConfig
config = RetryConfig(
max_retries=3,
base_delay=1.0,
max_delay=60.0,
exponential_base=2.0,
jitter=True,
)
@retry_async(config)
async def process_event(event):
# Processing logic
passfrom eventflow.observability import CircuitBreaker
breaker = CircuitBreaker(
failure_threshold=5,
recovery_timeout=30.0,
half_open_max_calls=3,
)
async with breaker:
await call_external_service()- Fork the repository
- Create a feature branch (
git checkout -b feature/amazing-feature) - Commit your changes (
git commit -m 'Add amazing feature') - Push to the branch (
git push origin feature/amazing-feature) - Open a Pull Request
- Follow PEP 8 style guide
- Write tests for new features
- Update documentation as needed
- Use conventional commit messages
This project is licensed under the MIT License - see the LICENSE file for details.
- FastAPI - Modern Python web framework
- Redis - In-memory data store
- CloudEvents - Event specification
- Pydantic - Data validation
- SQLAlchemy - SQL toolkit
Built with β€οΈ for scalable event-driven systems