Skip to content

Commit 6a5c0ef

Browse files
committed
Add support for nested object handling in PostgreSQL and ClickHouse dialects
- Introduced `AppendInlineFieldValues` method for inline field processing in dialects.
- Enhanced handling for repeated and single nested fields in PostgreSQL (uses JSONB type) and ClickHouse.
- Updated protobuf schema and tests to include nested types (`level1` and `list_of_level1`).
- Removed unused dependencies and cleaned up the `go.sum` and `go.mod` files.
- Updated CHANGELOG for version `v4.11.0` to reflect changes.
1 parent 490fdbe commit 6a5c0ef

12 files changed

Lines changed: 145 additions & 74 deletions

File tree

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## v4.11.0
9+
* Added support for nested objects in Postgres from-proto mode. Nested objects are stored in columns of type `JSONB`.
10+
811
## v4.10.0
912
* Added support for one-level-deep nested objects in ClickHouse from-proto mode. Nested objects are stored in columns of type `Nested`.
1013
* Fix postgres handling of type bytea[]

db_proto/sql/click_house/dialect.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import (
1212
"github.com/streamingfast/substreams-sink-sql/db_proto/sql/schema"
1313
pbSchmema "github.com/streamingfast/substreams-sink-sql/pb/sf/substreams/sink/sql/schema/v1"
1414
"go.uber.org/zap"
15+
"google.golang.org/protobuf/reflect/protoreflect"
16+
"google.golang.org/protobuf/types/dynamicpb"
1517
)
1618

