DStream is a robust, stateless Change Data Capture (CDC) streaming solution designed to capture changes from Microsoft SQL Server and reliably deliver them to downstream systems through Azure Service Bus or any other message queue provider. Its purpose is simple: collect and forward streaming data (currently CDC, but extensible to APIs, webhooks, or other streams) to any downstream system.
- No in-memory workflows.
- No complex orchestration.
- No recovery logic inside the service.
- State (offsets, LSNs, messages) is stored externally (in SQL or the queue).
- Crash-safe: restart, reschedule, or scale out with zero warmup or coordination.
- DStream offloads durability, retries, and ordering to external queue systems (Azure Service Bus, AWS SQS, Kafka).
- Exactly-once guarantees at the batch level via atomic offset commits after successful sends.
- Reduces fault scenarios.
- Eliminates workflow recovery issues.
- Cuts operational overhead.
- Simplifies debugging and operation.
- Works across any cloud, queue system, source (SQL CDC, APIs, webhooks), and destination (queues, services).
The message queue is the orchestrator. The checkpoint is the resume point. The consumer defines the final behavior.
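In practice that stateless loop is: read a batch from the source, publish it, and only then commit the offset. Below is a minimal Go sketch of the pattern; the `ChangeReader`, `Publisher`, and `CheckpointStore` interfaces are hypothetical stand-ins, not the repo's actual types.

```go
package sketch

import (
	"context"
	"log"
	"time"
)

// Batch is one read from the CDC source plus the offset that covers it.
type Batch struct {
	Messages [][]byte
	LastLSN  string // SQL Server CDC Log Sequence Number
}

// Hypothetical interfaces standing in for DStream's real components.
type ChangeReader interface {
	ReadBatch(ctx context.Context, fromLSN string) (Batch, error)
}
type Publisher interface {
	PublishBatch(ctx context.Context, msgs [][]byte) error
}
type CheckpointStore interface {
	LoadLSN(ctx context.Context, table string) (string, error)
	CommitLSN(ctx context.Context, table, lsn string) error
}

// run reads, publishes, then commits. A crash between publish and commit
// means the batch is re-read and re-sent on restart, but the offset itself
// advances exactly once per batch.
func run(ctx context.Context, table string, r ChangeReader, p Publisher, c CheckpointStore) error {
	lsn, err := c.LoadLSN(ctx, table)
	if err != nil {
		return err
	}
	for {
		batch, err := r.ReadBatch(ctx, lsn)
		if err != nil {
			return err
		}
		if len(batch.Messages) == 0 {
			time.Sleep(5 * time.Second) // idle poll; the real ingester backs off adaptively
			continue
		}
		if err := p.PublishBatch(ctx, batch.Messages); err != nil {
			return err // no commit, so the batch is re-read on restart
		}
		if err := c.CommitLSN(ctx, table, batch.LastLSN); err != nil {
			return err
		}
		lsn = batch.LastLSN
		log.Printf("%s: committed up to LSN %s", table, lsn)
	}
}
```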
DStream operates in two main stages:
- **Ingestion Stage (Ingester):**
  - Monitors SQL Server tables enabled with CDC
  - Captures changes (inserts, updates, deletes)
  - Publishes changes to a central ingest queue
  - Updates CDC offsets only after successful queue publish
  - Uses distributed locking for high availability
- **Routing Stage (Router):**
  - Consumes messages from the ingest queue
  - Routes messages to their destination topics
  - Pre-creates publishers at startup for optimal performance
  - Ensures reliable delivery to downstream systems
This architecture provides several benefits:
- Reliable capture and delivery of changes
- Proper sequencing of messages
- High availability through distributed locking
- Optimized performance with connection pooling
- Clear separation of concerns between ingestion and routing
- CDC Monitoring: Tracks changes (inserts, updates, deletes) on MS SQL Server tables enabled with CDC
- Reliable Offset Management: Updates CDC offsets only after successful publish to ingest queue
- Distributed Locking: Uses Azure Blob Storage for distributed locking in multi-instance deployments (see the sketch after this list)
- Adaptive Polling: Features adaptive backoff for table monitoring based on change frequency
- Automatic Topic Creation: Creates topics and subscriptions for each monitored table
- Optimized Publishing: Pre-creates and caches publishers at startup for better performance
- Reliable Delivery: Ensures messages are properly delivered to destination topics
- Message Preservation: Maintains original message properties during routing
- Automatic Topic Management: Creates topics and subscriptions as needed
- Flexible Configuration: HCL-based configuration with environment variable support
- Structured Logging: Built-in structured logging with configurable levels
- High Availability: Supports running multiple instances for redundancy
- Message Metadata: Includes rich metadata for proper message routing and tracking
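As referenced in the Distributed Locking item above, here is a hedged Go sketch of lease-style locking; the `LeaseAPI` interface is an illustrative stand-in for Azure Blob Storage leases, not DStream's actual API.

```go
package sketch

import (
	"context"
	"time"
)

// LeaseAPI abstracts a blob-lease style primitive: Acquire fails if another
// instance holds the lease, and a held lease must be renewed before it expires.
type LeaseAPI interface {
	Acquire(ctx context.Context, name string, ttl time.Duration) (leaseID string, err error)
	Renew(ctx context.Context, name, leaseID string) error
	Release(ctx context.Context, name, leaseID string) error
}

// withTableLock runs fn only while this instance holds the lock for table,
// renewing the lease in the background so other instances stay fenced out.
func withTableLock(ctx context.Context, api LeaseAPI, table string, fn func(ctx context.Context) error) error {
	const ttl = 60 * time.Second
	leaseID, err := api.Acquire(ctx, table, ttl)
	if err != nil {
		return err // another instance is already monitoring this table
	}
	// Release with a fresh context so it still runs after cancellation.
	defer api.Release(context.Background(), table, leaseID)

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	go func() {
		t := time.NewTicker(ttl / 3)
		defer t.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-t.C:
				if err := api.Renew(ctx, table, leaseID); err != nil {
					cancel() // lost the lease: stop monitoring
					return
				}
			}
		}
	}()
	return fn(ctx)
}
```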
- MS SQL Server with CDC enabled on target tables
- Azure Service Bus for message streaming
- Azure Blob Storage for distributed locking
- Go (latest version recommended)
- Clone the repository:

```bash
git clone https://github.com/katasec/dstream.git
cd dstream
```

- Install dependencies:

```bash
go mod tidy
```

- Configure environment variables:

```bash
export DSTREAM_DB_CONNECTION_STRING="sqlserver://user:pass@localhost:1433?database=TestDB"
export DSTREAM_INGEST_CONNECTION_STRING="your-azure-service-bus-connection-string"
export DSTREAM_BLOB_CONNECTION_STRING="your-azure-blob-storage-connection-string"
export DSTREAM_PUBLISHER_CONNECTION_STRING="your-azure-service-bus-connection-string"
export DSTREAM_LOG_LEVEL="debug" # Optional, defaults to info
```
DStream uses HCL for configuration. Here's an example `dstream.hcl`:
```hcl
ingester {
  db_type              = "sqlserver"
  db_connection_string = "{{ env \"DSTREAM_DB_CONNECTION_STRING\" }}"

  poll_interval_defaults {
    poll_interval     = "5s"
    max_poll_interval = "2m"
  }

  queue {
    type              = "servicebus"
    name              = "ingest-queue"
    connection_string = "{{ env \"DSTREAM_INGEST_CONNECTION_STRING\" }}"
  }

  locks {
    type              = "azure_blob"
    connection_string = "{{ env \"DSTREAM_BLOB_CONNECTION_STRING\" }}"
    container_name    = "locks"
  }

  tables = ["Persons"]

  tables_overrides {
    overrides {
      table_name        = "Persons"
      poll_interval     = "5s"
      max_poll_interval = "10m"
    }
  }
}

publisher {
  source {
    type              = "azure_service_bus"
    connection_string = "{{ env \"DSTREAM_PUBLISHER_CONNECTION_STRING\" }}"
  }

  output {
    type              = "azure_service_bus"
    connection_string = "{{ env \"DSTREAM_PUBLISHER_CONNECTION_STRING\" }}"
  }
}
```
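The `{{ env "..." }}` placeholders are resolved from environment variables before the HCL is parsed. As a rough illustration (not necessarily how the repo implements it), Go's `text/template` can do this resolution with an `env` function:

```go
package sketch

import (
	"bytes"
	"os"
	"text/template"
)

// renderConfig expands {{ env "NAME" }} placeholders using the process
// environment, returning config text ready for the HCL parser.
func renderConfig(raw string) (string, error) {
	tmpl, err := template.New("dstream.hcl").Funcs(template.FuncMap{
		"env": os.Getenv,
	}).Parse(raw)
	if err != nil {
		return "", err
	}
	var buf bytes.Buffer
	if err := tmpl.Execute(&buf, nil); err != nil {
		return "", err
	}
	return buf.String(), nil
}
```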
The ingester captures changes from SQL Server and publishes them to the ingest queue:
```bash
# Start with debug logging
go run . ingester --log-level debug

# Start with info logging (default)
go run . ingester
```
The ingester will:
- Create topics for each monitored table
- Create a 'sub1' subscription for each topic
- Begin monitoring tables for changes
- Publish changes to the ingest queue
- Update CDC offsets after successful publish
The router consumes messages from the ingest queue and routes them to destination topics:
```bash
# Start with debug logging
go run . router --log-level debug

# Start with info logging (default)
go run . router
```
The router will:
- Pre-create publishers for all configured tables
- Begin consuming messages from the ingest queue
- Route messages to their destination topics
- Ensure reliable delivery with proper sequencing
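A hedged Go sketch of that consume-and-route loop follows; `QueueReceiver` and `TopicSender` are hypothetical interfaces. The key point is that the ingest-queue message is acked only after the downstream send succeeds, so a crash mid-route leads to redelivery rather than loss.

```go
package sketch

import (
	"context"
	"encoding/json"
	"fmt"
)

// Envelope mirrors DStream's message format: change data plus routing metadata.
type Envelope struct {
	Data     json.RawMessage `json:"data"`
	Metadata map[string]any  `json:"metadata"`
}

type QueueReceiver interface {
	Receive(ctx context.Context) (body []byte, ack func() error, err error)
}

type TopicSender interface {
	Send(ctx context.Context, body []byte) error
}

// route forwards each message to the topic named in metadata.Destination,
// acking the ingest-queue message only after the downstream send succeeds.
func route(ctx context.Context, in QueueReceiver, senders map[string]TopicSender) error {
	for {
		body, ack, err := in.Receive(ctx)
		if err != nil {
			return err
		}
		var env Envelope
		if err := json.Unmarshal(body, &env); err != nil {
			return err
		}
		dest, _ := env.Metadata["Destination"].(string)
		sender, ok := senders[dest] // pre-created at startup
		if !ok {
			return fmt.Errorf("no publisher for destination %q", dest)
		}
		if err := sender.Send(ctx, body); err != nil {
			return err // not acked: the queue redelivers the message
		}
		if err := ack(); err != nil {
			return err
		}
	}
}
```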
DStream uses a consistent message format throughout the pipeline:
```json
{
  "data": {
    "FirstName": "Diana",
    "ID": "180",
    "LastName": "Williams"
  },
  "metadata": {
    "Destination": "server.database.table.events",
    "IngestQueue": "ingest-queue",
    "LSN": "0000003600000b200003",
    "OperationID": 2,
    "OperationType": "Insert",
    "TableName": "Persons"
  }
}
```
The `data` section:

- Contains the actual change data
- Includes all columns from the monitored table
- Values are preserved in their original types

The `metadata` fields:

- `Destination`: Fully qualified destination topic name
- `IngestQueue`: Name of the central ingest queue
- `LSN`: Log Sequence Number from SQL Server CDC
- `OperationID`: Type of change (1=delete, 2=insert, 3=update before-image, 4=update after-image)
- `OperationType`: Human-readable operation type
- `TableName`: Source table name
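For reference, this envelope maps onto Go structs like the following (type names are illustrative; the JSON tags follow the sample message above):

```go
package sketch

// ChangeEvent is one message in the pipeline: the changed row plus metadata.
type ChangeEvent struct {
	Data     map[string]any `json:"data"`
	Metadata Metadata       `json:"metadata"`
}

// Metadata carries the routing and tracking fields described above.
type Metadata struct {
	Destination   string `json:"Destination"`
	IngestQueue   string `json:"IngestQueue"`
	LSN           string `json:"LSN"`
	OperationID   int    `json:"OperationID"`
	OperationType string `json:"OperationType"`
	TableName     string `json:"TableName"`
}
```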
For production deployments:

```bash
go run . server --log-level info
```
DStream follows a modular architecture with clear separation of concerns:
- **CDC Monitor**
  - Monitors SQL Server tables for changes using CDC
  - Uses adaptive polling with configurable intervals
  - Tracks changes using LSNs (Log Sequence Numbers)
- **Publisher Adapter**
  - Wraps publishers with additional metadata
  - Adds destination routing information
  - Provides a unified interface for all publishers
- **Publishers**
  - Pluggable components that handle message delivery
  - Implementations available for:
    - Azure Service Bus
    - Azure Event Hubs
    - Console (for debugging)
  - Easy to add new implementations via the Publisher interface (see the sketch after the diagram below)
```
[SQL Server] --> [CDC Monitor] --> [Publisher Adapter] --> [Publisher] --> [Destination]
     |                |                    |                    |
     |                |                    |                    |- Service Bus
     |                |                    |                    |- Event Hubs
     |                |                    |                    |- Console
     |                |                    |
     |                |                    |- Add Metadata
     |                |                    |- Route Information
     |                |
     |                |- Track LSN
     |                |- Adaptive Polling
     |
     |- CDC Enabled Tables
```
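The Publisher layer is the extension point mentioned above. A sketch of what such an interface can look like, with a console publisher as the trivial implementation (the repo's actual interface may differ in names and signatures):

```go
package sketch

import (
	"context"
	"fmt"
)

// Publisher delivers encoded change events to a single destination.
// Implementations wrap Azure Service Bus, Event Hubs, the console, etc.
type Publisher interface {
	Publish(ctx context.Context, messages [][]byte) error
	Close() error
}

// ConsolePublisher prints messages to stdout; useful for local debugging.
type ConsolePublisher struct{}

func (ConsolePublisher) Publish(_ context.Context, messages [][]byte) error {
	for _, m := range messages {
		fmt.Println(string(m))
	}
	return nil
}

func (ConsolePublisher) Close() error { return nil }
```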
- **Modularity**
  - Clear separation between components
  - Pluggable publishers for different destinations
  - Easy to extend and maintain
- **Reliability**
  - Distributed locking for multiple instances
  - Message queuing for reliable delivery
  - Graceful shutdown handling (see the sketch after this list)
- **Observability**
  - Structured logging throughout
  - Configurable log levels
  - Clear error reporting
- **Configuration**
  - HCL-based configuration
  - Environment variable support
  - Per-table configuration options
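As referenced in the Reliability item, graceful shutdown in Go is typically driven by a signal-cancelled context; a minimal sketch with a placeholder entry point (`runIngester` is hypothetical):

```go
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	// Cancel the context on SIGINT/SIGTERM so in-flight batches finish
	// and offsets are committed before the process exits.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	if err := runIngester(ctx); err != nil && ctx.Err() == nil {
		log.Fatal(err)
	}
	log.Println("shut down cleanly")
}

// runIngester stands in for the real entry point; it should return once
// ctx is cancelled and its current batch is fully published.
func runIngester(ctx context.Context) error {
	<-ctx.Done() // placeholder: the real loop polls tables until cancelled
	return nil
}
```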
DStream publishes changes in a standardized JSON format:
```json
{
  "data": {
    "Field1": "Value1",
    "Field2": "Value2"
  },
  "metadata": {
    "Destination": "topic-or-queue-name",
    "LSN": "00000034000025c80003",
    "OperationID": 2,
    "OperationType": "Insert|Update|Delete",
    "TableName": "TableName"
  }
}
```
The metadata includes:
- Destination for routing
- LSN for tracking
- Operation type (1=delete, 2=insert, 3=update before-image, 4=update after-image)
- Source table name
Contributions are welcome! Please submit a pull request or create an issue if you encounter bugs or have suggestions for new features.
This project is licensed under the MIT License. See the LICENSE file for details.
- CheckpointManager tracks LSN offsets.
- Only commits after successful batch publishing to ensure exactly-once delivery.
- Uses BackOffManager for dynamic backoff based on activity and errors (see the sketch after this list).
- Dynamically optimizes batch sizes considering message size limits and table characteristics.
- Ensures single active table monitoring across instances.
- Simple, declarative config via `dstream.hcl`.
- TableMonitor and planned Publisher interfaces make sources and sinks swappable.
- Ready for future sources (Postgres, APIs) and destinations (Kafka, Event Hub).
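A hedged sketch of what BackOffManager-style adaptive backoff can look like (the policy and bounds here are illustrative, not the repo's exact behavior): grow the poll interval while a table is quiet, reset it on activity, and wait the full interval on errors.

```go
package sketch

import "time"

// BackOff tracks the current poll interval for one table.
type BackOff struct {
	Min, Max time.Duration // e.g. poll_interval and max_poll_interval
	current  time.Duration
}

// Next returns how long to wait before the next poll.
func (b *BackOff) Next(sawChanges bool, err error) time.Duration {
	switch {
	case err != nil:
		b.current = b.Max // error: wait the full interval before retrying
	case sawChanges:
		b.current = b.Min // active table: poll at the fastest rate
	default:
		if b.current < b.Min {
			b.current = b.Min
		}
		b.current *= 2 // quiet table: double the interval, up to the cap
		if b.current > b.Max {
			b.current = b.Max
		}
	}
	return b.current
}
```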
Keep DStream lean, focused, and stateless, so it's reliable, resilient, and boring (in the best way).