From 3e852f26dd100f412d0f1d338397f7781bc59ff8 Mon Sep 17 00:00:00 2001 From: Tyler Rockwood Date: Fri, 1 Nov 2024 01:43:54 +0000 Subject: [PATCH 1/8] snowflake: optimize performance of common data paths --- .../impl/snowflake/streaming/int128/int128.go | 4 +- .../snowflake/streaming/int128/int128_test.go | 16 ++++++++ internal/impl/snowflake/streaming/parquet.go | 15 ++++---- .../snowflake/streaming/userdata_converter.go | 37 +++++++++++++++++-- 4 files changed, 59 insertions(+), 13 deletions(-) diff --git a/internal/impl/snowflake/streaming/int128/int128.go b/internal/impl/snowflake/streaming/int128/int128.go index 3eedb8fd97..aba363539f 100644 --- a/internal/impl/snowflake/streaming/int128/int128.go +++ b/internal/impl/snowflake/streaming/int128/int128.go @@ -227,8 +227,8 @@ func (i Num) ToBigEndian() []byte { // AppendBigEndian converts an Int128 into big endian bytes func (i Num) AppendBigEndian(b []byte) []byte { - b = binary.BigEndian.AppendUint64(b[0:8], uint64(i.hi)) - return binary.BigEndian.AppendUint64(b[8:16], i.lo) + b = binary.BigEndian.AppendUint64(b, uint64(i.hi)) + return binary.BigEndian.AppendUint64(b, i.lo) } // ToInt64 casts an Int128 to a int64 by truncating the bytes. diff --git a/internal/impl/snowflake/streaming/int128/int128_test.go b/internal/impl/snowflake/streaming/int128/int128_test.go index 765b6ce731..a74058352d 100644 --- a/internal/impl/snowflake/streaming/int128/int128_test.go +++ b/internal/impl/snowflake/streaming/int128/int128_test.go @@ -11,8 +11,10 @@ package int128 import ( + "crypto/rand" "fmt" "math" + "slices" "testing" "github.com/stretchr/testify/require" @@ -388,3 +390,17 @@ func TestFitsInPrec(t *testing.T) { require.NoError(t, err) require.True(t, n.FitsInPrecision(38), snowflakeNumberTiny) } + +func TestToBytes(t *testing.T) { + for i := 0; i < 100; i++ { + input := make([]byte, 16) + _, err := rand.Read(input) + require.NoError(t, err) + n := FromBigEndian(input) + require.Equal(t, input, n.ToBigEndian()) + require.Equal(t, input, n.AppendBigEndian(nil)) + cloned := slices.Clone(input) + require.Equal(t, input, n.AppendBigEndian(cloned)[16:32]) + require.Equal(t, input, cloned) // Make sure cloned isn't mutated + } +} diff --git a/internal/impl/snowflake/streaming/parquet.go b/internal/impl/snowflake/streaming/parquet.go index 7b5db3868d..809fa2f0cc 100644 --- a/internal/impl/snowflake/streaming/parquet.go +++ b/internal/impl/snowflake/streaming/parquet.go @@ -21,20 +21,19 @@ import ( "github.com/segmentio/encoding/thrift" ) -func messageToRow(msg *service.Message) (map[string]any, error) { +func messageToRow(msg *service.Message, out map[string]any) error { v, err := msg.AsStructured() if err != nil { - return nil, fmt.Errorf("error extracting object from message: %w", err) + return fmt.Errorf("error extracting object from message: %w", err) } row, ok := v.(map[string]any) if !ok { - return nil, fmt.Errorf("expected object, got: %T", v) + return fmt.Errorf("expected object, got: %T", v) } - mapped := make(map[string]any, len(row)) for k, v := range row { - mapped[normalizeColumnName(k)] = v + out[normalizeColumnName(k)] = v } - return mapped, nil + return nil } // TODO: If the memory pressure is too great from writing all @@ -68,8 +67,10 @@ func constructRowGroup( // First we need to shred our record into columns, snowflake's data model // is thankfully a flat list of columns, so no dremel style record shredding // is needed + row := map[string]any{} for _, msg := range batch { - row, err := messageToRow(msg) + clear(row) + err := 
messageToRow(msg, row) if err != nil { return nil, err } diff --git a/internal/impl/snowflake/streaming/userdata_converter.go b/internal/impl/snowflake/streaming/userdata_converter.go index 48345694bb..7e9ee763ee 100644 --- a/internal/impl/snowflake/streaming/userdata_converter.go +++ b/internal/impl/snowflake/streaming/userdata_converter.go @@ -17,6 +17,7 @@ import ( "fmt" "time" "unicode/utf8" + "unsafe" "github.com/Jeffail/gabs/v2" "github.com/parquet-go/parquet-go" @@ -47,6 +48,13 @@ type typedBufferImpl struct { columnIndex int rowWidth int currentRow int + + // For int128 we don't make a bunch of small allocs, + // but append to this existing buffer a bunch, this + // saves GC pressure. We could optimize copies and + // reallocations, but this is simpler and seems to + // be effective for now. + scratch []byte } func (b *typedBufferImpl) WriteValue(v parquet.Value) { @@ -57,7 +65,8 @@ func (b *typedBufferImpl) WriteNull() { b.WriteValue(parquet.NullValue()) } func (b *typedBufferImpl) WriteInt128(v int128.Num) { - b.WriteValue(parquet.FixedLenByteArrayValue(v.ToBigEndian()).Level(0, 1, b.columnIndex)) + b.scratch = v.AppendBigEndian(b.scratch) + b.WriteValue(parquet.FixedLenByteArrayValue(b.scratch[len(b.scratch)-16:]).Level(0, 1, b.columnIndex)) } func (b *typedBufferImpl) WriteBool(v bool) { b.WriteValue(parquet.BooleanValue(v).Level(0, 1, b.columnIndex)) @@ -73,6 +82,9 @@ func (b *typedBufferImpl) Prepare(matrix []parquet.Value, columnIndex, rowWidth b.matrix = matrix b.columnIndex = columnIndex b.rowWidth = rowWidth + if b.scratch != nil { + b.scratch = b.scratch[:0] + } } func (b *typedBufferImpl) Reset() { b.Prepare(nil, 0, 0) @@ -258,9 +270,26 @@ func (c binaryConverter) ValidateAndConvert(stats *statsBuffer, val any, buf typ buf.WriteNull() return nil } - v, err := bloblang.ValueAsBytes(val) - if err != nil { - return err + var v []byte + switch t := val.(type) { + case string: + if t != "" { + // We don't modify this byte slice at all, so this is safe to grab the bytes + // without making a copy. + // Also make sure this isn't an empty string because it's undefined what the + // value is. + v = unsafe.Slice(unsafe.StringData(t), len(t)) + } else { + v = []byte{} + } + case []byte: + v = t + default: + b, err := bloblang.ValueAsBytes(val) + if err != nil { + return err + } + v = b } if len(v) > c.maxLength { return fmt.Errorf("value too long, length: %d, max: %d", len(v), c.maxLength) From 49abbd3d92824d9f2066c2964f7d7857b90967e0 Mon Sep 17 00:00:00 2001 From: Tyler Rockwood Date: Fri, 1 Nov 2024 14:29:36 +0000 Subject: [PATCH 2/8] snowflake: optimize row writing Remove a map assignment in the hot path of converting data, the loop here seems to be most of the cost, so use an array + offset lookup approach for matching values to the transformer/converter. We still *have* to have a map lookup due to the user giving us data by name (I don't think we want to encourage people to get the ordinals correct). If this loop is *still* slow we could consider parallizing the conversion here with multiple goroutines. Stats would need to be threadsafe or we'd need a converter per goroutine and merge the stats at the end (probably not too bad). 
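
For illustration, a minimal, self-contained sketch of the array + offset lookup idea follows (it is not the connector's actual code; `buildIndex` and `shred` are hypothetical stand-ins for the transformer plumbing in the diff below). The name-to-position map is built once per schema, and every record is shredded into a reused fixed-width row slice indexed by column position, so the hot loop never allocates a per-row map:

    package main

    import "fmt"

    type column struct{ name string }

    // buildIndex is computed once per schema: column name -> position in the row.
    func buildIndex(cols []column) map[string]int {
        nameToPosition := make(map[string]int, len(cols))
        for i, c := range cols {
            nameToPosition[c.name] = i
        }
        return nameToPosition
    }

    // shred copies one sparse record into a dense, fixed-width row.
    // Unknown keys are skipped; missing columns stay nil so they become NULLs.
    func shred(record map[string]any, nameToPosition map[string]int, row []any) {
        for i := range row {
            row[i] = nil
        }
        for k, v := range record {
            if idx, ok := nameToPosition[k]; ok {
                row[idx] = v
            }
        }
    }

    func main() {
        cols := []column{{"A"}, {"B"}, {"C"}}
        idx := buildIndex(cols)
        row := make([]any, len(cols)) // reused for every record: no per-row map allocation
        for _, rec := range []map[string]any{
            {"A": 1, "C": "x"},
            {"B": 2.5},
        } {
            shred(rec, idx, row)
            fmt.Println(row)
        }
    }

Clearing (or overwriting) the reused row each iteration is what keeps sparse records producing explicit NULLs; the change below achieves the same effect by resetting each slot to nil after it is consumed.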
--- internal/impl/snowflake/streaming/parquet.go | 47 ++++++++++++------- .../impl/snowflake/streaming/parquet_test.go | 5 +- internal/impl/snowflake/streaming/schema.go | 12 +++-- .../impl/snowflake/streaming/streaming.go | 2 +- 4 files changed, 40 insertions(+), 26 deletions(-) diff --git a/internal/impl/snowflake/streaming/parquet.go b/internal/impl/snowflake/streaming/parquet.go index 809fa2f0cc..bd57c0bf79 100644 --- a/internal/impl/snowflake/streaming/parquet.go +++ b/internal/impl/snowflake/streaming/parquet.go @@ -21,7 +21,10 @@ import ( "github.com/segmentio/encoding/thrift" ) -func messageToRow(msg *service.Message, out map[string]any) error { +// messageToRow converts a message into columnar form using the provided name to index mapping. +// We have to materialize the column into a row so that we can know if a column is null - the +// msg can be sparse, but the row must not be sparse. +func messageToRow(msg *service.Message, out []any, nameToPosition map[string]int) error { v, err := msg.AsStructured() if err != nil { return fmt.Errorf("error extracting object from message: %w", err) @@ -31,7 +34,13 @@ func messageToRow(msg *service.Message, out map[string]any) error { return fmt.Errorf("expected object, got: %T", v) } for k, v := range row { - out[normalizeColumnName(k)] = v + idx, ok := nameToPosition[normalizeColumnName(k)] + if !ok { + // TODO(schema): Unknown column, we just skip it. + // In the future we may evolve the schema based on the new data. + continue + } + out[idx] = v } return nil } @@ -46,7 +55,7 @@ func messageToRow(msg *service.Message, out map[string]any) error { func constructRowGroup( batch service.MessageBatch, schema *parquet.Schema, - transformers map[string]*dataTransformer, + transformers []*dataTransformer, ) ([]parquet.Row, error) { // We write all of our data in a columnar fashion, but need to pivot that data so that we can feed it into // out parquet library (which sadly will redo the pivot - maybe we need a lower level abstraction...). @@ -54,34 +63,36 @@ func constructRowGroup( // data to create rows of the data via an in-place transpose operation. // // TODO: Consider caching/pooling this matrix as I expect many are similarily sized. - matrix := make([]parquet.Value, len(batch)*len(schema.Fields())) rowWidth := len(schema.Fields()) - for idx, field := range schema.Fields() { - // The column index is consistent between two schemas that are the same because the schema fields are always - // in sorted order. 
- columnIndex := idx - t := transformers[field.Name()] - t.buf.Prepare(matrix, columnIndex, rowWidth) + matrix := make([]parquet.Value, len(batch)*rowWidth) + nameToPosition := make(map[string]int, rowWidth) + for idx, t := range transformers { + leaf, ok := schema.Lookup(t.name) + if !ok { + return nil, fmt.Errorf("invariant failed: unable to find column %q", t.name) + } + t.buf.Prepare(matrix, leaf.ColumnIndex, rowWidth) t.stats.Reset() + nameToPosition[t.name] = idx } // First we need to shred our record into columns, snowflake's data model // is thankfully a flat list of columns, so no dremel style record shredding // is needed - row := map[string]any{} + row := make([]any, rowWidth) for _, msg := range batch { - clear(row) - err := messageToRow(msg, row) + err := messageToRow(msg, row, nameToPosition) if err != nil { return nil, err } - // We **must** write a null, so iterate over the schema not the record, - // which might be sparse - for name, t := range transformers { - v := row[name] + for i, v := range row { + t := transformers[i] err = t.converter.ValidateAndConvert(t.stats, v, t.buf) if err != nil { - return nil, fmt.Errorf("invalid data for column %s: %w", name, err) + // TODO(schema): if this is a null value err then we can evolve the schema to mark it null. + return nil, fmt.Errorf("invalid data for column %s: %w", t.name, err) } + // reset the column as nil for the next row + row[i] = nil } } // Now all our values have been written to each buffer - here is where we do our matrix diff --git a/internal/impl/snowflake/streaming/parquet_test.go b/internal/impl/snowflake/streaming/parquet_test.go index 5449a7772a..3862df20c9 100644 --- a/internal/impl/snowflake/streaming/parquet_test.go +++ b/internal/impl/snowflake/streaming/parquet_test.go @@ -34,8 +34,9 @@ func TestWriteParquet(t *testing.T) { inputDataSchema := parquet.Group{ "A": parquet.Decimal(0, 18, parquet.Int32Type), } - transformers := map[string]*dataTransformer{ - "A": { + transformers := []*dataTransformer{ + { + name: "A", converter: numberConverter{ nullable: true, scale: 0, diff --git a/internal/impl/snowflake/streaming/schema.go b/internal/impl/snowflake/streaming/schema.go index 633ac0b2bf..e4ad27c158 100644 --- a/internal/impl/snowflake/streaming/schema.go +++ b/internal/impl/snowflake/streaming/schema.go @@ -27,6 +27,7 @@ type dataTransformer struct { stats *statsBuffer column *columnMetadata buf typedBuffer + name string } func convertFixedType(column columnMetadata) (parquet.Node, dataConverter, typedBuffer, error) { @@ -86,13 +87,13 @@ func convertFixedType(column columnMetadata) (parquet.Node, dataConverter, typed const maxJSONSize = 16*humanize.MiByte - 64 // See ParquetTypeGenerator -func constructParquetSchema(columns []columnMetadata) (*parquet.Schema, map[string]*dataTransformer, map[string]string, error) { +func constructParquetSchema(columns []columnMetadata) (*parquet.Schema, []*dataTransformer, map[string]string, error) { groupNode := parquet.Group{} - transformers := map[string]*dataTransformer{} + transformers := make([]*dataTransformer, len(columns)) // Don't write the sfVer key as it allows us to not have to narrow the numeric types in parquet. 
typeMetadata := map[string]string{ /*"sfVer": "1,1"*/ } var err error - for _, column := range columns { + for idx, column := range columns { id := int(column.Ordinal) var n parquet.Node var converter dataConverter @@ -209,7 +210,8 @@ func constructParquetSchema(columns []columnMetadata) (*parquet.Schema, map[stri ) name := normalizeColumnName(column.Name) groupNode[name] = n - transformers[name] = &dataTransformer{ + transformers[idx] = &dataTransformer{ + name: name, converter: converter, stats: &statsBuffer{columnID: id}, column: &column, @@ -241,7 +243,7 @@ func (s *statsBuffer) Reset() { s.nullCount = 0 } -func computeColumnEpInfo(stats map[string]*dataTransformer) map[string]fileColumnProperties { +func computeColumnEpInfo(stats []*dataTransformer) map[string]fileColumnProperties { info := map[string]fileColumnProperties{} for _, transformer := range stats { stat := transformer.stats diff --git a/internal/impl/snowflake/streaming/streaming.go b/internal/impl/snowflake/streaming/streaming.go index 476bdefc4f..38bbe5e64c 100644 --- a/internal/impl/snowflake/streaming/streaming.go +++ b/internal/impl/snowflake/streaming/streaming.go @@ -256,7 +256,7 @@ type SnowflakeIngestionChannel struct { encryptionInfo *encryptionInfo clientSequencer int64 rowSequencer int64 - transformers map[string]*dataTransformer + transformers []*dataTransformer fileMetadata map[string]string buffer *bytes.Buffer // This is shared among the various open channels to get some uniqueness From fca12b6fb3e6f7a8b0c85f9b56710f8dba899c08 Mon Sep 17 00:00:00 2001 From: Tyler Rockwood Date: Fri, 1 Nov 2024 15:27:33 +0000 Subject: [PATCH 3/8] snowflake: detach stats from transformers Let stats be an article of constructing the row group, instead of a persistent object that is shared and reset. There are no allocations saved by reusing stats and now we can parallelize row group construction, as there is no shared mutable state in the transformers outside of the typed buffer. --- internal/impl/snowflake/streaming/parquet.go | 16 ++-- .../impl/snowflake/streaming/parquet_test.go | 11 ++- internal/impl/snowflake/streaming/schema.go | 56 ------------- internal/impl/snowflake/streaming/stats.go | 84 +++++++++++++++++++ .../impl/snowflake/streaming/stats_test.go | 78 +++++++++++++++++ .../impl/snowflake/streaming/streaming.go | 36 ++++---- .../snowflake/streaming/userdata_converter.go | 32 +++---- 7 files changed, 214 insertions(+), 99 deletions(-) create mode 100644 internal/impl/snowflake/streaming/stats.go create mode 100644 internal/impl/snowflake/streaming/stats_test.go diff --git a/internal/impl/snowflake/streaming/parquet.go b/internal/impl/snowflake/streaming/parquet.go index bd57c0bf79..fc9d82d815 100644 --- a/internal/impl/snowflake/streaming/parquet.go +++ b/internal/impl/snowflake/streaming/parquet.go @@ -56,7 +56,7 @@ func constructRowGroup( batch service.MessageBatch, schema *parquet.Schema, transformers []*dataTransformer, -) ([]parquet.Row, error) { +) ([]parquet.Row, []*statsBuffer, error) { // We write all of our data in a columnar fashion, but need to pivot that data so that we can feed it into // out parquet library (which sadly will redo the pivot - maybe we need a lower level abstraction...). 
// So create a massive matrix that we will write stuff in columnar form, but then we don't need to move any @@ -66,13 +66,14 @@ func constructRowGroup( rowWidth := len(schema.Fields()) matrix := make([]parquet.Value, len(batch)*rowWidth) nameToPosition := make(map[string]int, rowWidth) + stats := make([]*statsBuffer, rowWidth) for idx, t := range transformers { leaf, ok := schema.Lookup(t.name) if !ok { - return nil, fmt.Errorf("invariant failed: unable to find column %q", t.name) + return nil, nil, fmt.Errorf("invariant failed: unable to find column %q", t.name) } t.buf.Prepare(matrix, leaf.ColumnIndex, rowWidth) - t.stats.Reset() + stats[idx] = &statsBuffer{} nameToPosition[t.name] = idx } // First we need to shred our record into columns, snowflake's data model @@ -82,14 +83,15 @@ func constructRowGroup( for _, msg := range batch { err := messageToRow(msg, row, nameToPosition) if err != nil { - return nil, err + return nil, nil, err } for i, v := range row { t := transformers[i] - err = t.converter.ValidateAndConvert(t.stats, v, t.buf) + s := stats[i] + err = t.converter.ValidateAndConvert(s, v, t.buf) if err != nil { // TODO(schema): if this is a null value err then we can evolve the schema to mark it null. - return nil, fmt.Errorf("invalid data for column %s: %w", t.name, err) + return nil, nil, fmt.Errorf("invalid data for column %s: %w", t.name, err) } // reset the column as nil for the next row row[i] = nil @@ -102,7 +104,7 @@ func constructRowGroup( rowStart := i * rowWidth rows[i] = matrix[rowStart : rowStart+rowWidth] } - return rows, nil + return rows, stats, nil } type parquetFileData struct { diff --git a/internal/impl/snowflake/streaming/parquet_test.go b/internal/impl/snowflake/streaming/parquet_test.go index 3862df20c9..ef781d681c 100644 --- a/internal/impl/snowflake/streaming/parquet_test.go +++ b/internal/impl/snowflake/streaming/parquet_test.go @@ -18,6 +18,7 @@ import ( "github.com/aws/smithy-go/ptr" "github.com/parquet-go/parquet-go" "github.com/redpanda-data/benthos/v4/public/service" + "github.com/redpanda-data/connect/v4/internal/impl/snowflake/streaming/int128" "github.com/stretchr/testify/require" ) @@ -42,7 +43,6 @@ func TestWriteParquet(t *testing.T) { scale: 0, precision: 38, }, - stats: &statsBuffer{columnID: 1}, column: &columnMetadata{ Name: "A", Ordinal: 1, @@ -57,7 +57,7 @@ func TestWriteParquet(t *testing.T) { }, } schema := parquet.NewSchema("bdec", inputDataSchema) - rows, err := constructRowGroup( + rows, stats, err := constructRowGroup( batch, schema, transformers, @@ -77,6 +77,13 @@ func TestWriteParquet(t *testing.T) { {"A": int32(2)}, {"A": int32(12353)}, }, actual) + require.Equal(t, []*statsBuffer{ + { + minIntVal: int128.FromInt64(2), + maxIntVal: int128.FromInt64(12353), + hasData: true, + }, + }, stats) } func readGeneric(r io.ReaderAt, size int64, schema *parquet.Schema) (rows []map[string]any, err error) { diff --git a/internal/impl/snowflake/streaming/schema.go b/internal/impl/snowflake/streaming/schema.go index e4ad27c158..5e289607aa 100644 --- a/internal/impl/snowflake/streaming/schema.go +++ b/internal/impl/snowflake/streaming/schema.go @@ -18,13 +18,10 @@ import ( "github.com/dustin/go-humanize" "github.com/parquet-go/parquet-go" - - "github.com/redpanda-data/connect/v4/internal/impl/snowflake/streaming/int128" ) type dataTransformer struct { converter dataConverter - stats *statsBuffer column *columnMetadata buf typedBuffer name string @@ -213,7 +210,6 @@ func constructParquetSchema(columns []columnMetadata) (*parquet.Schema, []*dataT 
transformers[idx] = &dataTransformer{ name: name, converter: converter, - stats: &statsBuffer{columnID: id}, column: &column, buf: buffer, } @@ -221,58 +217,6 @@ func constructParquetSchema(columns []columnMetadata) (*parquet.Schema, []*dataT return parquet.NewSchema("bdec", groupNode), transformers, typeMetadata, nil } -type statsBuffer struct { - columnID int - minIntVal, maxIntVal int128.Num - minRealVal, maxRealVal float64 - minStrVal, maxStrVal []byte - maxStrLen int - nullCount int64 - first bool -} - -func (s *statsBuffer) Reset() { - s.first = true - s.minIntVal = int128.FromInt64(0) - s.maxIntVal = int128.FromInt64(0) - s.minRealVal = 0 - s.maxRealVal = 0 - s.minStrVal = nil - s.maxStrVal = nil - s.maxStrLen = 0 - s.nullCount = 0 -} - -func computeColumnEpInfo(stats []*dataTransformer) map[string]fileColumnProperties { - info := map[string]fileColumnProperties{} - for _, transformer := range stats { - stat := transformer.stats - var minStrVal *string = nil - if stat.minStrVal != nil { - s := truncateBytesAsHex(stat.minStrVal, false) - minStrVal = &s - } - var maxStrVal *string = nil - if stat.maxStrVal != nil { - s := truncateBytesAsHex(stat.maxStrVal, true) - maxStrVal = &s - } - info[transformer.column.Name] = fileColumnProperties{ - ColumnOrdinal: int32(stat.columnID), - NullCount: stat.nullCount, - MinStrValue: minStrVal, - MaxStrValue: maxStrVal, - MaxLength: int64(stat.maxStrLen), - MinIntValue: stat.minIntVal, - MaxIntValue: stat.maxIntVal, - MinRealValue: stat.minRealVal, - MaxRealValue: stat.maxRealVal, - DistinctValues: -1, - } - } - return info -} - func physicalTypeOrdinal(str string) int { switch strings.ToUpper(str) { case "ROWINDEX": diff --git a/internal/impl/snowflake/streaming/stats.go b/internal/impl/snowflake/streaming/stats.go new file mode 100644 index 0000000000..789ed5bac3 --- /dev/null +++ b/internal/impl/snowflake/streaming/stats.go @@ -0,0 +1,84 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +package streaming + +import ( + "bytes" + + "github.com/redpanda-data/connect/v4/internal/impl/snowflake/streaming/int128" +) + +type statsBuffer struct { + minIntVal, maxIntVal int128.Num + minRealVal, maxRealVal float64 + minStrVal, maxStrVal []byte + maxStrLen int + nullCount int64 + hasData bool +} + +func mergeStats(a, b *statsBuffer) *statsBuffer { + c := &statsBuffer{hasData: true} + switch { + case a.hasData && b.hasData: + c.minIntVal = int128.Min(a.minIntVal, b.minIntVal) + c.maxIntVal = int128.Max(a.maxIntVal, b.maxIntVal) + c.minRealVal = min(a.minRealVal, b.minRealVal) + c.maxRealVal = max(a.maxRealVal, b.maxRealVal) + c.maxStrLen = max(a.maxStrLen, b.maxStrLen) + c.minStrVal = a.minStrVal + if bytes.Compare(b.minStrVal, a.minStrVal) < 0 { + c.minStrVal = b.minStrVal + } + c.maxStrVal = a.maxStrVal + if bytes.Compare(b.maxStrVal, a.maxStrVal) > 0 { + c.maxStrVal = b.maxStrVal + } + case a.hasData: + *c = *a + case b.hasData: + *c = *b + default: + c.hasData = false + } + c.nullCount = a.nullCount + b.nullCount + return c +} + +func computeColumnEpInfo(transformers []*dataTransformer, stats []*statsBuffer) map[string]fileColumnProperties { + info := map[string]fileColumnProperties{} + for idx, transformer := range transformers { + stat := stats[idx] + var minStrVal *string = nil + if stat.minStrVal != nil { + s := truncateBytesAsHex(stat.minStrVal, false) + minStrVal = &s + } + var maxStrVal *string = nil + if stat.maxStrVal != nil { + s := truncateBytesAsHex(stat.maxStrVal, true) + maxStrVal = &s + } + info[transformer.column.Name] = fileColumnProperties{ + ColumnOrdinal: int32(transformer.column.Ordinal), + NullCount: stat.nullCount, + MinStrValue: minStrVal, + MaxStrValue: maxStrVal, + MaxLength: int64(stat.maxStrLen), + MinIntValue: stat.minIntVal, + MaxIntValue: stat.maxIntVal, + MinRealValue: stat.minRealVal, + MaxRealValue: stat.maxRealVal, + DistinctValues: -1, + } + } + return info +} diff --git a/internal/impl/snowflake/streaming/stats_test.go b/internal/impl/snowflake/streaming/stats_test.go new file mode 100644 index 0000000000..2ad8e600bf --- /dev/null +++ b/internal/impl/snowflake/streaming/stats_test.go @@ -0,0 +1,78 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +package streaming + +import ( + "testing" + + "github.com/redpanda-data/connect/v4/internal/impl/snowflake/streaming/int128" + "github.com/stretchr/testify/require" +) + +func TestMergeInt(t *testing.T) { + s := mergeStats(&statsBuffer{ + minIntVal: int128.FromInt64(-1), + maxIntVal: int128.FromInt64(4), + hasData: true, + }, &statsBuffer{ + minIntVal: int128.FromInt64(3), + maxIntVal: int128.FromInt64(5), + hasData: true, + }) + require.Equal(t, &statsBuffer{ + minIntVal: int128.FromInt64(-1), + maxIntVal: int128.FromInt64(5), + hasData: true, + }, s) +} + +func TestMergeReal(t *testing.T) { + s := mergeStats(&statsBuffer{ + minRealVal: -1.2, + maxRealVal: 4.5, + nullCount: 4, + hasData: true, + }, &statsBuffer{ + minRealVal: 3.4, + maxRealVal: 5.9, + nullCount: 2, + hasData: true, + }) + require.Equal(t, &statsBuffer{ + minRealVal: -1.2, + maxRealVal: 5.9, + nullCount: 6, + hasData: true, + }, s) +} + +func TestMergeStr(t *testing.T) { + s := mergeStats(&statsBuffer{ + minStrVal: []byte("aa"), + maxStrVal: []byte("bbbb"), + maxStrLen: 6, + nullCount: 1, + hasData: true, + }, &statsBuffer{ + minStrVal: []byte("aaaa"), + maxStrVal: []byte("cccccc"), + maxStrLen: 24, + nullCount: 1, + hasData: true, + }) + require.Equal(t, &statsBuffer{ + minStrVal: []byte("aa"), + maxStrVal: []byte("cccccc"), + maxStrLen: 24, + nullCount: 2, + hasData: true, + }, s) +} diff --git a/internal/impl/snowflake/streaming/streaming.go b/internal/impl/snowflake/streaming/streaming.go index 38bbe5e64c..22d29bd0c3 100644 --- a/internal/impl/snowflake/streaming/streaming.go +++ b/internal/impl/snowflake/streaming/streaming.go @@ -279,11 +279,11 @@ type InsertStats struct { // InsertRows creates a parquet file using the schema from the data, // then writes that file into the Snowflake table func (c *SnowflakeIngestionChannel) InsertRows(ctx context.Context, batch service.MessageBatch) (InsertStats, error) { - stats := InsertStats{} + insertStats := InsertStats{} startTime := time.Now() - rows, err := constructRowGroup(batch, c.schema, c.transformers) + rows, dataStats, err := constructRowGroup(batch, c.schema, c.transformers) if err != nil { - return stats, err + return insertStats, err } // Prevent multiple channels from having the same bdec file (it must be unique) // so add the ID of the channel in the upper 16 bits and then get 48 bits of @@ -299,12 +299,12 @@ func (c *SnowflakeIngestionChannel) InsertRows(ctx context.Context, batch servic metadata: c.fileMetadata, }) if err != nil { - return stats, err + return insertStats, err } unencrypted := c.buffer.Bytes() metadata, err := readParquetMetadata(unencrypted) if err != nil { - return stats, fmt.Errorf("unable to parse parquet metadata: %w", err) + return insertStats, fmt.Errorf("unable to parse parquet metadata: %w", err) } if debug { _ = os.WriteFile("latest_test.parquet", unencrypted, 0o644) @@ -313,24 +313,24 @@ func (c *SnowflakeIngestionChannel) InsertRows(ctx context.Context, batch servic unencrypted = padBuffer(unencrypted, aes.BlockSize) encrypted, err := encrypt(unencrypted, c.encryptionInfo.encryptionKey, blobPath, 0) if err != nil { - return stats, err + return insertStats, err } uploadStartTime := time.Now() fileMD5Hash := md5.Sum(encrypted) uploaderResult := c.uploader.Load() if uploaderResult.err != nil { - return stats, fmt.Errorf("failed to acquire stage uploader: %w", uploaderResult.err) + return insertStats, 
fmt.Errorf("failed to acquire stage uploader: %w", uploaderResult.err) } uploader := uploaderResult.uploader err = backoff.Retry(func() error { return uploader.upload(ctx, blobPath, encrypted, fileMD5Hash[:]) }, backoff.WithMaxRetries(backoff.NewConstantBackOff(time.Second), 3)) if err != nil { - return stats, err + return insertStats, err } uploadFinishTime := time.Now() - columnEpInfo := computeColumnEpInfo(c.transformers) + columnEpInfo := computeColumnEpInfo(c.transformers, dataStats) resp, err := c.client.registerBlob(ctx, registerBlobRequest{ RequestID: c.nextRequestID(), Role: c.role, @@ -376,18 +376,18 @@ func (c *SnowflakeIngestionChannel) InsertRows(ctx context.Context, batch servic }, }) if err != nil { - return stats, err + return insertStats, err } if len(resp.Blobs) != 1 { - return stats, fmt.Errorf("unexpected number of response blobs: %d", len(resp.Blobs)) + return insertStats, fmt.Errorf("unexpected number of response blobs: %d", len(resp.Blobs)) } status := resp.Blobs[0] if len(status.Chunks) != 1 { - return stats, fmt.Errorf("unexpected number of response blob chunks: %d", len(status.Chunks)) + return insertStats, fmt.Errorf("unexpected number of response blob chunks: %d", len(status.Chunks)) } chunk := status.Chunks[0] if len(chunk.Channels) != 1 { - return stats, fmt.Errorf("unexpected number of channels for blob chunk: %d", len(chunk.Channels)) + return insertStats, fmt.Errorf("unexpected number of channels for blob chunk: %d", len(chunk.Channels)) } channel := chunk.Channels[0] if channel.StatusCode != responseSuccess { @@ -395,14 +395,14 @@ func (c *SnowflakeIngestionChannel) InsertRows(ctx context.Context, batch servic if msg == "" { msg = "(no message)" } - return stats, fmt.Errorf("error response injesting data (%d): %s", channel.StatusCode, msg) + return insertStats, fmt.Errorf("error response injesting data (%d): %s", channel.StatusCode, msg) } c.rowSequencer++ c.clientSequencer = channel.ClientSequencer - stats.CompressedOutputSize = unencryptedLen - stats.BuildTime = uploadStartTime.Sub(startTime) - stats.UploadTime = uploadFinishTime.Sub(uploadStartTime) - return stats, err + insertStats.CompressedOutputSize = unencryptedLen + insertStats.BuildTime = uploadStartTime.Sub(startTime) + insertStats.UploadTime = uploadFinishTime.Sub(uploadStartTime) + return insertStats, err } // WaitUntilCommitted waits until all the data in the channel has been committed diff --git a/internal/impl/snowflake/streaming/userdata_converter.go b/internal/impl/snowflake/streaming/userdata_converter.go index 7e9ee763ee..dca8692eb1 100644 --- a/internal/impl/snowflake/streaming/userdata_converter.go +++ b/internal/impl/snowflake/streaming/userdata_converter.go @@ -133,10 +133,10 @@ func (c boolConverter) ValidateAndConvert(stats *statsBuffer, val any, buf typed if v { i = int128.FromUint64(1) } - if stats.first { + if !stats.hasData { stats.minIntVal = i stats.maxIntVal = i - stats.first = false + stats.hasData = true } else { stats.minIntVal = int128.Min(stats.minIntVal, i) stats.maxIntVal = int128.Max(stats.maxIntVal, i) @@ -214,10 +214,10 @@ func (c numberConverter) ValidateAndConvert(stats *statsBuffer, val any, buf typ if err != nil { return err } - if stats.first { + if !stats.hasData { stats.minIntVal = v stats.maxIntVal = v - stats.first = false + stats.hasData = true } else { stats.minIntVal = int128.Min(stats.minIntVal, v) stats.maxIntVal = int128.Max(stats.maxIntVal, v) @@ -243,10 +243,10 @@ func (c doubleConverter) ValidateAndConvert(stats *statsBuffer, val any, buf typ if 
err != nil { return err } - if stats.first { + if !stats.hasData { stats.minRealVal = v stats.maxRealVal = v - stats.first = false + stats.hasData = true } else { stats.minRealVal = min(stats.minRealVal, v) stats.maxRealVal = max(stats.maxRealVal, v) @@ -297,11 +297,11 @@ func (c binaryConverter) ValidateAndConvert(stats *statsBuffer, val any, buf typ if c.utf8 && !utf8.Valid(v) { return errors.New("invalid UTF8") } - if stats.first { + if !stats.hasData { stats.minStrVal = v stats.maxStrVal = v stats.maxStrLen = len(v) - stats.first = false + stats.hasData = true } else { if bytes.Compare(v, stats.minStrVal) < 0 { stats.minStrVal = v @@ -333,11 +333,11 @@ func (c jsonConverter) ValidateAndConvert(stats *statsBuffer, val any, buf typed if len(v) > c.maxLength { return fmt.Errorf("value too long, length: %d, max: %d", len(v), c.maxLength) } - if stats.first { + if !stats.hasData { stats.minStrVal = v stats.maxStrVal = v stats.maxStrLen = len(v) - stats.first = false + stats.hasData = true } else { if bytes.Compare(v, stats.minStrVal) < 0 { stats.minStrVal = v @@ -427,10 +427,10 @@ func (c timestampConverter) ValidateAndConvert(stats *statsBuffer, val any, buf c.precision, ) } - if stats.first { + if !stats.hasData { stats.minIntVal = v stats.maxIntVal = v - stats.first = false + stats.hasData = true } else { stats.minIntVal = int128.Min(stats.minIntVal, v) stats.maxIntVal = int128.Max(stats.maxIntVal, v) @@ -464,10 +464,10 @@ func (c timeConverter) ValidateAndConvert(stats *statsBuffer, val any, buf typed t.Second()*int(time.Second.Nanoseconds()) + t.Nanosecond() v := int128.FromInt64(int64(nanos) / pow10TableInt64[9-c.scale]) - if stats.first { + if !stats.hasData { stats.minIntVal = v stats.maxIntVal = v - stats.first = false + stats.hasData = true } else { stats.minIntVal = int128.Min(stats.minIntVal, v) stats.maxIntVal = int128.Max(stats.maxIntVal, v) @@ -499,10 +499,10 @@ func (c dateConverter) ValidateAndConvert(stats *statsBuffer, val any, buf typed return fmt.Errorf("DATE columns out of range, year: %d", t.Year()) } v := int128.FromInt64(t.Unix() / int64(24*60*60)) - if stats.first { + if !stats.hasData { stats.minIntVal = v stats.maxIntVal = v - stats.first = false + stats.hasData = true } else { stats.minIntVal = int128.Min(stats.minIntVal, v) stats.maxIntVal = int128.Max(stats.maxIntVal, v) From 9a919ee95c5dd608838789409c7e687bf943e77c Mon Sep 17 00:00:00 2001 From: Tyler Rockwood Date: Fri, 1 Nov 2024 18:13:50 +0000 Subject: [PATCH 4/8] snowflake: parallelize building of parquet rows There is a lot of latency to build a parquet file, we can speed things up by doing the most expensive part of `constructRowGroup` in parallel. The actual construction of the parquet is still expensive, especially in the compression. We can't parallelize that with this library, but that is the next step if this is still slow. Hopefully this will help max out CPU in the benchmarks I'm running. 
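
As a rough, standalone sketch of the fan-out shape used here (assuming Go 1.21+ and golang.org/x/sync/errgroup; `convertChunk` is a hypothetical stand-in for the real per-chunk conversion): the batch is split into fixed-size chunks, each chunk is converted on its own goroutine into a pre-reserved slot, and the per-chunk outputs are concatenated in order afterwards:

    package main

    import (
        "fmt"

        "golang.org/x/sync/errgroup"
    )

    // convertChunk stands in for the expensive per-chunk conversion work.
    func convertChunk(chunk []int) ([]string, error) {
        out := make([]string, len(chunk))
        for i, v := range chunk {
            out[i] = fmt.Sprintf("row-%d", v)
        }
        return out, nil
    }

    func convertParallel(batch []int, chunkSize int) ([]string, error) {
        numChunks := (len(batch) + chunkSize - 1) / chunkSize
        results := make([][]string, numChunks) // fully allocated before any goroutine starts
        var g errgroup.Group
        for j := 0; j < numChunks; j++ {
            start := j * chunkSize
            end := min(start+chunkSize, len(batch))
            chunk := batch[start:end]
            j := j // capture the per-iteration value (needed before Go 1.22)
            g.Go(func() error {
                rows, err := convertChunk(chunk)
                results[j] = rows // each goroutine owns exactly one slot, so no locking
                return err
            })
        }
        if err := g.Wait(); err != nil {
            return nil, err
        }
        var merged []string
        for _, rows := range results {
            merged = append(merged, rows...) // concatenation preserves batch order
        }
        return merged, nil
    }

    func main() {
        rows, err := convertParallel([]int{1, 2, 3, 4, 5}, 2)
        fmt.Println(rows, err)
    }

Allocating the results slice before any goroutine starts avoids racing on the backing array, and merging after Wait keeps the rows in batch order; the change below follows the same pattern and additionally merges the per-chunk column stats.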
--- internal/impl/snowflake/streaming/parquet.go | 24 ++-- .../impl/snowflake/streaming/parquet_test.go | 9 +- internal/impl/snowflake/streaming/schema.go | 54 ++++---- internal/impl/snowflake/streaming/stats.go | 2 +- .../impl/snowflake/streaming/streaming.go | 120 +++++++++++++----- .../snowflake/streaming/userdata_converter.go | 12 +- 6 files changed, 132 insertions(+), 89 deletions(-) diff --git a/internal/impl/snowflake/streaming/parquet.go b/internal/impl/snowflake/streaming/parquet.go index fc9d82d815..873ac9f02c 100644 --- a/internal/impl/snowflake/streaming/parquet.go +++ b/internal/impl/snowflake/streaming/parquet.go @@ -11,9 +11,9 @@ package streaming import ( + "bytes" "encoding/binary" "fmt" - "io" "github.com/parquet-go/parquet-go" "github.com/parquet-go/parquet-go/format" @@ -45,13 +45,6 @@ func messageToRow(msg *service.Message, out []any, nameToPosition map[string]int return nil } -// TODO: If the memory pressure is too great from writing all -// records buffered as a single row group, then consider -// return some kind of iterator of chunks of rows that we can -// then feed into the actual parquet construction process. -// -// If a single parquet file is too much, we can consider having multiple -// parquet files in a single bdec file. func constructRowGroup( batch service.MessageBatch, schema *parquet.Schema, @@ -67,12 +60,14 @@ func constructRowGroup( matrix := make([]parquet.Value, len(batch)*rowWidth) nameToPosition := make(map[string]int, rowWidth) stats := make([]*statsBuffer, rowWidth) + buffers := make([]typedBuffer, rowWidth) for idx, t := range transformers { leaf, ok := schema.Lookup(t.name) if !ok { return nil, nil, fmt.Errorf("invariant failed: unable to find column %q", t.name) } - t.buf.Prepare(matrix, leaf.ColumnIndex, rowWidth) + buffers[idx] = t.bufferFactory() + buffers[idx].Prepare(matrix, leaf.ColumnIndex, rowWidth) stats[idx] = &statsBuffer{} nameToPosition[t.name] = idx } @@ -88,7 +83,8 @@ func constructRowGroup( for i, v := range row { t := transformers[i] s := stats[i] - err = t.converter.ValidateAndConvert(s, v, t.buf) + b := buffers[i] + err = t.converter.ValidateAndConvert(s, v, b) if err != nil { // TODO(schema): if this is a null value err then we can evolve the schema to mark it null. 
return nil, nil, fmt.Errorf("invalid data for column %s: %w", t.name, err) @@ -113,9 +109,10 @@ type parquetFileData struct { metadata map[string]string } -func writeParquetFile(writer io.Writer, rpcnVersion string, data parquetFileData) (err error) { - pw := parquet.NewGenericWriter[map[string]any]( - writer, +func writeParquetFile(rpcnVersion string, data parquetFileData) (out []byte, err error) { + b := bytes.NewBuffer(nil) + pw := parquet.NewGenericWriter[any]( + b, data.schema, parquet.CreatedBy("RedpandaConnect", rpcnVersion, "unknown"), // Recommended by the Snowflake team to enable data page stats @@ -135,6 +132,7 @@ func writeParquetFile(writer io.Writer, rpcnVersion string, data parquetFileData return } err = pw.Close() + out = b.Bytes() return } diff --git a/internal/impl/snowflake/streaming/parquet_test.go b/internal/impl/snowflake/streaming/parquet_test.go index ef781d681c..27c2581345 100644 --- a/internal/impl/snowflake/streaming/parquet_test.go +++ b/internal/impl/snowflake/streaming/parquet_test.go @@ -27,7 +27,6 @@ func msg(s string) *service.Message { } func TestWriteParquet(t *testing.T) { - b := bytes.NewBuffer(nil) batch := service.MessageBatch{ msg(`{"a":2}`), msg(`{"a":12353}`), @@ -53,7 +52,7 @@ func TestWriteParquet(t *testing.T) { Scale: ptr.Int32(0), Nullable: true, }, - buf: &int32Buffer{}, + bufferFactory: int32TypedBufferFactory, }, } schema := parquet.NewSchema("bdec", inputDataSchema) @@ -63,13 +62,13 @@ func TestWriteParquet(t *testing.T) { transformers, ) require.NoError(t, err) - err = writeParquetFile(b, "latest", parquetFileData{ + b, err := writeParquetFile("latest", parquetFileData{ schema, rows, nil, }) require.NoError(t, err) actual, err := readGeneric( - bytes.NewReader(b.Bytes()), - int64(b.Len()), + bytes.NewReader(b), + int64(len(b)), parquet.NewSchema("bdec", inputDataSchema), ) require.NoError(t, err) diff --git a/internal/impl/snowflake/streaming/schema.go b/internal/impl/snowflake/streaming/schema.go index 5e289607aa..54d10393d2 100644 --- a/internal/impl/snowflake/streaming/schema.go +++ b/internal/impl/snowflake/streaming/schema.go @@ -21,13 +21,13 @@ import ( ) type dataTransformer struct { - converter dataConverter - column *columnMetadata - buf typedBuffer - name string + converter dataConverter + column *columnMetadata + bufferFactory typedBufferFactory + name string } -func convertFixedType(column columnMetadata) (parquet.Node, dataConverter, typedBuffer, error) { +func convertFixedType(column columnMetadata) (parquet.Node, dataConverter, typedBufferFactory, error) { var scale int32 var precision int32 if column.Scale != nil { @@ -39,7 +39,7 @@ func convertFixedType(column columnMetadata) (parquet.Node, dataConverter, typed isDecimal := column.Scale != nil && column.Precision != nil if (column.Scale != nil && *column.Scale != 0) || strings.ToUpper(column.PhysicalType) == "SB16" { c := numberConverter{nullable: column.Nullable, scale: scale, precision: precision} - b := &typedBufferImpl{} + b := defaultTypedBufferFactory t := parquet.FixedLenByteArrayType(16) if isDecimal { return parquet.Decimal(int(scale), int(precision), t), c, b, nil @@ -48,24 +48,24 @@ func convertFixedType(column columnMetadata) (parquet.Node, dataConverter, typed } var ptype parquet.Type var defaultPrecision int32 - var buffer typedBuffer + var bufferFactory typedBufferFactory switch strings.ToUpper(column.PhysicalType) { case "SB1": ptype = parquet.Int32Type defaultPrecision = maxPrecisionForByteWidth(1) - buffer = &int32Buffer{} + bufferFactory = 
int32TypedBufferFactory case "SB2": ptype = parquet.Int32Type defaultPrecision = maxPrecisionForByteWidth(2) - buffer = &int32Buffer{} + bufferFactory = int32TypedBufferFactory case "SB4": ptype = parquet.Int32Type defaultPrecision = maxPrecisionForByteWidth(4) - buffer = &int32Buffer{} + bufferFactory = int32TypedBufferFactory case "SB8": ptype = parquet.Int64Type defaultPrecision = maxPrecisionForByteWidth(8) - buffer = &int64Buffer{} + bufferFactory = int64TypedBufferFactory default: return nil, nil, nil, fmt.Errorf("unsupported physical column type: %s", column.PhysicalType) } @@ -75,9 +75,9 @@ func convertFixedType(column columnMetadata) (parquet.Node, dataConverter, typed } c := numberConverter{nullable: column.Nullable, scale: scale, precision: validationPrecision} if isDecimal { - return parquet.Decimal(int(scale), int(precision), ptype), c, buffer, nil + return parquet.Decimal(int(scale), int(precision), ptype), c, bufferFactory, nil } - return parquet.Leaf(ptype), c, buffer, nil + return parquet.Leaf(ptype), c, bufferFactory, nil } // maxJSONSize is the size that any kind of semi-structured data can be, which is 16MiB minus a small overhead @@ -94,11 +94,11 @@ func constructParquetSchema(columns []columnMetadata) (*parquet.Schema, []*dataT id := int(column.Ordinal) var n parquet.Node var converter dataConverter - var buffer typedBuffer + var bufferFactory typedBufferFactory = defaultTypedBufferFactory logicalType := strings.ToLower(column.LogicalType) switch logicalType { case "fixed": - n, converter, buffer, err = convertFixedType(column) + n, converter, bufferFactory, err = convertFixedType(column) if err != nil { return nil, nil, nil, err } @@ -106,17 +106,14 @@ func constructParquetSchema(columns []columnMetadata) (*parquet.Schema, []*dataT typeMetadata[fmt.Sprintf("%d:obj_enc", id)] = "1" n = parquet.String() converter = jsonArrayConverter{jsonConverter{column.Nullable, maxJSONSize}} - buffer = &typedBufferImpl{} case "object": typeMetadata[fmt.Sprintf("%d:obj_enc", id)] = "1" n = parquet.String() converter = jsonObjectConverter{jsonConverter{column.Nullable, maxJSONSize}} - buffer = &typedBufferImpl{} case "variant": typeMetadata[fmt.Sprintf("%d:obj_enc", id)] = "1" n = parquet.String() converter = jsonConverter{column.Nullable, maxJSONSize} - buffer = &typedBufferImpl{} case "any", "text", "char": n = parquet.String() byteLength := 16 * humanize.MiByte @@ -125,7 +122,6 @@ func constructParquetSchema(columns []columnMetadata) (*parquet.Schema, []*dataT } byteLength = min(byteLength, 16*humanize.MiByte) converter = binaryConverter{nullable: column.Nullable, maxLength: byteLength, utf8: true} - buffer = &typedBufferImpl{} case "binary": n = parquet.Leaf(parquet.ByteArrayType) // Why binary data defaults to 8MiB instead of the 16MiB for strings... 
¯\_(ツ)_/¯ @@ -135,26 +131,22 @@ func constructParquetSchema(columns []columnMetadata) (*parquet.Schema, []*dataT } byteLength = min(byteLength, 16*humanize.MiByte) converter = binaryConverter{nullable: column.Nullable, maxLength: byteLength} - buffer = &typedBufferImpl{} case "boolean": n = parquet.Leaf(parquet.BooleanType) converter = boolConverter{column.Nullable} - buffer = &typedBufferImpl{} case "real": n = parquet.Leaf(parquet.DoubleType) converter = doubleConverter{column.Nullable} - buffer = &typedBufferImpl{} case "timestamp_tz", "timestamp_ltz", "timestamp_ntz": var scale, precision int32 var pt parquet.Type if column.PhysicalType == "SB8" { pt = parquet.Int64Type precision = maxPrecisionForByteWidth(8) - buffer = &int64Buffer{} + bufferFactory = int64TypedBufferFactory } else { pt = parquet.FixedLenByteArrayType(16) precision = maxPrecisionForByteWidth(16) - buffer = &typedBufferImpl{} } if column.Scale != nil { scale = *column.Scale @@ -174,11 +166,11 @@ func constructParquetSchema(columns []columnMetadata) (*parquet.Schema, []*dataT case "time": t := parquet.Int32Type precision := 9 - buffer = &int32Buffer{} + bufferFactory = int32TypedBufferFactory if column.PhysicalType == "SB8" { t = parquet.Int64Type precision = 18 - buffer = &int64Buffer{} + bufferFactory = int64TypedBufferFactory } scale := int32(9) if column.Scale != nil { @@ -189,7 +181,7 @@ func constructParquetSchema(columns []columnMetadata) (*parquet.Schema, []*dataT case "date": n = parquet.Leaf(parquet.Int32Type) converter = dateConverter{column.Nullable} - buffer = &int32Buffer{} + bufferFactory = int32TypedBufferFactory default: return nil, nil, nil, fmt.Errorf("unsupported logical column type: %s", column.LogicalType) } @@ -208,10 +200,10 @@ func constructParquetSchema(columns []columnMetadata) (*parquet.Schema, []*dataT name := normalizeColumnName(column.Name) groupNode[name] = n transformers[idx] = &dataTransformer{ - name: name, - converter: converter, - column: &column, - buf: buffer, + name: name, + converter: converter, + column: &column, + bufferFactory: bufferFactory, } } return parquet.NewSchema("bdec", groupNode), transformers, typeMetadata, nil diff --git a/internal/impl/snowflake/streaming/stats.go b/internal/impl/snowflake/streaming/stats.go index 789ed5bac3..7034b98909 100644 --- a/internal/impl/snowflake/streaming/stats.go +++ b/internal/impl/snowflake/streaming/stats.go @@ -68,7 +68,7 @@ func computeColumnEpInfo(transformers []*dataTransformer, stats []*statsBuffer) maxStrVal = &s } info[transformer.column.Name] = fileColumnProperties{ - ColumnOrdinal: int32(transformer.column.Ordinal), + ColumnOrdinal: transformer.column.Ordinal, NullCount: stat.nullCount, MinStrValue: minStrVal, MaxStrValue: maxStrVal, diff --git a/internal/impl/snowflake/streaming/streaming.go b/internal/impl/snowflake/streaming/streaming.go index 22d29bd0c3..56e5a5e57e 100644 --- a/internal/impl/snowflake/streaming/streaming.go +++ b/internal/impl/snowflake/streaming/streaming.go @@ -11,7 +11,6 @@ package streaming import ( - "bytes" "context" "crypto/aes" "crypto/md5" @@ -26,7 +25,9 @@ import ( "github.com/cenkalti/backoff/v4" "github.com/parquet-go/parquet-go" + "github.com/parquet-go/parquet-go/format" "github.com/redpanda-data/benthos/v4/public/service" + "golang.org/x/sync/errgroup" "github.com/redpanda-data/connect/v4/internal/periodic" "github.com/redpanda-data/connect/v4/internal/typed" @@ -186,7 +187,6 @@ func (c *SnowflakeServiceClient) OpenChannel(ctx context.Context, opts ChannelOp rowSequencer: 
resp.RowSequencer, transformers: transformers, fileMetadata: typeMetadata, - buffer: bytes.NewBuffer(nil), requestIDCounter: c.requestIDCounter, } return ch, nil @@ -258,7 +258,6 @@ type SnowflakeIngestionChannel struct { rowSequencer int64 transformers []*dataTransformer fileMetadata map[string]string - buffer *bytes.Buffer // This is shared among the various open channels to get some uniqueness // when naming bdec files requestIDCounter *atomic.Int64 @@ -276,68 +275,119 @@ type InsertStats struct { CompressedOutputSize int } +type bdecPart struct { + unencryptedLen int + parquetFile []byte + parquetMetadata format.FileMetaData + stats []*statsBuffer +} + +func (c *SnowflakeIngestionChannel) constructBdecPart(batch service.MessageBatch, metadata map[string]string) (bdecPart, error) { + wg := &errgroup.Group{} + type rowGroup struct { + rows []parquet.Row + stats []*statsBuffer + } + rowGroups := []rowGroup{} + const maxRowGroupSize = 50_000 + for i := 0; i < len(batch); i += maxRowGroupSize { + end := min(maxRowGroupSize, len(batch[i:])) + j := len(rowGroups) + rowGroups = append(rowGroups, rowGroup{}) + chunk := batch[i : i+end] + wg.Go(func() error { + rows, stats, err := constructRowGroup(chunk, c.schema, c.transformers) + rowGroups[j] = rowGroup{rows, stats} + return err + }) + } + if err := wg.Wait(); err != nil { + return bdecPart{}, err + } + allRows := make([]parquet.Row, 0, len(batch)) + combinedStats := make([]*statsBuffer, len(c.schema.Fields())) + for i := range combinedStats { + combinedStats[i] = &statsBuffer{} + } + for _, rg := range rowGroups { + allRows = append(allRows, rg.rows...) + for i, s := range combinedStats { + combinedStats[i] = mergeStats(s, rg.stats[i]) + } + } + // TODO(perf): It would be really nice to be able to compress in parallel, + // that actually ends up taking quite of bit of CPU. + buf, err := writeParquetFile(c.version, parquetFileData{ + schema: c.schema, + rows: allRows, + metadata: metadata, + }) + if err != nil { + return bdecPart{}, err + } + fileMetadata, err := readParquetMetadata(buf) + if err != nil { + return bdecPart{}, fmt.Errorf("unable to parse parquet metadata: %w", err) + } + return bdecPart{ + unencryptedLen: len(buf), + parquetFile: buf, + parquetMetadata: fileMetadata, + stats: combinedStats, + }, err +} + // InsertRows creates a parquet file using the schema from the data, // then writes that file into the Snowflake table func (c *SnowflakeIngestionChannel) InsertRows(ctx context.Context, batch service.MessageBatch) (InsertStats, error) { insertStats := InsertStats{} - startTime := time.Now() - rows, dataStats, err := constructRowGroup(batch, c.schema, c.transformers) - if err != nil { - return insertStats, err + if len(batch) == 0 { + return insertStats, nil } - // Prevent multiple channels from having the same bdec file (it must be unique) - // so add the ID of the channel in the upper 16 bits and then get 48 bits of - // randomness outside that. + + startTime := time.Now() + // Prevent multiple channels from having the same bdec file (it must be globally unique) + // so add the ID of the channel in the upper 16 bits and then get 48 bits of randomness outside that. fakeThreadID := (int(c.ID) << 48) | rand.N(1<<48) blobPath := generateBlobPath(c.clientPrefix, fakeThreadID, int(c.requestIDCounter.Add(1))) // This is extra metadata that is required for functionality in snowflake. 
c.fileMetadata["primaryFileId"] = path.Base(blobPath) - c.buffer.Reset() - err = writeParquetFile(c.buffer, c.version, parquetFileData{ - schema: c.schema, - rows: rows, - metadata: c.fileMetadata, - }) + part, err := c.constructBdecPart(batch, c.fileMetadata) if err != nil { return insertStats, err } - unencrypted := c.buffer.Bytes() - metadata, err := readParquetMetadata(unencrypted) - if err != nil { - return insertStats, fmt.Errorf("unable to parse parquet metadata: %w", err) - } if debug { - _ = os.WriteFile("latest_test.parquet", unencrypted, 0o644) + _ = os.WriteFile("latest_test.parquet", part.parquetFile, 0o644) } - unencryptedLen := len(unencrypted) - unencrypted = padBuffer(unencrypted, aes.BlockSize) - encrypted, err := encrypt(unencrypted, c.encryptionInfo.encryptionKey, blobPath, 0) + + unencrypted := padBuffer(part.parquetFile, aes.BlockSize) + part.parquetFile, err = encrypt(unencrypted, c.encryptionInfo.encryptionKey, blobPath, 0) if err != nil { return insertStats, err } + uploadStartTime := time.Now() - fileMD5Hash := md5.Sum(encrypted) uploaderResult := c.uploader.Load() if uploaderResult.err != nil { return insertStats, fmt.Errorf("failed to acquire stage uploader: %w", uploaderResult.err) } uploader := uploaderResult.uploader + fullMD5Hash := md5.Sum(part.parquetFile) err = backoff.Retry(func() error { - return uploader.upload(ctx, blobPath, encrypted, fileMD5Hash[:]) + return uploader.upload(ctx, blobPath, part.parquetFile, fullMD5Hash[:]) }, backoff.WithMaxRetries(backoff.NewConstantBackOff(time.Second), 3)) if err != nil { return insertStats, err } uploadFinishTime := time.Now() - columnEpInfo := computeColumnEpInfo(c.transformers, dataStats) resp, err := c.client.registerBlob(ctx, registerBlobRequest{ RequestID: c.nextRequestID(), Role: c.role, Blobs: []blobMetadata{ { Path: blobPath, - MD5: hex.EncodeToString(fileMD5Hash[:]), + MD5: hex.EncodeToString(fullMD5Hash[:]), BDECVersion: 3, BlobStats: blobStats{ FlushStartMs: startTime.UnixMilli(), @@ -350,15 +400,15 @@ func (c *SnowflakeIngestionChannel) InsertRows(ctx context.Context, batch servic Schema: c.SchemaName, Table: c.TableName, ChunkStartOffset: 0, - ChunkLength: int32(unencryptedLen), - ChunkLengthUncompressed: totalUncompressedSize(metadata), - ChunkMD5: md5Hash(encrypted[:unencryptedLen]), + ChunkLength: int32(part.unencryptedLen), + ChunkLengthUncompressed: totalUncompressedSize(part.parquetMetadata), + ChunkMD5: md5Hash(part.parquetFile[:part.unencryptedLen]), EncryptionKeyID: c.encryptionInfo.encryptionKeyID, FirstInsertTimeInMillis: startTime.UnixMilli(), LastInsertTimeInMillis: startTime.UnixMilli(), EPS: &epInfo{ - Rows: metadata.NumRows, - Columns: columnEpInfo, + Rows: part.parquetMetadata.NumRows, + Columns: computeColumnEpInfo(c.transformers, part.stats), }, Channels: []channelMetadata{ { @@ -399,7 +449,7 @@ func (c *SnowflakeIngestionChannel) InsertRows(ctx context.Context, batch servic } c.rowSequencer++ c.clientSequencer = channel.ClientSequencer - insertStats.CompressedOutputSize = unencryptedLen + insertStats.CompressedOutputSize = part.unencryptedLen insertStats.BuildTime = uploadStartTime.Sub(startTime) insertStats.UploadTime = uploadFinishTime.Sub(uploadStartTime) return insertStats, err diff --git a/internal/impl/snowflake/streaming/userdata_converter.go b/internal/impl/snowflake/streaming/userdata_converter.go index dca8692eb1..27eb73e7f3 100644 --- a/internal/impl/snowflake/streaming/userdata_converter.go +++ b/internal/impl/snowflake/streaming/userdata_converter.go @@ -26,6 +26,8 @@ 
import ( "github.com/redpanda-data/connect/v4/internal/impl/snowflake/streaming/int128" ) +type typedBufferFactory func() typedBuffer + // typedBuffer is the buffer that holds columnar data before we write to the parquet file type typedBuffer interface { WriteNull() @@ -40,7 +42,6 @@ type typedBuffer interface { // the data that will be written - this buffer will not modify // the size of the data. Prepare(matrix []parquet.Value, columnIndex, rowWidth int) - Reset() } type typedBufferImpl struct { @@ -86,9 +87,8 @@ func (b *typedBufferImpl) Prepare(matrix []parquet.Value, columnIndex, rowWidth b.scratch = b.scratch[:0] } } -func (b *typedBufferImpl) Reset() { - b.Prepare(nil, 0, 0) -} + +var defaultTypedBufferFactory = typedBufferFactory(func() typedBuffer { return &typedBufferImpl{} }) type int64Buffer struct { typedBufferImpl @@ -98,6 +98,8 @@ func (b *int64Buffer) WriteInt128(v int128.Num) { b.WriteValue(parquet.Int64Value(v.ToInt64()).Level(0, 1, b.columnIndex)) } +var int64TypedBufferFactory = typedBufferFactory(func() typedBuffer { return &int64Buffer{} }) + type int32Buffer struct { typedBufferImpl } @@ -110,6 +112,8 @@ type dataConverter interface { ValidateAndConvert(stats *statsBuffer, val any, buf typedBuffer) error } +var int32TypedBufferFactory = typedBufferFactory(func() typedBuffer { return &int32Buffer{} }) + var errNullValue = errors.New("unexpected null value") type boolConverter struct { From 40e042a24daccf975ebf43d584a444bf8f0d8282 Mon Sep 17 00:00:00 2001 From: Tyler Rockwood Date: Fri, 1 Nov 2024 18:43:26 +0000 Subject: [PATCH 5/8] snowflake: add timing metrics for conversion and serialization --- .../impl/snowflake/output_snowflake_streaming.go | 14 +++++++++++--- internal/impl/snowflake/streaming/streaming.go | 12 ++++++++++++ 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/internal/impl/snowflake/output_snowflake_streaming.go b/internal/impl/snowflake/output_snowflake_streaming.go index 9ba37daa70..65908ff14f 100644 --- a/internal/impl/snowflake/output_snowflake_streaming.go +++ b/internal/impl/snowflake/output_snowflake_streaming.go @@ -331,6 +331,8 @@ func newSnowflakeStreamer( logger: mgr.Logger(), buildTime: mgr.Metrics().NewTimer("snowflake_build_output_latency_ns"), uploadTime: mgr.Metrics().NewTimer("snowflake_upload_latency_ns"), + convertTime: mgr.Metrics().NewTimer("snowflake_convert_latency_ns"), + serializeTime: mgr.Metrics().NewTimer("snowflake_serialize_latency_ns"), compressedOutput: mgr.Metrics().NewCounter("snowflake_compressed_output_size_bytes"), initStatementsFn: initStatementsFn, } @@ -345,6 +347,8 @@ type snowflakeStreamerOutput struct { compressedOutput *service.MetricCounter uploadTime *service.MetricTimer buildTime *service.MetricTimer + convertTime *service.MetricTimer + serializeTime *service.MetricTimer channelPrefix, db, schema, table string mapping *bloblang.Executor @@ -416,9 +420,13 @@ func (o *snowflakeStreamerOutput) WriteBatch(ctx context.Context, batch service. 
} } stats, err := channel.InsertRows(ctx, batch) - o.compressedOutput.Incr(int64(stats.CompressedOutputSize)) - o.uploadTime.Timing(stats.UploadTime.Nanoseconds()) - o.buildTime.Timing(stats.BuildTime.Nanoseconds()) + if err == nil { + o.compressedOutput.Incr(int64(stats.CompressedOutputSize)) + o.uploadTime.Timing(stats.UploadTime.Nanoseconds()) + o.buildTime.Timing(stats.BuildTime.Nanoseconds()) + o.convertTime.Timing(stats.ConvertTime.Nanoseconds()) + o.serializeTime.Timing(stats.SerializeTime.Nanoseconds()) + } // If there is some kind of failure, try to reopen the channel if err != nil { reopened, reopenErr := o.openChannel(ctx, channel.Name, channel.ID) diff --git a/internal/impl/snowflake/streaming/streaming.go b/internal/impl/snowflake/streaming/streaming.go index 56e5a5e57e..8936906708 100644 --- a/internal/impl/snowflake/streaming/streaming.go +++ b/internal/impl/snowflake/streaming/streaming.go @@ -24,6 +24,7 @@ import ( "time" "github.com/cenkalti/backoff/v4" + "github.com/dustin/go-humanize" "github.com/parquet-go/parquet-go" "github.com/parquet-go/parquet-go/format" "github.com/redpanda-data/benthos/v4/public/service" @@ -271,6 +272,8 @@ func (c *SnowflakeIngestionChannel) nextRequestID() string { // InsertStats holds some basic statistics about the InsertRows operation type InsertStats struct { BuildTime time.Duration + ConvertTime time.Duration + SerializeTime time.Duration UploadTime time.Duration CompressedOutputSize int } @@ -280,6 +283,8 @@ type bdecPart struct { parquetFile []byte parquetMetadata format.FileMetaData stats []*statsBuffer + convertTime time.Duration + serializeTime time.Duration } func (c *SnowflakeIngestionChannel) constructBdecPart(batch service.MessageBatch, metadata map[string]string) (bdecPart, error) { @@ -290,6 +295,7 @@ func (c *SnowflakeIngestionChannel) constructBdecPart(batch service.MessageBatch } rowGroups := []rowGroup{} const maxRowGroupSize = 50_000 + convertStart := time.Now() for i := 0; i < len(batch); i += maxRowGroupSize { end := min(maxRowGroupSize, len(batch[i:])) j := len(rowGroups) @@ -304,6 +310,7 @@ func (c *SnowflakeIngestionChannel) constructBdecPart(batch service.MessageBatch if err := wg.Wait(); err != nil { return bdecPart{}, err } + convertDone := time.Now() allRows := make([]parquet.Row, 0, len(batch)) combinedStats := make([]*statsBuffer, len(c.schema.Fields())) for i := range combinedStats { @@ -329,11 +336,14 @@ func (c *SnowflakeIngestionChannel) constructBdecPart(batch service.MessageBatch if err != nil { return bdecPart{}, fmt.Errorf("unable to parse parquet metadata: %w", err) } + done := time.Now() return bdecPart{ unencryptedLen: len(buf), parquetFile: buf, parquetMetadata: fileMetadata, stats: combinedStats, + convertTime: convertDone.Sub(convertStart), + serializeTime: done.Sub(convertDone), }, err } @@ -452,6 +462,8 @@ func (c *SnowflakeIngestionChannel) InsertRows(ctx context.Context, batch servic insertStats.CompressedOutputSize = part.unencryptedLen insertStats.BuildTime = uploadStartTime.Sub(startTime) insertStats.UploadTime = uploadFinishTime.Sub(uploadStartTime) + insertStats.ConvertTime = part.convertTime + insertStats.SerializeTime = part.serializeTime return insertStats, err } From c19f28668eecf2e17af99e57bd1da5000a40c028 Mon Sep 17 00:00:00 2001 From: Tyler Rockwood Date: Fri, 1 Nov 2024 18:51:46 +0000 Subject: [PATCH 6/8] snowflake: add debug logging for when rows are written --- internal/impl/snowflake/output_snowflake_streaming.go | 6 +++--- internal/impl/snowflake/streaming/streaming.go | 1 - 2 
files changed, 3 insertions(+), 4 deletions(-) diff --git a/internal/impl/snowflake/output_snowflake_streaming.go b/internal/impl/snowflake/output_snowflake_streaming.go index 65908ff14f..c19fcb6fa5 100644 --- a/internal/impl/snowflake/output_snowflake_streaming.go +++ b/internal/impl/snowflake/output_snowflake_streaming.go @@ -419,16 +419,16 @@ func (o *snowflakeStreamerOutput) WriteBatch(ctx context.Context, batch service. return fmt.Errorf("unable to open snowflake streaming channel: %w", err) } } + o.logger.Debugf("inserting rows using channel %s", channel.Name) stats, err := channel.InsertRows(ctx, batch) if err == nil { + o.logger.Debugf("done inserting rows using channel %s, stats: %+v", channel.Name, stats) o.compressedOutput.Incr(int64(stats.CompressedOutputSize)) o.uploadTime.Timing(stats.UploadTime.Nanoseconds()) o.buildTime.Timing(stats.BuildTime.Nanoseconds()) o.convertTime.Timing(stats.ConvertTime.Nanoseconds()) o.serializeTime.Timing(stats.SerializeTime.Nanoseconds()) - } - // If there is some kind of failure, try to reopen the channel - if err != nil { + } else { reopened, reopenErr := o.openChannel(ctx, channel.Name, channel.ID) if reopenErr == nil { o.channelPool.Put(reopened) diff --git a/internal/impl/snowflake/streaming/streaming.go b/internal/impl/snowflake/streaming/streaming.go index 8936906708..c1dacc742d 100644 --- a/internal/impl/snowflake/streaming/streaming.go +++ b/internal/impl/snowflake/streaming/streaming.go @@ -24,7 +24,6 @@ import ( "time" "github.com/cenkalti/backoff/v4" - "github.com/dustin/go-humanize" "github.com/parquet-go/parquet-go" "github.com/parquet-go/parquet-go/format" "github.com/redpanda-data/benthos/v4/public/service" From a176ab9087ca06afa17a4a003bbe2fb793cf61aa Mon Sep 17 00:00:00 2001 From: Tyler Rockwood Date: Fri, 1 Nov 2024 20:56:48 +0000 Subject: [PATCH 7/8] snowflake: simplify stats --- internal/impl/snowflake/streaming/stats.go | 39 +++++++++ .../snowflake/streaming/userdata_converter.go | 87 ++----------------- 2 files changed, 47 insertions(+), 79 deletions(-) diff --git a/internal/impl/snowflake/streaming/stats.go b/internal/impl/snowflake/streaming/stats.go index 7034b98909..3ce3643a23 100644 --- a/internal/impl/snowflake/streaming/stats.go +++ b/internal/impl/snowflake/streaming/stats.go @@ -25,6 +25,45 @@ type statsBuffer struct { hasData bool } +func (s *statsBuffer) UpdateIntStats(v int128.Num) { + if !s.hasData { + s.minIntVal = v + s.maxIntVal = v + s.hasData = true + } else { + s.minIntVal = int128.Min(s.minIntVal, v) + s.maxIntVal = int128.Max(s.maxIntVal, v) + } +} + +func (s *statsBuffer) UpdateFloat64Stats(v float64) { + if !s.hasData { + s.minRealVal = v + s.maxRealVal = v + s.hasData = true + } else { + s.minRealVal = min(s.minRealVal, v) + s.maxRealVal = max(s.maxRealVal, v) + } +} + +func (s *statsBuffer) UpdateBytesStats(v []byte) { + if !s.hasData { + s.minStrVal = v + s.maxStrVal = v + s.maxStrLen = len(v) + s.hasData = true + } else { + if bytes.Compare(v, s.minStrVal) < 0 { + s.minStrVal = v + } + if bytes.Compare(v, s.maxStrVal) > 0 { + s.maxStrVal = v + } + s.maxStrLen = max(s.maxStrLen, len(v)) + } +} + func mergeStats(a, b *statsBuffer) *statsBuffer { c := &statsBuffer{hasData: true} switch { diff --git a/internal/impl/snowflake/streaming/userdata_converter.go b/internal/impl/snowflake/streaming/userdata_converter.go index 27eb73e7f3..d06cd289b9 100644 --- a/internal/impl/snowflake/streaming/userdata_converter.go +++ b/internal/impl/snowflake/streaming/userdata_converter.go @@ -11,7 +11,6 @@ package 
streaming import ( - "bytes" "encoding/json" "errors" "fmt" @@ -137,14 +136,7 @@ func (c boolConverter) ValidateAndConvert(stats *statsBuffer, val any, buf typed if v { i = int128.FromUint64(1) } - if !stats.hasData { - stats.minIntVal = i - stats.maxIntVal = i - stats.hasData = true - } else { - stats.minIntVal = int128.Min(stats.minIntVal, i) - stats.maxIntVal = int128.Max(stats.maxIntVal, i) - } + stats.UpdateIntStats(i) buf.WriteBool(v) return nil } @@ -218,14 +210,7 @@ func (c numberConverter) ValidateAndConvert(stats *statsBuffer, val any, buf typ if err != nil { return err } - if !stats.hasData { - stats.minIntVal = v - stats.maxIntVal = v - stats.hasData = true - } else { - stats.minIntVal = int128.Min(stats.minIntVal, v) - stats.maxIntVal = int128.Max(stats.maxIntVal, v) - } + stats.UpdateIntStats(v) buf.WriteInt128(v) return nil } @@ -247,14 +232,7 @@ func (c doubleConverter) ValidateAndConvert(stats *statsBuffer, val any, buf typ if err != nil { return err } - if !stats.hasData { - stats.minRealVal = v - stats.maxRealVal = v - stats.hasData = true - } else { - stats.minRealVal = min(stats.minRealVal, v) - stats.maxRealVal = max(stats.maxRealVal, v) - } + stats.UpdateFloat64Stats(v) buf.WriteFloat64(v) return nil } @@ -301,20 +279,7 @@ func (c binaryConverter) ValidateAndConvert(stats *statsBuffer, val any, buf typ if c.utf8 && !utf8.Valid(v) { return errors.New("invalid UTF8") } - if !stats.hasData { - stats.minStrVal = v - stats.maxStrVal = v - stats.maxStrLen = len(v) - stats.hasData = true - } else { - if bytes.Compare(v, stats.minStrVal) < 0 { - stats.minStrVal = v - } - if bytes.Compare(v, stats.maxStrVal) > 0 { - stats.maxStrVal = v - } - stats.maxStrLen = max(stats.maxStrLen, len(v)) - } + stats.UpdateBytesStats(v) buf.WriteBytes(v) return nil } @@ -337,20 +302,7 @@ func (c jsonConverter) ValidateAndConvert(stats *statsBuffer, val any, buf typed if len(v) > c.maxLength { return fmt.Errorf("value too long, length: %d, max: %d", len(v), c.maxLength) } - if !stats.hasData { - stats.minStrVal = v - stats.maxStrVal = v - stats.maxStrLen = len(v) - stats.hasData = true - } else { - if bytes.Compare(v, stats.minStrVal) < 0 { - stats.minStrVal = v - } - if bytes.Compare(v, stats.maxStrVal) > 0 { - stats.maxStrVal = v - } - stats.maxStrLen = max(stats.maxStrLen, len(v)) - } + stats.UpdateBytesStats(v) buf.WriteBytes(v) return nil } @@ -431,14 +383,7 @@ func (c timestampConverter) ValidateAndConvert(stats *statsBuffer, val any, buf c.precision, ) } - if !stats.hasData { - stats.minIntVal = v - stats.maxIntVal = v - stats.hasData = true - } else { - stats.minIntVal = int128.Min(stats.minIntVal, v) - stats.maxIntVal = int128.Max(stats.maxIntVal, v) - } + stats.UpdateIntStats(v) buf.WriteInt128(v) return nil } @@ -468,15 +413,7 @@ func (c timeConverter) ValidateAndConvert(stats *statsBuffer, val any, buf typed t.Second()*int(time.Second.Nanoseconds()) + t.Nanosecond() v := int128.FromInt64(int64(nanos) / pow10TableInt64[9-c.scale]) - if !stats.hasData { - stats.minIntVal = v - stats.maxIntVal = v - stats.hasData = true - } else { - stats.minIntVal = int128.Min(stats.minIntVal, v) - stats.maxIntVal = int128.Max(stats.maxIntVal, v) - } - // TODO(perf): consider switching to int64 buffers so more stuff can fit in cache + stats.UpdateIntStats(v) buf.WriteInt128(v) return nil } @@ -503,15 +440,7 @@ func (c dateConverter) ValidateAndConvert(stats *statsBuffer, val any, buf typed return fmt.Errorf("DATE columns out of range, year: %d", t.Year()) } v := int128.FromInt64(t.Unix() / 
int64(24*60*60)) - if !stats.hasData { - stats.minIntVal = v - stats.maxIntVal = v - stats.hasData = true - } else { - stats.minIntVal = int128.Min(stats.minIntVal, v) - stats.maxIntVal = int128.Max(stats.maxIntVal, v) - } - // TODO(perf): consider switching to int64 buffers so more stuff can fit in cache + stats.UpdateIntStats(v) buf.WriteInt128(v) return nil } From 7c62dca2599421446112d0090cac8a60f7d2b2fb Mon Sep 17 00:00:00 2001 From: Tyler Rockwood Date: Sun, 3 Nov 2024 02:43:41 +0000 Subject: [PATCH 8/8] snowflake: make build parallelism a config option --- .../pages/outputs/snowflake_streaming.adoc | 10 +++++ .../snowflake/output_snowflake_streaming.go | 45 +++++++++++-------- .../snowflake/streaming/integration_test.go | 9 ++-- .../impl/snowflake/streaming/streaming.go | 11 +++-- 4 files changed, 49 insertions(+), 26 deletions(-) diff --git a/docs/modules/components/pages/outputs/snowflake_streaming.adoc b/docs/modules/components/pages/outputs/snowflake_streaming.adoc index e1ae92cd29..23f6a26f2c 100644 --- a/docs/modules/components/pages/outputs/snowflake_streaming.adoc +++ b/docs/modules/components/pages/outputs/snowflake_streaming.adoc @@ -81,6 +81,7 @@ output: mapping: "" # No default (optional) init_statement: | # No default (optional) CREATE TABLE IF NOT EXISTS mytable (amount NUMBER); + build_parallelism: 1 batching: count: 0 byte_size: 0 @@ -335,6 +336,15 @@ init_statement: |2 ALTER TABLE t1 ADD COLUMN a2 NUMBER; ``` +=== `build_parallelism` + +The maximum amount of parallelism to use when building the output for Snowflake. The metric to watch to see if you need to change this is `snowflake_build_output_latency_ns`. + + +*Type*: `int` + +*Default*: `1` + === `batching` Allows you to configure a xref:configuration:batching.adoc[batching policy]. diff --git a/internal/impl/snowflake/output_snowflake_streaming.go b/internal/impl/snowflake/output_snowflake_streaming.go index c19fcb6fa5..9cdb34558a 100644 --- a/internal/impl/snowflake/output_snowflake_streaming.go +++ b/internal/impl/snowflake/output_snowflake_streaming.go @@ -21,19 +21,20 @@ import ( ) const ( - ssoFieldAccount = "account" - ssoFieldUser = "user" - ssoFieldRole = "role" - ssoFieldDB = "database" - ssoFieldSchema = "schema" - ssoFieldTable = "table" - ssoFieldKey = "private_key" - ssoFieldKeyFile = "private_key_file" - ssoFieldKeyPass = "private_key_pass" - ssoFieldInitStatement = "init_statement" - ssoFieldBatching = "batching" - ssoFieldChannelPrefix = "channel_prefix" - ssoFieldMapping = "mapping" + ssoFieldAccount = "account" + ssoFieldUser = "user" + ssoFieldRole = "role" + ssoFieldDB = "database" + ssoFieldSchema = "schema" + ssoFieldTable = "table" + ssoFieldKey = "private_key" + ssoFieldKeyFile = "private_key_file" + ssoFieldKeyPass = "private_key_pass" + ssoFieldInitStatement = "init_statement" + ssoFieldBatching = "batching" + ssoFieldChannelPrefix = "channel_prefix" + ssoFieldMapping = "mapping" + ssoFieldBuildParallelism = "build_parallelism" ) func snowflakeStreamingOutputConfig() *service.ConfigSpec { @@ -91,6 +92,7 @@ CREATE TABLE IF NOT EXISTS mytable (amount NUMBER); ALTER TABLE t1 ALTER COLUMN c1 DROP NOT NULL; ALTER TABLE t1 ADD COLUMN a2 NUMBER; `), + service.NewIntField(ssoFieldBuildParallelism).Description("The maximum amount of parallelism to use when building the output for Snowflake. 
The metric to watch to see if you need to change this is `snowflake_build_output_latency_ns`.").Default(1).Advanced(), service.NewBatchPolicyField(ssoFieldBatching), service.NewOutputMaxInFlightField(), service.NewStringField(ssoFieldChannelPrefix). @@ -266,6 +268,10 @@ func newSnowflakeStreamer( return nil, err } } + buildParallelism, err := conf.FieldInt(ssoFieldBuildParallelism) + if err != nil { + return nil, err + } var channelPrefix string if conf.Contains(ssoFieldChannelPrefix) { channelPrefix, err = conf.FieldString(ssoFieldChannelPrefix) @@ -335,6 +341,7 @@ func newSnowflakeStreamer( serializeTime: mgr.Metrics().NewTimer("snowflake_serialize_latency_ns"), compressedOutput: mgr.Metrics().NewCounter("snowflake_compressed_output_size_bytes"), initStatementsFn: initStatementsFn, + buildParallelism: buildParallelism, } return o, nil } @@ -349,6 +356,7 @@ type snowflakeStreamerOutput struct { buildTime *service.MetricTimer convertTime *service.MetricTimer serializeTime *service.MetricTimer + buildParallelism int channelPrefix, db, schema, table string mapping *bloblang.Executor @@ -372,11 +380,12 @@ func (o *snowflakeStreamerOutput) openNewChannel(ctx context.Context) (*streamin func (o *snowflakeStreamerOutput) openChannel(ctx context.Context, name string, id int16) (*streaming.SnowflakeIngestionChannel, error) { o.logger.Debugf("opening snowflake streaming channel: %s", name) return o.client.OpenChannel(ctx, streaming.ChannelOptions{ - ID: id, - Name: name, - DatabaseName: o.db, - SchemaName: o.schema, - TableName: o.table, + ID: id, + Name: name, + DatabaseName: o.db, + SchemaName: o.schema, + TableName: o.table, + BuildParallelism: o.buildParallelism, }) } diff --git a/internal/impl/snowflake/streaming/integration_test.go b/internal/impl/snowflake/streaming/integration_test.go index 0844065343..5fb9bddc49 100644 --- a/internal/impl/snowflake/streaming/integration_test.go +++ b/internal/impl/snowflake/streaming/integration_test.go @@ -88,10 +88,11 @@ func TestAllSnowflakeDatatypes(t *testing.T) { ctx := context.Background() restClient, streamClient := setup(t) channelOpts := streaming.ChannelOptions{ - Name: t.Name(), - DatabaseName: envOr("SNOWFLAKE_DB", "BABY_DATABASE"), - SchemaName: "PUBLIC", - TableName: "TEST_TABLE_KITCHEN_SINK", + Name: t.Name(), + DatabaseName: envOr("SNOWFLAKE_DB", "BABY_DATABASE"), + SchemaName: "PUBLIC", + TableName: "TEST_TABLE_KITCHEN_SINK", + BuildParallelism: 1, } _, err := restClient.RunSQL(ctx, streaming.RunSQLRequest{ Database: channelOpts.DatabaseName, diff --git a/internal/impl/snowflake/streaming/streaming.go b/internal/impl/snowflake/streaming/streaming.go index c1dacc742d..4f22879172 100644 --- a/internal/impl/snowflake/streaming/streaming.go +++ b/internal/impl/snowflake/streaming/streaming.go @@ -143,6 +143,8 @@ type ChannelOptions struct { SchemaName string // TableName is the name of the table TableName string + // The max parallelism used to build parquet files and convert message batches into rows. 
+ BuildParallelism int } type encryptionInfo struct { @@ -288,6 +290,7 @@ type bdecPart struct { func (c *SnowflakeIngestionChannel) constructBdecPart(batch service.MessageBatch, metadata map[string]string) (bdecPart, error) { wg := &errgroup.Group{} + wg.SetLimit(c.BuildParallelism) type rowGroup struct { rows []parquet.Row stats []*statsBuffer @@ -363,7 +366,7 @@ func (c *SnowflakeIngestionChannel) InsertRows(ctx context.Context, batch servic c.fileMetadata["primaryFileId"] = path.Base(blobPath) part, err := c.constructBdecPart(batch, c.fileMetadata) if err != nil { - return insertStats, err + return insertStats, fmt.Errorf("unable to construct output: %w", err) } if debug { _ = os.WriteFile("latest_test.parquet", part.parquetFile, 0o644) @@ -372,7 +375,7 @@ func (c *SnowflakeIngestionChannel) InsertRows(ctx context.Context, batch servic unencrypted := padBuffer(part.parquetFile, aes.BlockSize) part.parquetFile, err = encrypt(unencrypted, c.encryptionInfo.encryptionKey, blobPath, 0) if err != nil { - return insertStats, err + return insertStats, fmt.Errorf("unable to encrypt output: %w", err) } uploadStartTime := time.Now() @@ -386,7 +389,7 @@ func (c *SnowflakeIngestionChannel) InsertRows(ctx context.Context, batch servic return uploader.upload(ctx, blobPath, part.parquetFile, fullMD5Hash[:]) }, backoff.WithMaxRetries(backoff.NewConstantBackOff(time.Second), 3)) if err != nil { - return insertStats, err + return insertStats, fmt.Errorf("unable to upload to storage: %w", err) } uploadFinishTime := time.Now() @@ -435,7 +438,7 @@ func (c *SnowflakeIngestionChannel) InsertRows(ctx context.Context, batch servic }, }) if err != nil { - return insertStats, err + return insertStats, fmt.Errorf("registering output failed: %w", err) } if len(resp.Blobs) != 1 { return insertStats, fmt.Errorf("unexpected number of response blobs: %d", len(resp.Blobs))