1719
const staticSqlCreatDatabase = `
@@ -197,6 +199,53 @@ func (d *DialectClickHouse) FullTableName(table *schema.Table) string {
197199
return tableName(d.schemaName, table.Name)
198200
}
199201

202+
func (d *DialectClickHouse) AppendInlineFieldValues(fieldValues []any, fd protoreflect.FieldDescriptor, fv protoreflect.Value, dm *dynamicpb.Message) ([]any, error) {
203+
if fd.IsList() {
204+
// Handle as array of nested columns - flatten into multiple arrays
205+
list := fv.List()
206+
if list.Len() > 0 {
207+
firstMessage := list.Get(0).Message().Interface().(*dynamicpb.Message)
208+
nestedFields := firstMessage.Descriptor().Fields()
209+
210+
// For each nested field, create an array of values from all list elements
211+
for j := 0; j < nestedFields.Len(); j++ {
212+
nestedFd := nestedFields.Get(j)
213+
var nestedValues []interface{}
214+
215+
// Collect values for this nested field from all list elements
216+
for k := 0; k < list.Len(); k++ {
217+
fm := list.Get(k).Message().Interface().(*dynamicpb.Message)
218+
nestedValue := fm.Get(nestedFd)
219+
nestedValues = append(nestedValues, nestedValue.Interface())
220+
}
221+
222+
fieldValues = append(fieldValues, nestedValues)
223+
}
224+
} else {
225+
// Empty list - need to get field count from descriptor
226+
// Get the message descriptor for this field type
227+
msgDesc := fd.Message()
228+
nestedFields := msgDesc.Fields()
229+
230+
// Append empty arrays for each nested field
231+
for j := 0; j < nestedFields.Len(); j++ {
232+
fieldValues = append(fieldValues, []interface{}{})
233+
}
234+
}
235+
} else {
236+
// Handle as nested column - extract each field as an array
237+
fm := fv.Message().Interface().(*dynamicpb.Message)
238+
nestedFields := fm.Descriptor().Fields()
239+
for j := 0; j < nestedFields.Len(); j++ {
240+
nestedFd := nestedFields.Get(j)
241+
nestedValue := fm.Get(nestedFd)
242+
// Wrap the single value in an array (array of size 1)
243+
fieldValues = append(fieldValues, []interface{}{nestedValue.Interface()})
244+
}
245+
}
246+
return fieldValues, nil
247+
}
248+
200249
func (d *DialectClickHouse) SchemaHash() string {
201250
h := fnv.New64a()
202251

db_proto/sql/database.go

Lines changed: 10 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -144,35 +144,11 @@ func (d *BaseDatabase) WalkMessageDescriptorAndInsertWithDialect(dm *dynamicpb.M
144144
// Check if this is an inline nested array
145145
fieldInfo := proto.FieldInfo(fd)
146146
if fieldInfo != nil && fieldInfo.Inline {
147-
// Handle as array of nested columns - flatten into multiple arrays
148-
if list.Len() > 0 {
149-
firstMessage := list.Get(0).Message().Interface().(*dynamicpb.Message)
150-
nestedFields := firstMessage.Descriptor().Fields()
151-
152-
// For each nested field, create an array of values from all list elements
153-
for j := 0; j < nestedFields.Len(); j++ {
154-
nestedFd := nestedFields.Get(j)
155-
var nestedValues []interface{}
156-
157-
// Collect values for this nested field from all list elements
158-
for k := 0; k < list.Len(); k++ {
159-
fm := list.Get(k).Message().Interface().(*dynamicpb.Message)
160-
nestedValue := fm.Get(nestedFd)
161-
nestedValues = append(nestedValues, nestedValue.Interface())
162-
}
163-
164-
fieldValues = append(fieldValues, nestedValues)
165-
}
166-
} else {
167-
// Empty list - need to get field count from descriptor
168-
// Get the message descriptor for this field type
169-
msgDesc := fd.Message()
170-
nestedFields := msgDesc.Fields()
171-
172-
// Append empty arrays for each nested field
173-
for j := 0; j < nestedFields.Len(); j++ {
174-
fieldValues = append(fieldValues, []interface{}{})
175-
}
147+
// Delegate to dialect for inline handling
148+
var err error
149+
fieldValues, err = dialect.AppendInlineFieldValues(fieldValues, fd, fv, dm)
150+
if err != nil {
151+
return 0, fmt.Errorf("appending inline field values for %q: %w", string(fd.Name()), err)
176152
}
177153
} else if list.Len() > 0 {
178154
// Array of messages - process as child tables
@@ -204,13 +180,11 @@ func (d *BaseDatabase) WalkMessageDescriptorAndInsertWithDialect(dm *dynamicpb.M
204180
// Check if this field should be treated as a nested (inline) column
205181
fieldInfo := proto.FieldInfo(fd)
206182
if fieldInfo != nil && fieldInfo.Inline {
207-
// Handle as nested column - extract each field as an array
208-
nestedFields := fm.Descriptor().Fields()
209-
for j := 0; j < nestedFields.Len(); j++ {
210-
nestedFd := nestedFields.Get(j)
211-
nestedValue := fm.Get(nestedFd)
212-
// Wrap the single value in an array (array of size 1)
213-
fieldValues = append(fieldValues, []interface{}{nestedValue.Interface()})
183+
// Delegate to dialect for inline handling
184+
var err error
185+
fieldValues, err = dialect.AppendInlineFieldValues(fieldValues, fd, fv, dm)
186+
if err != nil {
187+
return 0, fmt.Errorf("appending inline field values for %q: %w", string(fd.Name()), err)
214188
}
215189
continue
216190
}

db_proto/sql/dialect.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"github.com/streamingfast/substreams-sink-sql/db_proto/sql/schema"
55
"go.uber.org/zap"
66
"golang.org/x/exp/maps"
7+
"google.golang.org/protobuf/reflect/protoreflect"
8+
"google.golang.org/protobuf/types/dynamicpb"
79
)
810

911
const DialectTableBlock = "_blocks_"
@@ -21,6 +23,7 @@ type Dialect interface {
2123
GetTables() []*schema.Table
2224
UseVersionField() bool
2325
UseDeletedField() bool
26+
AppendInlineFieldValues(fieldValues []any, fd protoreflect.FieldDescriptor, fv protoreflect.Value, dm *dynamicpb.Message) ([]any, error)
2427
}
2528

2629
type BaseDialect struct {

db_proto/sql/postgres/dialect.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,14 @@ import (
77
"sort"
88
"strings"
99

10+
"github.com/lib/pq"
1011
"github.com/streamingfast/substreams-sink-sql/bytes"
1112
sql2 "github.com/streamingfast/substreams-sink-sql/db_proto/sql"
1213
"github.com/streamingfast/substreams-sink-sql/db_proto/sql/schema"
1314
"go.uber.org/zap"
15+
"google.golang.org/protobuf/encoding/protojson"
16+
"google.golang.org/protobuf/reflect/protoreflect"
17+
"google.golang.org/protobuf/types/dynamicpb"
1418
)
1519

1620
const postgresStaticSql = `
@@ -134,6 +138,9 @@ func (d *DialectPostgres) createTable(table *schema.Table) error {
134138
switch {
135139
case f.IsRepeated:
136140
// Arrays are now supported, continue processing
141+
case f.Nested != nil:
142+
// Nested (inline) fields map to a JSONB column (see MapFieldType); no child table lookup is needed
143+
fmt.Println("found nested type")
137144
case f.IsMessage && !IsWellKnownType(f.FieldDescriptor):
138145
childTable, found := d.TableRegistry[f.Message]
139146
if !found {
@@ -205,6 +212,32 @@ func (d *DialectPostgres) FullTableName(table *schema.Table) string {
205212
return tableName(d.schemaName, table.Name)
206213
}
207214

215+
func (d *DialectPostgres) AppendInlineFieldValues(fieldValues []any, fd protoreflect.FieldDescriptor, fv protoreflect.Value, dm *dynamicpb.Message) ([]any, error) {
216+
if fd.IsList() {
217+
// For repeated inline messages, append the list of JSON strings
218+
list := fv.List()
219+
var jsonStrings []string
220+
for j := 0; j < list.Len(); j++ {
221+
fm := list.Get(j).Message().Interface().(*dynamicpb.Message)
222+
jsonBytes, err := protojson.Marshal(fm)
223+
if err != nil {
224+
return nil, fmt.Errorf("failed to marshal protobuf message to JSON: %w", err)
225+
}
226+
jsonStrings = append(jsonStrings, string(jsonBytes))
227+
}
228+
fieldValues = append(fieldValues, pq.Array(jsonStrings))
229+
} else {
230+
// For single inline message, append the JSON string
231+
fm := fv.Message().Interface().(*dynamicpb.Message)
232+
jsonBytes, err := protojson.Marshal(fm)
233+
if err != nil {
234+
return nil, fmt.Errorf("failed to marshal protobuf message to JSON: %w", err)
235+
}
236+
fieldValues = append(fieldValues, string(jsonBytes))
237+
}
238+
return fieldValues, nil
239+
}
240+
208241
func (d *DialectPostgres) SchemaHash() string {
209242
h := fnv.New64a()
210243

db_proto/sql/postgres/row_inserter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ func createInsertFromDescriptor(table *schema.Table, dialect sql2.Dialect) (stri
110110
if field.IsExtension { //not a direct child
111111
continue
112112
}
113-
if field.IsRepeated {
113+
if field.IsRepeated && field.Nested == nil {
114114
// Check if it's a repeated message (which should be skipped) or repeated scalar (which should be processed)
115115
if field.IsMessage {
116116
continue

db_proto/sql/postgres/types.go

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/streamingfast/substreams-sink-sql/bytes"
1111
"github.com/streamingfast/substreams-sink-sql/db_proto/sql/schema"
1212
v1 "github.com/streamingfast/substreams-sink-sql/pb/sf/substreams/sink/sql/schema/v1"
13+
"google.golang.org/protobuf/encoding/protojson"
1314
"google.golang.org/protobuf/reflect/protoreflect"
1415
"google.golang.org/protobuf/types/known/timestamppb"
1516
)
@@ -28,6 +29,7 @@ const (
2829
TypeVarchar DataType = "VARCHAR(255)"
2930
TypeBytea DataType = "BYTEA"
3031
TypeTimestamp DataType = "TIMESTAMP"
32+
TypeJSONB DataType = "JSONB"
3133
)
3234

3335
func (s DataType) String() string {
@@ -52,11 +54,15 @@ func MapFieldType(fd protoreflect.FieldDescriptor, bytesEncoding bytes.Encoding,
5254

5355
switch kind {
5456
case protoreflect.MessageKind:
55-
switch string(fd.Message().FullName()) {
56-
case "google.protobuf.Timestamp":
57-
baseType = TypeTimestamp
58-
default:
59-
panic(fmt.Sprintf("Message type not supported: %s", string(fd.Message().FullName())))
57+
if column.Nested != nil {
58+
baseType = TypeJSONB
59+
} else {
60+
switch string(fd.Message().FullName()) {
61+
case "google.protobuf.Timestamp":
62+
baseType = TypeTimestamp
63+
default:
64+
panic(fmt.Sprintf("Message type not supported: %s", string(fd.Message().FullName())))
65+
}
6066
}
6167
case protoreflect.BoolKind:
6268
baseType = TypeBool
@@ -160,7 +166,22 @@ func ValueToString(value any, bytesEncoding bytes.Encoding) (s string) {
160166
elements = append(elements, ValueToString(elem, bytesEncoding))
161167
}
162168
s = "array[" + strings.Join(elements, ",") + "]"
169+
case protoreflect.Message:
170+
jsonBytes, err := protojson.Marshal(v.Interface())
171+
if err != nil {
172+
panic(fmt.Sprintf("failed to marshal protobuf message to JSON: %v", err))
173+
}
174+
s = "'" + strings.ReplaceAll(strings.ReplaceAll(string(jsonBytes), "'", "''"), "\\", "\\\\") + "'"
175+
return
163176
default:
177+
if msg, ok := v.(protoreflect.ProtoMessage); ok {
178+
jsonBytes, err := protojson.Marshal(msg)
179+
if err != nil {
180+
panic(fmt.Sprintf("failed to marshal protobuf message to JSON: %v", err))
181+
}
182+
s = "'" + strings.ReplaceAll(strings.ReplaceAll(string(jsonBytes), "'", "''"), "\\", "\\\\") + "'"
183+
return
184+
}
164185
panic(fmt.Sprintf("unsupported type: %T", v))
165186
}
166187
return

db_proto/test/substreams/order/proto/test/relations/relations.proto

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,10 @@ message TypesTest {
8585
string str_2_decimal256 = 105 [(schema.field) = { convertTo: { decimal256{ scale: 4 }}}];
8686

8787
// Optional uint256 field to test empty string handling
88-
optional string optional_str_2_uint256 = 106 [(schema.field) = { convertTo: { uint256{}} }];
88+
optional string optional_str_2_uint256 = 106 [(schema.field) = { convertTo: { uint256{}} }];
8989

90-
// NestedLevel1 level1 = 200 [(schema.field) = {inline: true}];
91-
// repeated NestedLevel1 list_of_level1 = 201 [(schema.field) = {inline: true}];
90+
NestedLevel1 level1 = 200 [(schema.field) = {inline: true}];
91+
repeated NestedLevel1 list_of_level1 = 201 [(schema.field) = {inline: true}];
9292

9393
}
9494

db_proto/test/substreams/order/src/lib.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -64,19 +64,19 @@ fn map_output(block: Block) -> model::relations::Output {
6464
str_2_decimal128: "17014118346046923173168.9988".to_string(),
6565
str_2_decimal256: "17014118346046923173168.9988".to_string(),
6666
optional_str_2_uint256: None,
67-
// level1: Some(NestedLevel1 {
68-
// name: "level1.name".to_string(),
69-
// desc: "level1.desc".to_string(),
70-
// }),
71-
// list_of_level1: vec![
72-
// NestedLevel1 {
73-
// name: "name.1".to_string(),
74-
// desc: "desc,1".to_string(),
75-
// },
76-
// NestedLevel1 {
77-
// name: "name.2".to_string(),
78-
// desc: "desc,2".to_string(),
79-
// }],
67+
level1: Some(NestedLevel1 {
68+
name: "level1.name".to_string(),
69+
desc: "level1.desc".to_string(),
70+
}),
71+
list_of_level1: vec![
72+
NestedLevel1 {
73+
name: "name.1".to_string(),
74+
desc: "desc,1".to_string(),
75+
},
76+
NestedLevel1 {
77+
name: "name.2".to_string(),
78+
desc: "desc,2".to_string(),
79+
}],
8080
},
8181
}),
8282
});

db_proto/test/substreams/order/src/pb/test.relations.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,10 @@ pub struct TypesTest {
117117
/// Optional uint256 field to test empty string handling
118118
#[prost(string, optional, tag="106")]
119119
pub optional_str_2_uint256: ::core::option::Option<::prost::alloc::string::String>,
120+
#[prost(message, optional, tag="200")]
121+
pub level1: ::core::option::Option<NestedLevel1>,
122+
#[prost(message, repeated, tag="201")]
123+
pub list_of_level1: ::prost::alloc::vec::Vec<NestedLevel1>,
120124
}
121125
#[allow(clippy::derive_partial_eq_without_eq)]
122126
#[derive(Clone, PartialEq, ::prost::Message)]

0 commit comments

Comments
 (0)