Automated data collection from BlueSky with AI-powered disaster analysis, backed by PostgreSQL for efficient storage and deduplication.
- Automated Data Collection: Daily scheduled pulls from BlueSky
- AI Analysis: Google Gemini powered disaster severity assessment
- PostgreSQL Database: Efficient storage with automatic deduplication
- REST API: Access collected data, statistics, and analysis
- Task Queue: Celery + Redis for robust background processing
- Docker: Fully containerized with docker-compose
```
┌─────────────┐      ┌──────────────┐      ┌─────────────┐
│   BlueSky   │─────>│ FastAPI App  │─────>│ PostgreSQL  │
│     API     │      │   + Celery   │      │  Database   │
└─────────────┘      └──────────────┘      └─────────────┘
                            │
                            ▼
                     ┌──────────────┐
                     │    Gemini    │
                     │      AI      │
                     └──────────────┘
```
The database has three tables:

- `collection_runs`: Tracks each data collection run
- `posts`: Stores BlueSky posts (deduplicated by post ID)
- `disasters`: AI-extracted disaster information
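A rough sketch of how these tables could look as SQLAlchemy models (the real definitions live in `app/models.py`; the column names here are illustrative assumptions):

```python
# Illustrative sketch only; column names are assumptions,
# not the actual definitions from app/models.py.
from datetime import datetime

from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, Text
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class CollectionRun(Base):
    __tablename__ = "collection_runs"
    id = Column(Integer, primary_key=True)
    started_at = Column(DateTime, default=datetime.utcnow)
    status = Column(String(32))  # e.g. "running", "completed"


class Post(Base):
    __tablename__ = "posts"
    id = Column(Integer, primary_key=True)
    post_id = Column(String(128), unique=True, index=True)  # dedup key
    text = Column(Text)
    run_id = Column(Integer, ForeignKey("collection_runs.id"))


class Disaster(Base):
    __tablename__ = "disasters"
    id = Column(Integer, primary_key=True)
    disaster_type = Column(String(64))
    severity = Column(String(32))
    location = Column(String(128))
    extracted_at = Column(DateTime, default=datetime.utcnow)
    post_id = Column(Integer, ForeignKey("posts.id"))
```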
```bash
cp env.example .env
# Edit .env with your credentials

docker-compose up -d
```

This will start:
- PostgreSQL (port 5432): Database
- Redis (port 6379): Message broker
- API (port 8000): FastAPI server
- Celery Worker: Background task processor
- Celery Beat: Task scheduler
Verify the setup:

```bash
# Check all services are running
docker-compose ps

# Test API
curl http://localhost:8000/api/

# Check statistics
curl http://localhost:8000/api/stats
```

Service endpoints:

| Endpoint | Method | Description |
|---|---|---|
| `/api/` | GET | Service info and endpoint list |
| `/api/health` | GET | Health check |
| `/api/stats` | GET | Collection statistics |
Collection endpoints:

| Endpoint | Method | Description |
|---|---|---|
| `/api/trigger` | POST | Manually trigger data collection |
| `/api/task/{task_id}` | GET | Get task status |
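To trigger a run programmatically and poll it until it finishes, something like the following sketch works. It assumes the service is on `localhost:8000` and that `/api/trigger` returns a JSON body containing a `task_id`; the exact field names may differ:

```python
# Sketch: trigger a collection run and poll its Celery task status.
# The task_id and status field names are assumptions about the response shape.
import time

import requests

BASE = "http://localhost:8000/api"

resp = requests.post(f"{BASE}/trigger", timeout=10)
resp.raise_for_status()
task_id = resp.json()["task_id"]  # assumed field name

while True:
    status = requests.get(f"{BASE}/task/{task_id}", timeout=10).json()
    print(status)
    if status.get("status") in ("SUCCESS", "FAILURE"):  # Celery state names
        break
    time.sleep(5)
```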
Data endpoints:

| Endpoint | Method | Description |
|---|---|---|
| `/api/disasters` | GET | Get recent disasters |
| `/api/disasters/{id}` | GET | Get a specific disaster |
| `/api/posts/recent` | GET | Get recent posts |
| `/api/runs` | GET | Get collection runs |
Example requests:

```bash
# Get statistics
curl http://localhost:8000/api/stats

# Trigger manual collection
curl -X POST http://localhost:8000/api/trigger

# Get recent disasters
curl "http://localhost:8000/api/disasters?limit=10"

# Get recent posts
curl "http://localhost:8000/api/posts/recent?limit=20"
```

Before (JSON files):

- ❌ 213KB JSON files per run
- ❌ Duplicate posts across runs
- ❌ No query capability
- ❌ Manual deduplication needed
- ❌ Slow file parsing
After (PostgreSQL):

- ✅ Efficient normalized storage
- ✅ Automatic deduplication (see the upsert sketch below)
- ✅ Fast queries and filtering
- ✅ Historical tracking
- ✅ Structured data analysis
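One way to get the automatic deduplication is to make the post ID a unique key and let PostgreSQL skip duplicates on insert. A minimal sketch using SQLAlchemy's PostgreSQL upsert support, assuming the `Post` model from the schema sketch above:

```python
# Sketch: let PostgreSQL skip posts that already exist, instead of
# deduplicating by hand. Assumes Post has a unique post_id column.
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session

from app.models import Post  # assumed import path


def save_posts(session: Session, posts: list[dict]) -> None:
    """Insert posts, silently ignoring any post_id already stored."""
    for p in posts:
        stmt = (
            insert(Post)
            .values(post_id=p["post_id"], text=p["text"])
            .on_conflict_do_nothing(index_elements=["post_id"])
        )
        session.execute(stmt)
    session.commit()
```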
Edit the `.env` file:

```env
# BlueSky Credentials
BlueSky_Username=your-username.bsky.social
BlueSky_Password=your-app-password

# Google Gemini API
GOOGLE_API_KEY=your-google-api-key

# Collection Settings
SEARCH_HASHTAG=#earthquake
POST_LIMIT=50
SCHEDULE_HOURS=24

# Database
DATABASE_URL=postgresql://bluesky:bluesky123@postgres:5432/bluesky

# Redis
REDIS_URL=redis://redis:6379/0
```
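These variables are read by `app/config.py`. A minimal sketch of how such a settings module might look; the actual field names and loading mechanism are assumptions:

```python
# Sketch of a settings module reading the .env values above.
# The real app/config.py may use pydantic or a different structure.
import os


class Settings:
    bluesky_username = os.getenv("BlueSky_Username", "")
    bluesky_password = os.getenv("BlueSky_Password", "")
    google_api_key = os.getenv("GOOGLE_API_KEY", "")
    search_hashtag = os.getenv("SEARCH_HASHTAG", "#earthquake")
    post_limit = int(os.getenv("POST_LIMIT", "50"))
    schedule_hours = int(os.getenv("SCHEDULE_HOURS", "24"))
    database_url = os.getenv(
        "DATABASE_URL", "postgresql://bluesky:bluesky123@postgres:5432/bluesky"
    )
    redis_url = os.getenv("REDIS_URL", "redis://redis:6379/0")


settings = Settings()
```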
View logs:

```bash
# All services
docker-compose logs -f

# Specific service
docker-compose logs -f api
docker-compose logs -f celery-worker
docker-compose logs -f celery-beat
```

Restart services:

```bash
# All services
docker-compose restart

# Specific service
docker-compose restart api
```

Connect to PostgreSQL:

```bash
docker-compose exec postgres psql -U bluesky -d bluesky
```
Run example queries:

```sql
SELECT COUNT(*) FROM posts;
SELECT * FROM disasters ORDER BY extracted_at DESC LIMIT 10;
SELECT * FROM collection_runs ORDER BY started_at DESC;
```

How a scheduled collection works (a condensed code sketch follows the list):

1. Celery Beat triggers the collection task every 24 hours (configurable via `SCHEDULE_HOURS`).
2. A Celery Worker executes the collection task:
   - Creates a new collection run in the DB
   - Fetches posts from the BlueSky API
   - Saves the posts (deduplicated by post ID)
   - Sends the posts to Gemini AI
   - Parses and saves the extracted disaster information
   - Marks the run as completed
3. The API serves the data from PostgreSQL.
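A condensed sketch of that task; every imported name here is an assumption about the project layout, and the actual implementation lives in `app/tasks.py` and `app/services/`:

```python
# Illustrative sketch of the collection task; helper names are assumed.
from app.celery_app import celery_app
from app.database import SessionLocal
from app.services.analysis import analyze_posts
from app.services.bluesky import fetch_posts
from app.services.database_service import (
    finish_run, save_disasters, save_posts, start_run,
)


@celery_app.task
def collect_and_analyze():
    session = SessionLocal()
    try:
        run = start_run(session)                      # 1. create a collection run
        posts = fetch_posts("#earthquake", limit=50)  # 2. pull posts from BlueSky
        saved = save_posts(session, run, posts)       # 3. save, dedup by post ID
        disasters = analyze_posts(saved)              # 4. Gemini severity analysis
        save_disasters(session, run, disasters)       # 5. persist extracted info
        finish_run(session, run)                      # 6. mark run as completed
    finally:
        session.close()
```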
Clean up old data:

```sql
-- Delete old collection runs (keeps last 30 days)
DELETE FROM collection_runs
WHERE started_at < NOW() - INTERVAL '30 days';

-- Vacuum database
VACUUM ANALYZE;
```

Back up the database:

```bash
docker-compose exec postgres pg_dump -U bluesky bluesky > backup.sql
```

Restore from a backup:

```bash
docker-compose exec -T postgres psql -U bluesky bluesky < backup.sql
```

Database issues:

```bash
# Check PostgreSQL logs
docker-compose logs postgres

# Verify connection
docker-compose exec postgres psql -U bluesky -d bluesky -c "SELECT 1;"
```

Celery issues:

```bash
# Check Celery worker
docker-compose logs celery-worker
# Check Celery beat
docker-compose logs celery-beat
# Verify Redis
docker-compose exec redis redis-cli ping
```

API issues:

```bash
# Check API logs
docker-compose logs api
# Restart API
docker-compose restart api
```

Tech stack:

- Python 3.13: Core language
- FastAPI: Web framework
- PostgreSQL 16: Database
- SQLAlchemy: ORM
- Celery: Task queue
- Redis: Message broker
- Google Gemini: AI analysis
- BlueSky API: Data source
- Docker: Containerization
```
BlueSky-Integration/
├── app/
│   ├── __init__.py
│   ├── main.py                  # FastAPI app
│   ├── config.py                # Configuration
│   ├── database.py              # Database setup
│   ├── models.py                # SQLAlchemy models
│   ├── celery_app.py            # Celery config
│   ├── tasks.py                 # Celery tasks
│   ├── routes/
│   │   └── api.py               # API endpoints
│   └── services/
│       ├── bluesky.py           # BlueSky integration
│       ├── analysis.py          # AI analysis
│       └── database_service.py  # Database operations
├── demo.ipynb                   # Jupyter notebook
├── docker-compose.yml           # Multi-service setup
├── Dockerfile                   # Container image
├── requirements.txt             # Python dependencies
├── env.example                  # Environment template
└── README.md                    # This file
```
- Implement Filtering: Add location/severity filters (a sketch follows this list)
- Add Notifications: Email/Slack alerts for high-severity events
- Visualization: Create dashboard with charts
- Export: Add CSV/JSON export endpoints
- Search: Full-text search on posts
- Aggregations: Daily/weekly summaries
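As a starting point for the filtering item, here is a hedged sketch of what a filtered disasters endpoint could look like; the router setup, `get_db` dependency, and `Disaster` model fields are assumptions about the codebase:

```python
# Sketch: optional severity/location filters on a disasters endpoint.
# Dependency and model names are assumptions, not the existing routes/api.py.
from typing import Optional

from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session

from app.database import get_db  # assumed session dependency
from app.models import Disaster  # assumed model

router = APIRouter()


@router.get("/disasters")
def list_disasters(
    severity: Optional[str] = None,
    location: Optional[str] = None,
    limit: int = 10,
    db: Session = Depends(get_db),
):
    query = db.query(Disaster)
    if severity:
        query = query.filter(Disaster.severity == severity)
    if location:
        query = query.filter(Disaster.location == location)
    rows = query.order_by(Disaster.extracted_at.desc()).limit(limit).all()
    # Return plain dicts so FastAPI can serialize without a response model.
    return [
        {"id": d.id, "type": d.disaster_type, "severity": d.severity}
        for d in rows
    ]
```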
MIT License - feel free to use and modify!