The from-proto command allows you to run a Substreams SQL sink directly from a protocol buffer definition without needing to set up separate schema files. This command automatically generates the SQL schema from your protobuf definitions and runs the sink in a single step.
💡 See it in action: Check out the ClickHouse Showcase for a complete working example with USDC transfers, including advanced features like materialized views and partitioning strategies.
The from-proto command streamlines the process of running a SQL sink by:
- Reading your Substreams manifest and protobuf definitions
- Automatically generating the SQL schema from proto annotations
- Creating database tables based on the proto message structure
- Running the sink to stream data from Substreams to your database
```shell
substreams-sink-sql from-proto <dsn> <manifest> [output-module]
```

- `<dsn>`: Database connection string (Data Source Name); see the DSN Format section below
- `<manifest>`: Path to your Substreams manifest file (`substreams.yaml`)
- `[output-module]`: Optional. Name of the output module to stream from (defaults to auto-detection)
- `-e, --substreams-endpoint`: Substreams gRPC endpoint
- `-s, --start-block`: Start block number to stream from
- `-t, --stop-block`: Stop block to end the stream at (default: 0, meaning no limit)
- `--no-constraints`: Skip adding database constraints (useful for fast initial imports)
- `--block-batch-size`: Number of blocks to process at a time (default: 25)
The Data Source Name (DSN) specifies how to connect to your database. The format varies by database type:
```
postgres://[username[:password]@]host[:port]/database[?param1=value1&param2=value2]
```
Examples:
```
# Basic connection
postgres://localhost:5432/postgres

# With authentication (replace with actual credentials)
postgres://[username]:[password]@localhost:5432/mydb

# With SSL disabled and custom schema
postgres://localhost:5432/postgres?sslmode=disable&schemaName=orders

# Production example with SSL (replace placeholders)
postgres://[username]:[password]@[hostname]:5432/analytics?sslmode=require
```

**Common PostgreSQL Parameters:**

- `sslmode`: SSL connection mode (`disable`, `require`, `verify-ca`, `verify-full`)
- `schemaName`: Target schema name (defaults to `public`)
- `connect_timeout`: Connection timeout in seconds
- `application_name`: Application name for connection tracking
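If credentials contain characters that are significant in URLs (`@`, `:`, `/`), they must be percent-encoded or the DSN will not parse correctly. A small illustrative Python helper shows the idea (the function name and defaults are assumptions for illustration, not part of the sink):

```python
from urllib.parse import quote

def build_postgres_dsn(user, password, host, port, database, **params):
    """Assemble a postgres:// DSN, percent-encoding credentials so that
    characters like '@' or ':' in a password do not break parsing."""
    auth = f"{quote(user, safe='')}:{quote(password, safe='')}@"
    query = "&".join(f"{k}={v}" for k, v in params.items())
    dsn = f"postgres://{auth}{host}:{port}/{database}"
    return f"{dsn}?{query}" if query else dsn

print(build_postgres_dsn("app", "p@ss:word", "localhost", 5432, "mydb",
                         sslmode="disable", schemaName="orders"))
# → postgres://app:p%40ss%3Aword@localhost:5432/mydb?sslmode=disable&schemaName=orders
```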
```
clickhouse://[username[:password]@]host[:port]/database[?param1=value1&param2=value2]
```
Examples:
```
# Basic connection (default user, no password)
clickhouse://127.0.0.1:9000/order?secure=false

# With authentication (replace with actual credentials)
clickhouse://[username]:[password]@localhost:9000/analytics

# Secure connection (replace placeholders)
clickhouse://[username]:[password]@[hostname]:9440/mydb?secure=true

# With custom settings
clickhouse://localhost:9000/analytics?secure=false&compress=true&debug=true
```

**Common ClickHouse Parameters:**

- `secure`: Use TLS encryption (`true` or `false`)
- `compress`: Enable compression (`true` or `false`)
- `debug`: Enable debug logging (`true` or `false`)
- `dial_timeout`: Connection timeout
- `max_execution_time`: Query execution timeout
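To sanity-check a DSN before handing it to the sink, you can split it into its components with Python's standard `urllib.parse` (illustrative only; the sink does its own parsing):

```python
from urllib.parse import urlsplit, parse_qs

# Split a ClickHouse-style DSN into host, port, database, and query params.
dsn = "clickhouse://reader:secret@localhost:9000/analytics?secure=false&compress=true"
parts = urlsplit(dsn)
params = {k: v[0] for k, v in parse_qs(parts.query).items()}

print(parts.hostname, parts.port, parts.path.lstrip("/"))  # localhost 9000 analytics
print(params["secure"], params["compress"])                # false true
```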
For security, avoid hardcoding credentials in commands. Use environment variables:
```shell
# Set DSN as environment variable (replace with actual credentials)
export DSN="postgres://[username]:[password]@localhost:5432/mydb?sslmode=disable"

# Use in command
substreams-sink-sql from-proto $DSN substreams.yaml
```

**PostgreSQL with Docker:**

```shell
docker run --name postgres \
  -e POSTGRES_PASSWORD=password \
  -e POSTGRES_DB=analytics \
  -p 5432:5432 -d postgres:13

export DSN="postgres://postgres:password@localhost:5432/analytics?sslmode=disable"
```

**ClickHouse with Docker:**

```shell
docker run --name clickhouse \
  -p 9000:9000 -p 8123:8123 \
  -d clickhouse/clickhouse-server

export DSN="clickhouse://127.0.0.1:9000/default?secure=false"
```

Start by creating a new Substreams project:
```shell
# Create a new Substreams project
substreams init

# Follow the interactive prompts to configure your project
# This will create the basic project structure with:
# - substreams.yaml (manifest file)
# - proto/ directory for protocol buffer definitions
# - src/ directory for your Rust code
# - Cargo.toml for Rust dependencies
```

Create a `.proto` file that defines your data structure with SQL schema annotations. Here's an example based on the USDC transfers showcase:
```protobuf
syntax = "proto3";

import "google/protobuf/timestamp.proto";
import "sf/substreams/sink/sql/schema/v1/schema.proto";

package transfers;

message Output {
  repeated Transfer transfers = 1;
}

message Transfer {
  option (schema.table) = {
    name: "transfers"
    clickhouse_table_options: {
      order_by_fields: [{name: "tx_hash"}, {name: "log_index"}]
      partition_fields: [{name: "_block_timestamp_", function: toYYYYMM}]
      index_fields: [{
        field_name: "from_addr"
        name: "from_idx"
        type: bloom_filter
        granularity: 4
      }, {
        field_name: "to_addr"
        name: "to_idx"
        type: bloom_filter
        granularity: 4
      }]
    }
  };

  string tx_hash = 1;
  uint32 log_index = 2;
  string from_addr = 3;
  string to_addr = 4;
  string amount = 5 [(schema.field) = { convertTo: { uint256{} } }]; // Large token amounts as string
  string contract_addr = 6;
  google.protobuf.Timestamp timestamp = 7;
}
```

Create a `substreams.yaml` file that references your protobuf:
```yaml
specVersion: v0.1.0
package:
  name: 'usdc-transfers'
  version: v0.1.0
  doc: |
    USDC transfers extraction for SQL sink

protobuf:
  files:
    - transfers.proto
  importPaths:
    - ./proto

binaries:
  default:
    type: wasm/rust-v1
    file: ./target/wasm32-unknown-unknown/release/usdc_transfers.wasm

modules:
  - name: map_transfer
    kind: map
    inputs:
      - source: sf.ethereum.type.v2.Block
    output:
      type: proto:transfers.Output

network: mainnet

sink:
  module: map_transfer
  type: sf.substreams.sink.sql.v1.Service
  config: {}
```

Create your Rust module that outputs data matching your proto definition:
```rust
use substreams::prelude::*;
use substreams_ethereum::pb::eth;

#[substreams::handlers::map]
fn map_transfer(block: eth::v2::Block) -> Result<transfers::Output, substreams::errors::Error> {
    let mut output = transfers::Output::default();

    // Process block data and extract USDC transfers
    for transaction in &block.transaction_traces {
        let Some(receipt) = &transaction.receipt else { continue };
        for log in &receipt.logs {
            // Check if this is a USDC transfer event
            if is_usdc_transfer(log) {
                output.transfers.push(extract_transfer_data(log, transaction));
            }
        }
    }

    Ok(output)
}

// Helper functions would be implemented here
fn is_usdc_transfer(log: &eth::v2::Log) -> bool {
    // Implementation to check if log is a USDC Transfer event
    todo!()
}

fn extract_transfer_data(log: &eth::v2::Log, tx: &eth::v2::TransactionTrace) -> transfers::Transfer {
    // Implementation to extract transfer data from the log
    todo!()
}
```

Build your Substreams with:
```shell
substreams build
```

Start your database (PostgreSQL or ClickHouse):
**PostgreSQL:**

```shell
docker run --name postgres -e POSTGRES_PASSWORD=password -p 5432:5432 -d postgres:13
```

**ClickHouse:**

```shell
docker run --name clickhouse -p 9000:9000 -d clickhouse/clickhouse-server
```

Execute the from-proto command to automatically generate the schema and start streaming:

**PostgreSQL:**

```shell
export DSN="postgres://postgres:password@localhost:5432/postgres?sslmode=disable"
substreams-sink-sql from-proto $DSN substreams.yaml
```

**ClickHouse:**

```shell
export DSN="clickhouse://127.0.0.1:9000/default?secure=false"
substreams-sink-sql from-proto $DSN substreams.yaml
```

The from-proto command relies on special protobuf annotations to generate the SQL schema. Here are the key annotations:
```protobuf
message MyTable {
  option (schema.table) = {
    name: "my_table"
    clickhouse_table_options: {
      order_by_fields: [{name: "id"}]
      partition_fields: [{name: "created_date", function: toYYYYMM}]
      index_fields: [{
        field_name: "status"
        name: "status_idx"
        type: bloom_filter
        granularity: 4
      }]
    }
  };
}
```

Field-level annotations control keys, constraints, naming, and numeric conversion:

```protobuf
// Primary key
string id = 1 [(schema.field) = { primary_key: true }];

// Foreign key relationship
string user_id = 2 [(schema.field) = { foreign_key: "users on id" }];

// Unique constraint
string email = 3 [(schema.field) = { unique: true }];

// Custom column name
string user_address = 4 [(schema.field) = { name: "wallet_address" }];

// String-to-numeric conversion for large values
string token_amount = 5 [(schema.field) = { convertTo: { uint256{} } }];

// Decimal conversion with specific scale
string price = 6 [(schema.field) = { convertTo: { decimal128{ scale: 18 } } }];
```

For nested messages, you can create child tables:
```protobuf
message OrderItem {
  option (schema.table) = {
    name: "order_items",
    child_of: "orders on order_id"
  };

  string item_id = 1;
  int64 quantity = 2;
}
```

The from-proto command automatically maps protobuf types to SQL types:
| Protobuf Type | PostgreSQL Type | ClickHouse Type |
|---|---|---|
| `string` | `VARCHAR(255)` | `String` |
| `int32`, `sint32`, `sfixed32` | `INTEGER` | `Int32` |
| `int64`, `sint64`, `sfixed64` | `BIGINT` | `Int64` |
| `uint32`, `fixed32` | `NUMERIC` | `UInt32` |
| `uint64`, `fixed64` | `NUMERIC` | `UInt64` |
| `float` | `DECIMAL` | `Float32` |
| `double` | `DOUBLE PRECISION` | `Float64` |
| `bool` | `BOOLEAN` | `Bool` |
| `bytes` | `TEXT` | `String` |
| `google.protobuf.Timestamp` | `TIMESTAMP` | `DateTime` |
| `repeated <type>` | `<type>[]` | `Array(<type>)` |
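As a rough sketch, the scalar mapping above can be expressed as a lookup table; this is my own transcription of the table for illustration, not the sink's source code:

```python
# Protobuf scalar -> (PostgreSQL, ClickHouse) column types, per the table above.
PROTO_TO_SQL = {
    "string": ("VARCHAR(255)", "String"),
    "int32": ("INTEGER", "Int32"), "sint32": ("INTEGER", "Int32"), "sfixed32": ("INTEGER", "Int32"),
    "int64": ("BIGINT", "Int64"), "sint64": ("BIGINT", "Int64"), "sfixed64": ("BIGINT", "Int64"),
    "uint32": ("NUMERIC", "UInt32"), "fixed32": ("NUMERIC", "UInt32"),
    "uint64": ("NUMERIC", "UInt64"), "fixed64": ("NUMERIC", "UInt64"),
    "float": ("DECIMAL", "Float32"),
    "double": ("DOUBLE PRECISION", "Float64"),
    "bool": ("BOOLEAN", "Bool"),
    "bytes": ("TEXT", "String"),
    "google.protobuf.Timestamp": ("TIMESTAMP", "DateTime"),
}

def clickhouse_type(proto_type: str, repeated: bool = False) -> str:
    """Predict the ClickHouse column type, wrapping repeated fields in Array(...)."""
    base = PROTO_TO_SQL[proto_type][1]
    return f"Array({base})" if repeated else base

print(clickhouse_type("uint64"))                 # UInt64
print(clickhouse_type("string", repeated=True))  # Array(String)
```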
For handling large numeric values that exceed standard integer ranges, you can use string-to-numeric conversion:
| String Conversion Type | PostgreSQL Type | ClickHouse Type | Use Case |
|---|---|---|---|
| `Int128` | `NUMERIC(39,0)` | `Int128` | 128-bit signed integers |
| `UInt128` | `NUMERIC(39,0)` | `UInt128` | 128-bit unsigned integers |
| `Int256` | `NUMERIC(78,0)` | `Int256` | 256-bit signed integers |
| `UInt256` | `NUMERIC(78,0)` | `UInt256` | 256-bit unsigned integers |
| `Decimal128` | `NUMERIC(38,scale)` | `Decimal128(precision,scale)` | 128-bit decimals |
| `Decimal256` | `NUMERIC(76,scale)` | `Decimal256(precision,scale)` | 256-bit decimals |
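The `NUMERIC` precisions in the PostgreSQL column follow directly from the decimal digit counts of the integer ranges, which you can verify with a quick check:

```python
# A 128-bit unsigned integer has at most 39 decimal digits, and a 256-bit one
# at most 78 — matching the NUMERIC(39,0) and NUMERIC(78,0) mappings above.
max_u128 = 2**128 - 1
max_u256 = 2**256 - 1

print(len(str(max_u128)))  # 39
print(len(str(max_u256)))  # 78

# uint64 tops out far below typical 18-decimal token amounts, which is why
# amounts arrive as strings and are converted via the schema annotations.
one_token_18_decimals = 10**18
print(2**64 - 1 < 25_000_000 * one_token_18_decimals)  # True
```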
When your protobuf contains string fields that represent large numeric values, you can specify conversion:
```protobuf
message Transfer {
  // Convert string amount to UInt256 for precise arithmetic
  string amount = 1 [(schema.field) = { convertTo: { uint256{} } }];

  // Convert string balance to Decimal128 with 18 decimal places
  string balance = 2 [(schema.field) = { convertTo: { decimal128{ scale: 18 } } }];
}
```

This is particularly useful for:
- Cryptocurrency amounts: Token values that exceed uint64 range
- Financial calculations: High-precision decimal arithmetic
- Large identifiers: 256-bit hashes or IDs stored as strings
Stream specific block ranges:
```shell
substreams-sink-sql from-proto $DSN substreams.yaml \
  --start-block 1000000 \
  --stop-block 1001000
```

For high-throughput scenarios:
```shell
substreams-sink-sql from-proto $DSN substreams.yaml \
  --no-constraints \
  --block-batch-size 100
```

For ClickHouse with additional configuration:
```shell
substreams-sink-sql from-proto $DSN substreams.yaml \
  --clickhouse-sink-info-folder ./clickhouse-info \
  --clickhouse-cursor-file-path ./cursor.txt
```

The sink automatically uses ClickHouse's ReplacingMergeTree engine with enhanced cleanup capabilities:
```sql
ENGINE = ReplacingMergeTree(_version_, _deleted_)
SETTINGS allow_experimental_replacing_merge_with_cleanup = 1
```

This provides:
- Automatic deduplication based on the `_version_` column
- Soft deletes using the `_deleted_` column for handling chain reorgs
- Experimental cleanup for better storage efficiency
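A toy Python model (not ClickHouse itself) illustrates the merge semantics: per ordering key, the row with the highest `_version_` wins, and with cleanup enabled, keys whose winning row is flagged `_deleted_` disappear entirely:

```python
def replacing_merge(rows, key_fields, cleanup=False):
    """Toy model of ReplacingMergeTree: rows are dicts carrying
    '_version_' and '_deleted_'; the highest version wins per key."""
    winners = {}
    for row in rows:
        key = tuple(row[f] for f in key_fields)
        if key not in winners or row["_version_"] > winners[key]["_version_"]:
            winners[key] = row
    result = list(winners.values())
    if cleanup:  # models allow_experimental_replacing_merge_with_cleanup
        result = [r for r in result if not r["_deleted_"]]
    return result

rows = [
    {"tx_hash": "0xaa", "log_index": 1, "amount": "100", "_version_": 1, "_deleted_": 0},
    # Reorg: the same row re-emitted as deleted with a newer version
    {"tx_hash": "0xaa", "log_index": 1, "amount": "100", "_version_": 2, "_deleted_": 1},
    {"tx_hash": "0xbb", "log_index": 0, "amount": "50", "_version_": 1, "_deleted_": 0},
]
merged = replacing_merge(rows, ["tx_hash", "log_index"], cleanup=True)
print([r["tx_hash"] for r in merged])  # ['0xbb']
```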
Configure optimal data organization for query performance:
```protobuf
message Transfer {
  option (schema.table) = {
    name: "transfers"
    clickhouse_table_options: {
      // Order by transaction hash and block for efficient range queries
      order_by_fields: [
        {name: "trx_hash"},
        {name: "_block_number_"},
        {name: "from"},
        {name: "to"}
      ]

      // Partition by month for efficient time-based queries
      partition_fields: [{name: "_block_timestamp_", function: toYYYYMM}]

      // Add indexes for specific query patterns
      index_fields: [{
        field_name: "from"
        name: "from_idx"
        type: bloom_filter
        granularity: 4
      }]
    }
  };
}
```

Create efficient pre-aggregated views that handle chain reorgs correctly:
```sql
CREATE MATERIALIZED VIEW transfers.monthly_transfers
ENGINE = SummingMergeTree()
PARTITION BY month
ORDER BY month
AS
SELECT
    toYYYYMM(_block_timestamp_) AS month,
    sum(if(_deleted_, -toInt256(amount), toInt256(amount))) AS volume,
    sum(if(_deleted_, -1, 1)) AS transfer_count
FROM transfers.transfers
GROUP BY month;
```

The key pattern is using `if(_deleted_, -value, value)` to handle reorg corrections automatically.
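The additive pattern can be simulated outside ClickHouse to see why it stays correct through a reorg; this is a toy model, not the sink's code:

```python
def additive_volume(rows):
    """The materialized-view pattern above, in Python: deleted rows
    contribute their negated amount, so a reorg cancels itself out."""
    return sum(-r["amount"] if r["_deleted_"] else r["amount"] for r in rows)

stream = [
    {"amount": 100, "_deleted_": 0},  # transfer seen on the initial fork
    {"amount": 100, "_deleted_": 1},  # reorg: the same transfer retracted
    {"amount": 250, "_deleted_": 0},  # replacement transfer on the canonical fork
]
print(additive_volume(stream))  # 250
```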
For current state queries, filter out deleted records:
```sql
SELECT * FROM transfers.transfers
WHERE _deleted_ = 0
```

For aggregations with reorg safety, use the additive pattern:
```sql
SELECT
    sum(if(_deleted_, -amount, amount)) AS net_volume
FROM transfers.transfers
```

📚 **Deep Dive**: See the ClickHouse Showcase Deep Dive for detailed explanations of protobuf-to-schema mapping and materialized view patterns.
**Common issues:**

- **Proto import errors**: Ensure all required proto files are in your import paths
- **Schema annotation errors**: Verify you're importing `sf/substreams/sink/sql/schema/v1/schema.proto`
- **Database connection issues**: Check your DSN format and database accessibility (see the DSN Format section)
- **Module output type errors**: Ensure your Substreams module outputs the expected proto message
- **String conversion errors**: Verify that string fields marked for numeric conversion contain valid numeric values
- **ClickHouse partition errors**: Ensure partition functions match your data types (e.g., `toYYYYMM` for timestamps)

**Debugging tips:**

- Use `substreams info` to verify your manifest structure
- Check database logs for schema creation issues
- Verify your protobuf definitions compile correctly
- Test with a small block range first (`--start-block` and `--stop-block`)
- **ClickHouse Showcase** - Production-ready USDC transfers example featuring:
  - Real-world protobuf definitions with ClickHouse optimizations
  - String-to-UInt256 conversion for token amounts
  - Monthly partitioning and efficient ordering
  - Materialized views for aggregations
  - Docker setup for easy testing
- **Test Project** - Comprehensive test suite demonstrating:
  - Complex proto definitions with relationships
  - Various data types and constraints
  - Child table relationships
  - PostgreSQL and ClickHouse compatibility
- **ClickHouse Improvements**: Removed deprecated `ClickhouseReplacingField` functionality
- **Enhanced ReplacingMergeTree**: Streamlined logic with the `allow_experimental_replacing_merge_with_cleanup` setting
- **Better Performance**: Optimized table engine configuration for production workloads

- **Extended Numeric Types**: Added support for `Int128`, `UInt128`, `Int256`, `UInt256`, `Decimal128`, and `Decimal256`
- **String-to-Numeric Conversion**: Convert string fields to native numeric types for better performance
- **Enhanced Type Mapping**: Improved PostgreSQL and ClickHouse type mapping system

- **From v4.7.x to v4.8.0+**: No breaking changes; new numeric types are opt-in via field annotations
- **From v4.8.x to v4.9.0**: ClickHouse users may see improved performance due to ReplacingMergeTree optimizations
- **Existing deployments**: Continue to work without changes; new features available for new schema definitions