diff --git a/docs/modules/components/pages/outputs/snowflake_streaming.adoc b/docs/modules/components/pages/outputs/snowflake_streaming.adoc index e1ae92cd29..23f6a26f2c 100644 --- a/docs/modules/components/pages/outputs/snowflake_streaming.adoc +++ b/docs/modules/components/pages/outputs/snowflake_streaming.adoc @@ -81,6 +81,7 @@ output: mapping: "" # No default (optional) init_statement: | # No default (optional) CREATE TABLE IF NOT EXISTS mytable (amount NUMBER); + build_parallelism: 1 batching: count: 0 byte_size: 0 @@ -335,6 +336,15 @@ init_statement: |2 ALTER TABLE t1 ADD COLUMN a2 NUMBER; ``` +=== `build_parallelism` + +The maximum amount of parallelism to use when building the output for Snowflake. The metric to watch to see if you need to change this is `snowflake_build_output_latency_ns`. + + +*Type*: `int` + +*Default*: `1` + === `batching` Allows you to configure a xref:configuration:batching.adoc[batching policy]. diff --git a/internal/impl/snowflake/output_snowflake_streaming.go b/internal/impl/snowflake/output_snowflake_streaming.go index 9ba37daa70..9cdb34558a 100644 --- a/internal/impl/snowflake/output_snowflake_streaming.go +++ b/internal/impl/snowflake/output_snowflake_streaming.go @@ -21,19 +21,20 @@ import ( ) const ( - ssoFieldAccount = "account" - ssoFieldUser = "user" - ssoFieldRole = "role" - ssoFieldDB = "database" - ssoFieldSchema = "schema" - ssoFieldTable = "table" - ssoFieldKey = "private_key" - ssoFieldKeyFile = "private_key_file" - ssoFieldKeyPass = "private_key_pass" - ssoFieldInitStatement = "init_statement" - ssoFieldBatching = "batching" - ssoFieldChannelPrefix = "channel_prefix" - ssoFieldMapping = "mapping" + ssoFieldAccount = "account" + ssoFieldUser = "user" + ssoFieldRole = "role" + ssoFieldDB = "database" + ssoFieldSchema = "schema" + ssoFieldTable = "table" + ssoFieldKey = "private_key" + ssoFieldKeyFile = "private_key_file" + ssoFieldKeyPass = "private_key_pass" + ssoFieldInitStatement = "init_statement" + ssoFieldBatching = "batching" + ssoFieldChannelPrefix = "channel_prefix" + ssoFieldMapping = "mapping" + ssoFieldBuildParallelism = "build_parallelism" ) func snowflakeStreamingOutputConfig() *service.ConfigSpec { @@ -91,6 +92,7 @@ CREATE TABLE IF NOT EXISTS mytable (amount NUMBER); ALTER TABLE t1 ALTER COLUMN c1 DROP NOT NULL; ALTER TABLE t1 ADD COLUMN a2 NUMBER; `), + service.NewIntField(ssoFieldBuildParallelism).Description("The maximum amount of parallelism to use when building the output for Snowflake. The metric to watch to see if you need to change this is `snowflake_build_output_latency_ns`.").Default(1).Advanced(), service.NewBatchPolicyField(ssoFieldBatching), service.NewOutputMaxInFlightField(), service.NewStringField(ssoFieldChannelPrefix). 
@@ -266,6 +268,10 @@ func newSnowflakeStreamer( return nil, err } } + buildParallelism, err := conf.FieldInt(ssoFieldBuildParallelism) + if err != nil { + return nil, err + } var channelPrefix string if conf.Contains(ssoFieldChannelPrefix) { channelPrefix, err = conf.FieldString(ssoFieldChannelPrefix) @@ -331,8 +337,11 @@ func newSnowflakeStreamer( logger: mgr.Logger(), buildTime: mgr.Metrics().NewTimer("snowflake_build_output_latency_ns"), uploadTime: mgr.Metrics().NewTimer("snowflake_upload_latency_ns"), + convertTime: mgr.Metrics().NewTimer("snowflake_convert_latency_ns"), + serializeTime: mgr.Metrics().NewTimer("snowflake_serialize_latency_ns"), compressedOutput: mgr.Metrics().NewCounter("snowflake_compressed_output_size_bytes"), initStatementsFn: initStatementsFn, + buildParallelism: buildParallelism, } return o, nil } @@ -345,6 +354,9 @@ type snowflakeStreamerOutput struct { compressedOutput *service.MetricCounter uploadTime *service.MetricTimer buildTime *service.MetricTimer + convertTime *service.MetricTimer + serializeTime *service.MetricTimer + buildParallelism int channelPrefix, db, schema, table string mapping *bloblang.Executor @@ -368,11 +380,12 @@ func (o *snowflakeStreamerOutput) openNewChannel(ctx context.Context) (*streamin func (o *snowflakeStreamerOutput) openChannel(ctx context.Context, name string, id int16) (*streaming.SnowflakeIngestionChannel, error) { o.logger.Debugf("opening snowflake streaming channel: %s", name) return o.client.OpenChannel(ctx, streaming.ChannelOptions{ - ID: id, - Name: name, - DatabaseName: o.db, - SchemaName: o.schema, - TableName: o.table, + ID: id, + Name: name, + DatabaseName: o.db, + SchemaName: o.schema, + TableName: o.table, + BuildParallelism: o.buildParallelism, }) } @@ -415,12 +428,16 @@ func (o *snowflakeStreamerOutput) WriteBatch(ctx context.Context, batch service. return fmt.Errorf("unable to open snowflake streaming channel: %w", err) } } + o.logger.Debugf("inserting rows using channel %s", channel.Name) stats, err := channel.InsertRows(ctx, batch) - o.compressedOutput.Incr(int64(stats.CompressedOutputSize)) - o.uploadTime.Timing(stats.UploadTime.Nanoseconds()) - o.buildTime.Timing(stats.BuildTime.Nanoseconds()) - // If there is some kind of failure, try to reopen the channel - if err != nil { + if err == nil { + o.logger.Debugf("done inserting rows using channel %s, stats: %+v", channel.Name, stats) + o.compressedOutput.Incr(int64(stats.CompressedOutputSize)) + o.uploadTime.Timing(stats.UploadTime.Nanoseconds()) + o.buildTime.Timing(stats.BuildTime.Nanoseconds()) + o.convertTime.Timing(stats.ConvertTime.Nanoseconds()) + o.serializeTime.Timing(stats.SerializeTime.Nanoseconds()) + } else { reopened, reopenErr := o.openChannel(ctx, channel.Name, channel.ID) if reopenErr == nil { o.channelPool.Put(reopened) diff --git a/internal/impl/snowflake/streaming/int128/int128.go b/internal/impl/snowflake/streaming/int128/int128.go index 3eedb8fd97..aba363539f 100644 --- a/internal/impl/snowflake/streaming/int128/int128.go +++ b/internal/impl/snowflake/streaming/int128/int128.go @@ -227,8 +227,8 @@ func (i Num) ToBigEndian() []byte { // AppendBigEndian converts an Int128 into big endian bytes func (i Num) AppendBigEndian(b []byte) []byte { - b = binary.BigEndian.AppendUint64(b[0:8], uint64(i.hi)) - return binary.BigEndian.AppendUint64(b[8:16], i.lo) + b = binary.BigEndian.AppendUint64(b, uint64(i.hi)) + return binary.BigEndian.AppendUint64(b, i.lo) } // ToInt64 casts an Int128 to a int64 by truncating the bytes. 
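The `AppendBigEndian` hunk above fixes a subtle slice bug: the old code wrote into fixed windows (`b[0:8]`, `b[8:16]`) and dropped the result of the first append, while `binary.BigEndian.AppendUint64` is meant to grow the slice it is given and return the extended slice. A minimal standalone sketch of the corrected behavior follows; `appendBigEndian128` is an illustrative helper that mirrors the fixed `Num.AppendBigEndian`, not code from the patch.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// appendBigEndian128 mirrors the fixed Num.AppendBigEndian: it appends the
// high and low 64-bit halves in big-endian order after whatever bytes are
// already in b, chaining the slice returned by each append.
func appendBigEndian128(b []byte, hi int64, lo uint64) []byte {
	b = binary.BigEndian.AppendUint64(b, uint64(hi)) // grows b by 8 bytes
	return binary.BigEndian.AppendUint64(b, lo)      // grows b by 8 more bytes
}

func main() {
	prefix := []byte{0xAA}                        // pre-existing bytes must be preserved
	out := appendBigEndian128(prefix, 0x01, 0x02) // appends 16 bytes after the prefix
	fmt.Println(len(out), out[0] == 0xAA)         // 17 true
}
```

This is also what the new `TestToBytes` below checks: appending to a cloned buffer neither mutates the existing contents nor loses the first eight bytes.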
diff --git a/internal/impl/snowflake/streaming/int128/int128_test.go b/internal/impl/snowflake/streaming/int128/int128_test.go index 765b6ce731..a74058352d 100644 --- a/internal/impl/snowflake/streaming/int128/int128_test.go +++ b/internal/impl/snowflake/streaming/int128/int128_test.go @@ -11,8 +11,10 @@ package int128 import ( + "crypto/rand" "fmt" "math" + "slices" "testing" "github.com/stretchr/testify/require" @@ -388,3 +390,17 @@ func TestFitsInPrec(t *testing.T) { require.NoError(t, err) require.True(t, n.FitsInPrecision(38), snowflakeNumberTiny) } + +func TestToBytes(t *testing.T) { + for i := 0; i < 100; i++ { + input := make([]byte, 16) + _, err := rand.Read(input) + require.NoError(t, err) + n := FromBigEndian(input) + require.Equal(t, input, n.ToBigEndian()) + require.Equal(t, input, n.AppendBigEndian(nil)) + cloned := slices.Clone(input) + require.Equal(t, input, n.AppendBigEndian(cloned)[16:32]) + require.Equal(t, input, cloned) // Make sure cloned isn't mutated + } +} diff --git a/internal/impl/snowflake/streaming/integration_test.go b/internal/impl/snowflake/streaming/integration_test.go index 0844065343..5fb9bddc49 100644 --- a/internal/impl/snowflake/streaming/integration_test.go +++ b/internal/impl/snowflake/streaming/integration_test.go @@ -88,10 +88,11 @@ func TestAllSnowflakeDatatypes(t *testing.T) { ctx := context.Background() restClient, streamClient := setup(t) channelOpts := streaming.ChannelOptions{ - Name: t.Name(), - DatabaseName: envOr("SNOWFLAKE_DB", "BABY_DATABASE"), - SchemaName: "PUBLIC", - TableName: "TEST_TABLE_KITCHEN_SINK", + Name: t.Name(), + DatabaseName: envOr("SNOWFLAKE_DB", "BABY_DATABASE"), + SchemaName: "PUBLIC", + TableName: "TEST_TABLE_KITCHEN_SINK", + BuildParallelism: 1, } _, err := restClient.RunSQL(ctx, streaming.RunSQLRequest{ Database: channelOpts.DatabaseName, diff --git a/internal/impl/snowflake/streaming/parquet.go b/internal/impl/snowflake/streaming/parquet.go index 7b5db3868d..873ac9f02c 100644 --- a/internal/impl/snowflake/streaming/parquet.go +++ b/internal/impl/snowflake/streaming/parquet.go @@ -11,9 +11,9 @@ package streaming import ( + "bytes" "encoding/binary" "fmt" - "io" "github.com/parquet-go/parquet-go" "github.com/parquet-go/parquet-go/format" @@ -21,66 +21,76 @@ import ( "github.com/segmentio/encoding/thrift" ) -func messageToRow(msg *service.Message) (map[string]any, error) { +// messageToRow converts a message into columnar form using the provided name to index mapping. +// We have to materialize the column into a row so that we can know if a column is null - the +// msg can be sparse, but the row must not be sparse. +func messageToRow(msg *service.Message, out []any, nameToPosition map[string]int) error { v, err := msg.AsStructured() if err != nil { - return nil, fmt.Errorf("error extracting object from message: %w", err) + return fmt.Errorf("error extracting object from message: %w", err) } row, ok := v.(map[string]any) if !ok { - return nil, fmt.Errorf("expected object, got: %T", v) + return fmt.Errorf("expected object, got: %T", v) } - mapped := make(map[string]any, len(row)) for k, v := range row { - mapped[normalizeColumnName(k)] = v + idx, ok := nameToPosition[normalizeColumnName(k)] + if !ok { + // TODO(schema): Unknown column, we just skip it. + // In the future we may evolve the schema based on the new data. 
+ continue + } + out[idx] = v } - return mapped, nil + return nil } -// TODO: If the memory pressure is too great from writing all -// records buffered as a single row group, then consider -// return some kind of iterator of chunks of rows that we can -// then feed into the actual parquet construction process. -// -// If a single parquet file is too much, we can consider having multiple -// parquet files in a single bdec file. func constructRowGroup( batch service.MessageBatch, schema *parquet.Schema, - transformers map[string]*dataTransformer, -) ([]parquet.Row, error) { + transformers []*dataTransformer, +) ([]parquet.Row, []*statsBuffer, error) { // We write all of our data in a columnar fashion, but need to pivot that data so that we can feed it into // out parquet library (which sadly will redo the pivot - maybe we need a lower level abstraction...). // So create a massive matrix that we will write stuff in columnar form, but then we don't need to move any // data to create rows of the data via an in-place transpose operation. // // TODO: Consider caching/pooling this matrix as I expect many are similarily sized. - matrix := make([]parquet.Value, len(batch)*len(schema.Fields())) rowWidth := len(schema.Fields()) - for idx, field := range schema.Fields() { - // The column index is consistent between two schemas that are the same because the schema fields are always - // in sorted order. - columnIndex := idx - t := transformers[field.Name()] - t.buf.Prepare(matrix, columnIndex, rowWidth) - t.stats.Reset() + matrix := make([]parquet.Value, len(batch)*rowWidth) + nameToPosition := make(map[string]int, rowWidth) + stats := make([]*statsBuffer, rowWidth) + buffers := make([]typedBuffer, rowWidth) + for idx, t := range transformers { + leaf, ok := schema.Lookup(t.name) + if !ok { + return nil, nil, fmt.Errorf("invariant failed: unable to find column %q", t.name) + } + buffers[idx] = t.bufferFactory() + buffers[idx].Prepare(matrix, leaf.ColumnIndex, rowWidth) + stats[idx] = &statsBuffer{} + nameToPosition[t.name] = idx } // First we need to shred our record into columns, snowflake's data model // is thankfully a flat list of columns, so no dremel style record shredding // is needed + row := make([]any, rowWidth) for _, msg := range batch { - row, err := messageToRow(msg) + err := messageToRow(msg, row, nameToPosition) if err != nil { - return nil, err + return nil, nil, err } - // We **must** write a null, so iterate over the schema not the record, - // which might be sparse - for name, t := range transformers { - v := row[name] - err = t.converter.ValidateAndConvert(t.stats, v, t.buf) + for i, v := range row { + t := transformers[i] + s := stats[i] + b := buffers[i] + err = t.converter.ValidateAndConvert(s, v, b) if err != nil { - return nil, fmt.Errorf("invalid data for column %s: %w", name, err) + // TODO(schema): if this is a null value err then we can evolve the schema to mark it null. 
+ return nil, nil, fmt.Errorf("invalid data for column %s: %w", t.name, err) } + // reset the column as nil for the next row + row[i] = nil } } // Now all our values have been written to each buffer - here is where we do our matrix @@ -90,7 +100,7 @@ func constructRowGroup( rowStart := i * rowWidth rows[i] = matrix[rowStart : rowStart+rowWidth] } - return rows, nil + return rows, stats, nil } type parquetFileData struct { @@ -99,9 +109,10 @@ type parquetFileData struct { metadata map[string]string } -func writeParquetFile(writer io.Writer, rpcnVersion string, data parquetFileData) (err error) { - pw := parquet.NewGenericWriter[map[string]any]( - writer, +func writeParquetFile(rpcnVersion string, data parquetFileData) (out []byte, err error) { + b := bytes.NewBuffer(nil) + pw := parquet.NewGenericWriter[any]( + b, data.schema, parquet.CreatedBy("RedpandaConnect", rpcnVersion, "unknown"), // Recommended by the Snowflake team to enable data page stats @@ -121,6 +132,7 @@ func writeParquetFile(writer io.Writer, rpcnVersion string, data parquetFileData return } err = pw.Close() + out = b.Bytes() return } diff --git a/internal/impl/snowflake/streaming/parquet_test.go b/internal/impl/snowflake/streaming/parquet_test.go index 5449a7772a..27c2581345 100644 --- a/internal/impl/snowflake/streaming/parquet_test.go +++ b/internal/impl/snowflake/streaming/parquet_test.go @@ -18,6 +18,7 @@ import ( "github.com/aws/smithy-go/ptr" "github.com/parquet-go/parquet-go" "github.com/redpanda-data/benthos/v4/public/service" + "github.com/redpanda-data/connect/v4/internal/impl/snowflake/streaming/int128" "github.com/stretchr/testify/require" ) @@ -26,7 +27,6 @@ func msg(s string) *service.Message { } func TestWriteParquet(t *testing.T) { - b := bytes.NewBuffer(nil) batch := service.MessageBatch{ msg(`{"a":2}`), msg(`{"a":12353}`), @@ -34,14 +34,14 @@ func TestWriteParquet(t *testing.T) { inputDataSchema := parquet.Group{ "A": parquet.Decimal(0, 18, parquet.Int32Type), } - transformers := map[string]*dataTransformer{ - "A": { + transformers := []*dataTransformer{ + { + name: "A", converter: numberConverter{ nullable: true, scale: 0, precision: 38, }, - stats: &statsBuffer{columnID: 1}, column: &columnMetadata{ Name: "A", Ordinal: 1, @@ -52,23 +52,23 @@ func TestWriteParquet(t *testing.T) { Scale: ptr.Int32(0), Nullable: true, }, - buf: &int32Buffer{}, + bufferFactory: int32TypedBufferFactory, }, } schema := parquet.NewSchema("bdec", inputDataSchema) - rows, err := constructRowGroup( + rows, stats, err := constructRowGroup( batch, schema, transformers, ) require.NoError(t, err) - err = writeParquetFile(b, "latest", parquetFileData{ + b, err := writeParquetFile("latest", parquetFileData{ schema, rows, nil, }) require.NoError(t, err) actual, err := readGeneric( - bytes.NewReader(b.Bytes()), - int64(b.Len()), + bytes.NewReader(b), + int64(len(b)), parquet.NewSchema("bdec", inputDataSchema), ) require.NoError(t, err) @@ -76,6 +76,13 @@ func TestWriteParquet(t *testing.T) { {"A": int32(2)}, {"A": int32(12353)}, }, actual) + require.Equal(t, []*statsBuffer{ + { + minIntVal: int128.FromInt64(2), + maxIntVal: int128.FromInt64(12353), + hasData: true, + }, + }, stats) } func readGeneric(r io.ReaderAt, size int64, schema *parquet.Schema) (rows []map[string]any, err error) { diff --git a/internal/impl/snowflake/streaming/schema.go b/internal/impl/snowflake/streaming/schema.go index 633ac0b2bf..54d10393d2 100644 --- a/internal/impl/snowflake/streaming/schema.go +++ b/internal/impl/snowflake/streaming/schema.go @@ -18,18 +18,16 
@@ import ( "github.com/dustin/go-humanize" "github.com/parquet-go/parquet-go" - - "github.com/redpanda-data/connect/v4/internal/impl/snowflake/streaming/int128" ) type dataTransformer struct { - converter dataConverter - stats *statsBuffer - column *columnMetadata - buf typedBuffer + converter dataConverter + column *columnMetadata + bufferFactory typedBufferFactory + name string } -func convertFixedType(column columnMetadata) (parquet.Node, dataConverter, typedBuffer, error) { +func convertFixedType(column columnMetadata) (parquet.Node, dataConverter, typedBufferFactory, error) { var scale int32 var precision int32 if column.Scale != nil { @@ -41,7 +39,7 @@ func convertFixedType(column columnMetadata) (parquet.Node, dataConverter, typed isDecimal := column.Scale != nil && column.Precision != nil if (column.Scale != nil && *column.Scale != 0) || strings.ToUpper(column.PhysicalType) == "SB16" { c := numberConverter{nullable: column.Nullable, scale: scale, precision: precision} - b := &typedBufferImpl{} + b := defaultTypedBufferFactory t := parquet.FixedLenByteArrayType(16) if isDecimal { return parquet.Decimal(int(scale), int(precision), t), c, b, nil @@ -50,24 +48,24 @@ func convertFixedType(column columnMetadata) (parquet.Node, dataConverter, typed } var ptype parquet.Type var defaultPrecision int32 - var buffer typedBuffer + var bufferFactory typedBufferFactory switch strings.ToUpper(column.PhysicalType) { case "SB1": ptype = parquet.Int32Type defaultPrecision = maxPrecisionForByteWidth(1) - buffer = &int32Buffer{} + bufferFactory = int32TypedBufferFactory case "SB2": ptype = parquet.Int32Type defaultPrecision = maxPrecisionForByteWidth(2) - buffer = &int32Buffer{} + bufferFactory = int32TypedBufferFactory case "SB4": ptype = parquet.Int32Type defaultPrecision = maxPrecisionForByteWidth(4) - buffer = &int32Buffer{} + bufferFactory = int32TypedBufferFactory case "SB8": ptype = parquet.Int64Type defaultPrecision = maxPrecisionForByteWidth(8) - buffer = &int64Buffer{} + bufferFactory = int64TypedBufferFactory default: return nil, nil, nil, fmt.Errorf("unsupported physical column type: %s", column.PhysicalType) } @@ -77,30 +75,30 @@ func convertFixedType(column columnMetadata) (parquet.Node, dataConverter, typed } c := numberConverter{nullable: column.Nullable, scale: scale, precision: validationPrecision} if isDecimal { - return parquet.Decimal(int(scale), int(precision), ptype), c, buffer, nil + return parquet.Decimal(int(scale), int(precision), ptype), c, bufferFactory, nil } - return parquet.Leaf(ptype), c, buffer, nil + return parquet.Leaf(ptype), c, bufferFactory, nil } // maxJSONSize is the size that any kind of semi-structured data can be, which is 16MiB minus a small overhead const maxJSONSize = 16*humanize.MiByte - 64 // See ParquetTypeGenerator -func constructParquetSchema(columns []columnMetadata) (*parquet.Schema, map[string]*dataTransformer, map[string]string, error) { +func constructParquetSchema(columns []columnMetadata) (*parquet.Schema, []*dataTransformer, map[string]string, error) { groupNode := parquet.Group{} - transformers := map[string]*dataTransformer{} + transformers := make([]*dataTransformer, len(columns)) // Don't write the sfVer key as it allows us to not have to narrow the numeric types in parquet. 
typeMetadata := map[string]string{ /*"sfVer": "1,1"*/ } var err error - for _, column := range columns { + for idx, column := range columns { id := int(column.Ordinal) var n parquet.Node var converter dataConverter - var buffer typedBuffer + var bufferFactory typedBufferFactory = defaultTypedBufferFactory logicalType := strings.ToLower(column.LogicalType) switch logicalType { case "fixed": - n, converter, buffer, err = convertFixedType(column) + n, converter, bufferFactory, err = convertFixedType(column) if err != nil { return nil, nil, nil, err } @@ -108,17 +106,14 @@ func constructParquetSchema(columns []columnMetadata) (*parquet.Schema, map[stri typeMetadata[fmt.Sprintf("%d:obj_enc", id)] = "1" n = parquet.String() converter = jsonArrayConverter{jsonConverter{column.Nullable, maxJSONSize}} - buffer = &typedBufferImpl{} case "object": typeMetadata[fmt.Sprintf("%d:obj_enc", id)] = "1" n = parquet.String() converter = jsonObjectConverter{jsonConverter{column.Nullable, maxJSONSize}} - buffer = &typedBufferImpl{} case "variant": typeMetadata[fmt.Sprintf("%d:obj_enc", id)] = "1" n = parquet.String() converter = jsonConverter{column.Nullable, maxJSONSize} - buffer = &typedBufferImpl{} case "any", "text", "char": n = parquet.String() byteLength := 16 * humanize.MiByte @@ -127,7 +122,6 @@ func constructParquetSchema(columns []columnMetadata) (*parquet.Schema, map[stri } byteLength = min(byteLength, 16*humanize.MiByte) converter = binaryConverter{nullable: column.Nullable, maxLength: byteLength, utf8: true} - buffer = &typedBufferImpl{} case "binary": n = parquet.Leaf(parquet.ByteArrayType) // Why binary data defaults to 8MiB instead of the 16MiB for strings... ¯\_(ツ)_/¯ @@ -137,26 +131,22 @@ func constructParquetSchema(columns []columnMetadata) (*parquet.Schema, map[stri } byteLength = min(byteLength, 16*humanize.MiByte) converter = binaryConverter{nullable: column.Nullable, maxLength: byteLength} - buffer = &typedBufferImpl{} case "boolean": n = parquet.Leaf(parquet.BooleanType) converter = boolConverter{column.Nullable} - buffer = &typedBufferImpl{} case "real": n = parquet.Leaf(parquet.DoubleType) converter = doubleConverter{column.Nullable} - buffer = &typedBufferImpl{} case "timestamp_tz", "timestamp_ltz", "timestamp_ntz": var scale, precision int32 var pt parquet.Type if column.PhysicalType == "SB8" { pt = parquet.Int64Type precision = maxPrecisionForByteWidth(8) - buffer = &int64Buffer{} + bufferFactory = int64TypedBufferFactory } else { pt = parquet.FixedLenByteArrayType(16) precision = maxPrecisionForByteWidth(16) - buffer = &typedBufferImpl{} } if column.Scale != nil { scale = *column.Scale @@ -176,11 +166,11 @@ func constructParquetSchema(columns []columnMetadata) (*parquet.Schema, map[stri case "time": t := parquet.Int32Type precision := 9 - buffer = &int32Buffer{} + bufferFactory = int32TypedBufferFactory if column.PhysicalType == "SB8" { t = parquet.Int64Type precision = 18 - buffer = &int64Buffer{} + bufferFactory = int64TypedBufferFactory } scale := int32(9) if column.Scale != nil { @@ -191,7 +181,7 @@ func constructParquetSchema(columns []columnMetadata) (*parquet.Schema, map[stri case "date": n = parquet.Leaf(parquet.Int32Type) converter = dateConverter{column.Nullable} - buffer = &int32Buffer{} + bufferFactory = int32TypedBufferFactory default: return nil, nil, nil, fmt.Errorf("unsupported logical column type: %s", column.LogicalType) } @@ -209,68 +199,16 @@ func constructParquetSchema(columns []columnMetadata) (*parquet.Schema, map[stri ) name := 
normalizeColumnName(column.Name) groupNode[name] = n - transformers[name] = &dataTransformer{ - converter: converter, - stats: &statsBuffer{columnID: id}, - column: &column, - buf: buffer, + transformers[idx] = &dataTransformer{ + name: name, + converter: converter, + column: &column, + bufferFactory: bufferFactory, } } return parquet.NewSchema("bdec", groupNode), transformers, typeMetadata, nil } -type statsBuffer struct { - columnID int - minIntVal, maxIntVal int128.Num - minRealVal, maxRealVal float64 - minStrVal, maxStrVal []byte - maxStrLen int - nullCount int64 - first bool -} - -func (s *statsBuffer) Reset() { - s.first = true - s.minIntVal = int128.FromInt64(0) - s.maxIntVal = int128.FromInt64(0) - s.minRealVal = 0 - s.maxRealVal = 0 - s.minStrVal = nil - s.maxStrVal = nil - s.maxStrLen = 0 - s.nullCount = 0 -} - -func computeColumnEpInfo(stats map[string]*dataTransformer) map[string]fileColumnProperties { - info := map[string]fileColumnProperties{} - for _, transformer := range stats { - stat := transformer.stats - var minStrVal *string = nil - if stat.minStrVal != nil { - s := truncateBytesAsHex(stat.minStrVal, false) - minStrVal = &s - } - var maxStrVal *string = nil - if stat.maxStrVal != nil { - s := truncateBytesAsHex(stat.maxStrVal, true) - maxStrVal = &s - } - info[transformer.column.Name] = fileColumnProperties{ - ColumnOrdinal: int32(stat.columnID), - NullCount: stat.nullCount, - MinStrValue: minStrVal, - MaxStrValue: maxStrVal, - MaxLength: int64(stat.maxStrLen), - MinIntValue: stat.minIntVal, - MaxIntValue: stat.maxIntVal, - MinRealValue: stat.minRealVal, - MaxRealValue: stat.maxRealVal, - DistinctValues: -1, - } - } - return info -} - func physicalTypeOrdinal(str string) int { switch strings.ToUpper(str) { case "ROWINDEX": diff --git a/internal/impl/snowflake/streaming/stats.go b/internal/impl/snowflake/streaming/stats.go new file mode 100644 index 0000000000..3ce3643a23 --- /dev/null +++ b/internal/impl/snowflake/streaming/stats.go @@ -0,0 +1,123 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +package streaming + +import ( + "bytes" + + "github.com/redpanda-data/connect/v4/internal/impl/snowflake/streaming/int128" +) + +type statsBuffer struct { + minIntVal, maxIntVal int128.Num + minRealVal, maxRealVal float64 + minStrVal, maxStrVal []byte + maxStrLen int + nullCount int64 + hasData bool +} + +func (s *statsBuffer) UpdateIntStats(v int128.Num) { + if !s.hasData { + s.minIntVal = v + s.maxIntVal = v + s.hasData = true + } else { + s.minIntVal = int128.Min(s.minIntVal, v) + s.maxIntVal = int128.Max(s.maxIntVal, v) + } +} + +func (s *statsBuffer) UpdateFloat64Stats(v float64) { + if !s.hasData { + s.minRealVal = v + s.maxRealVal = v + s.hasData = true + } else { + s.minRealVal = min(s.minRealVal, v) + s.maxRealVal = max(s.maxRealVal, v) + } +} + +func (s *statsBuffer) UpdateBytesStats(v []byte) { + if !s.hasData { + s.minStrVal = v + s.maxStrVal = v + s.maxStrLen = len(v) + s.hasData = true + } else { + if bytes.Compare(v, s.minStrVal) < 0 { + s.minStrVal = v + } + if bytes.Compare(v, s.maxStrVal) > 0 { + s.maxStrVal = v + } + s.maxStrLen = max(s.maxStrLen, len(v)) + } +} + +func mergeStats(a, b *statsBuffer) *statsBuffer { + c := &statsBuffer{hasData: true} + switch { + case a.hasData && b.hasData: + c.minIntVal = int128.Min(a.minIntVal, b.minIntVal) + c.maxIntVal = int128.Max(a.maxIntVal, b.maxIntVal) + c.minRealVal = min(a.minRealVal, b.minRealVal) + c.maxRealVal = max(a.maxRealVal, b.maxRealVal) + c.maxStrLen = max(a.maxStrLen, b.maxStrLen) + c.minStrVal = a.minStrVal + if bytes.Compare(b.minStrVal, a.minStrVal) < 0 { + c.minStrVal = b.minStrVal + } + c.maxStrVal = a.maxStrVal + if bytes.Compare(b.maxStrVal, a.maxStrVal) > 0 { + c.maxStrVal = b.maxStrVal + } + case a.hasData: + *c = *a + case b.hasData: + *c = *b + default: + c.hasData = false + } + c.nullCount = a.nullCount + b.nullCount + return c +} + +func computeColumnEpInfo(transformers []*dataTransformer, stats []*statsBuffer) map[string]fileColumnProperties { + info := map[string]fileColumnProperties{} + for idx, transformer := range transformers { + stat := stats[idx] + var minStrVal *string = nil + if stat.minStrVal != nil { + s := truncateBytesAsHex(stat.minStrVal, false) + minStrVal = &s + } + var maxStrVal *string = nil + if stat.maxStrVal != nil { + s := truncateBytesAsHex(stat.maxStrVal, true) + maxStrVal = &s + } + info[transformer.column.Name] = fileColumnProperties{ + ColumnOrdinal: transformer.column.Ordinal, + NullCount: stat.nullCount, + MinStrValue: minStrVal, + MaxStrValue: maxStrVal, + MaxLength: int64(stat.maxStrLen), + MinIntValue: stat.minIntVal, + MaxIntValue: stat.maxIntVal, + MinRealValue: stat.minRealVal, + MaxRealValue: stat.maxRealVal, + DistinctValues: -1, + } + } + return info +} diff --git a/internal/impl/snowflake/streaming/stats_test.go b/internal/impl/snowflake/streaming/stats_test.go new file mode 100644 index 0000000000..2ad8e600bf --- /dev/null +++ b/internal/impl/snowflake/streaming/stats_test.go @@ -0,0 +1,78 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +package streaming + +import ( + "testing" + + "github.com/redpanda-data/connect/v4/internal/impl/snowflake/streaming/int128" + "github.com/stretchr/testify/require" +) + +func TestMergeInt(t *testing.T) { + s := mergeStats(&statsBuffer{ + minIntVal: int128.FromInt64(-1), + maxIntVal: int128.FromInt64(4), + hasData: true, + }, &statsBuffer{ + minIntVal: int128.FromInt64(3), + maxIntVal: int128.FromInt64(5), + hasData: true, + }) + require.Equal(t, &statsBuffer{ + minIntVal: int128.FromInt64(-1), + maxIntVal: int128.FromInt64(5), + hasData: true, + }, s) +} + +func TestMergeReal(t *testing.T) { + s := mergeStats(&statsBuffer{ + minRealVal: -1.2, + maxRealVal: 4.5, + nullCount: 4, + hasData: true, + }, &statsBuffer{ + minRealVal: 3.4, + maxRealVal: 5.9, + nullCount: 2, + hasData: true, + }) + require.Equal(t, &statsBuffer{ + minRealVal: -1.2, + maxRealVal: 5.9, + nullCount: 6, + hasData: true, + }, s) +} + +func TestMergeStr(t *testing.T) { + s := mergeStats(&statsBuffer{ + minStrVal: []byte("aa"), + maxStrVal: []byte("bbbb"), + maxStrLen: 6, + nullCount: 1, + hasData: true, + }, &statsBuffer{ + minStrVal: []byte("aaaa"), + maxStrVal: []byte("cccccc"), + maxStrLen: 24, + nullCount: 1, + hasData: true, + }) + require.Equal(t, &statsBuffer{ + minStrVal: []byte("aa"), + maxStrVal: []byte("cccccc"), + maxStrLen: 24, + nullCount: 2, + hasData: true, + }, s) +} diff --git a/internal/impl/snowflake/streaming/streaming.go b/internal/impl/snowflake/streaming/streaming.go index 476bdefc4f..4f22879172 100644 --- a/internal/impl/snowflake/streaming/streaming.go +++ b/internal/impl/snowflake/streaming/streaming.go @@ -11,7 +11,6 @@ package streaming import ( - "bytes" "context" "crypto/aes" "crypto/md5" @@ -26,7 +25,9 @@ import ( "github.com/cenkalti/backoff/v4" "github.com/parquet-go/parquet-go" + "github.com/parquet-go/parquet-go/format" "github.com/redpanda-data/benthos/v4/public/service" + "golang.org/x/sync/errgroup" "github.com/redpanda-data/connect/v4/internal/periodic" "github.com/redpanda-data/connect/v4/internal/typed" @@ -142,6 +143,8 @@ type ChannelOptions struct { SchemaName string // TableName is the name of the table TableName string + // The max parallelism used to build parquet files and convert message batches into rows. 
+ BuildParallelism int } type encryptionInfo struct { @@ -186,7 +189,6 @@ func (c *SnowflakeServiceClient) OpenChannel(ctx context.Context, opts ChannelOp rowSequencer: resp.RowSequencer, transformers: transformers, fileMetadata: typeMetadata, - buffer: bytes.NewBuffer(nil), requestIDCounter: c.requestIDCounter, } return ch, nil @@ -256,9 +258,8 @@ type SnowflakeIngestionChannel struct { encryptionInfo *encryptionInfo clientSequencer int64 rowSequencer int64 - transformers map[string]*dataTransformer + transformers []*dataTransformer fileMetadata map[string]string - buffer *bytes.Buffer // This is shared among the various open channels to get some uniqueness // when naming bdec files requestIDCounter *atomic.Int64 @@ -272,72 +273,133 @@ func (c *SnowflakeIngestionChannel) nextRequestID() string { // InsertStats holds some basic statistics about the InsertRows operation type InsertStats struct { BuildTime time.Duration + ConvertTime time.Duration + SerializeTime time.Duration UploadTime time.Duration CompressedOutputSize int } +type bdecPart struct { + unencryptedLen int + parquetFile []byte + parquetMetadata format.FileMetaData + stats []*statsBuffer + convertTime time.Duration + serializeTime time.Duration +} + +func (c *SnowflakeIngestionChannel) constructBdecPart(batch service.MessageBatch, metadata map[string]string) (bdecPart, error) { + wg := &errgroup.Group{} + wg.SetLimit(c.BuildParallelism) + type rowGroup struct { + rows []parquet.Row + stats []*statsBuffer + } + rowGroups := []rowGroup{} + const maxRowGroupSize = 50_000 + convertStart := time.Now() + for i := 0; i < len(batch); i += maxRowGroupSize { + end := min(maxRowGroupSize, len(batch[i:])) + j := len(rowGroups) + rowGroups = append(rowGroups, rowGroup{}) + chunk := batch[i : i+end] + wg.Go(func() error { + rows, stats, err := constructRowGroup(chunk, c.schema, c.transformers) + rowGroups[j] = rowGroup{rows, stats} + return err + }) + } + if err := wg.Wait(); err != nil { + return bdecPart{}, err + } + convertDone := time.Now() + allRows := make([]parquet.Row, 0, len(batch)) + combinedStats := make([]*statsBuffer, len(c.schema.Fields())) + for i := range combinedStats { + combinedStats[i] = &statsBuffer{} + } + for _, rg := range rowGroups { + allRows = append(allRows, rg.rows...) + for i, s := range combinedStats { + combinedStats[i] = mergeStats(s, rg.stats[i]) + } + } + // TODO(perf): It would be really nice to be able to compress in parallel, + // that actually ends up taking quite of bit of CPU. 
+ buf, err := writeParquetFile(c.version, parquetFileData{ + schema: c.schema, + rows: allRows, + metadata: metadata, + }) + if err != nil { + return bdecPart{}, err + } + fileMetadata, err := readParquetMetadata(buf) + if err != nil { + return bdecPart{}, fmt.Errorf("unable to parse parquet metadata: %w", err) + } + done := time.Now() + return bdecPart{ + unencryptedLen: len(buf), + parquetFile: buf, + parquetMetadata: fileMetadata, + stats: combinedStats, + convertTime: convertDone.Sub(convertStart), + serializeTime: done.Sub(convertDone), + }, err +} + // InsertRows creates a parquet file using the schema from the data, // then writes that file into the Snowflake table func (c *SnowflakeIngestionChannel) InsertRows(ctx context.Context, batch service.MessageBatch) (InsertStats, error) { - stats := InsertStats{} - startTime := time.Now() - rows, err := constructRowGroup(batch, c.schema, c.transformers) - if err != nil { - return stats, err + insertStats := InsertStats{} + if len(batch) == 0 { + return insertStats, nil } - // Prevent multiple channels from having the same bdec file (it must be unique) - // so add the ID of the channel in the upper 16 bits and then get 48 bits of - // randomness outside that. + + startTime := time.Now() + // Prevent multiple channels from having the same bdec file (it must be globally unique) + // so add the ID of the channel in the upper 16 bits and then get 48 bits of randomness outside that. fakeThreadID := (int(c.ID) << 48) | rand.N(1<<48) blobPath := generateBlobPath(c.clientPrefix, fakeThreadID, int(c.requestIDCounter.Add(1))) // This is extra metadata that is required for functionality in snowflake. c.fileMetadata["primaryFileId"] = path.Base(blobPath) - c.buffer.Reset() - err = writeParquetFile(c.buffer, c.version, parquetFileData{ - schema: c.schema, - rows: rows, - metadata: c.fileMetadata, - }) + part, err := c.constructBdecPart(batch, c.fileMetadata) if err != nil { - return stats, err - } - unencrypted := c.buffer.Bytes() - metadata, err := readParquetMetadata(unencrypted) - if err != nil { - return stats, fmt.Errorf("unable to parse parquet metadata: %w", err) + return insertStats, fmt.Errorf("unable to construct output: %w", err) } if debug { - _ = os.WriteFile("latest_test.parquet", unencrypted, 0o644) + _ = os.WriteFile("latest_test.parquet", part.parquetFile, 0o644) } - unencryptedLen := len(unencrypted) - unencrypted = padBuffer(unencrypted, aes.BlockSize) - encrypted, err := encrypt(unencrypted, c.encryptionInfo.encryptionKey, blobPath, 0) + + unencrypted := padBuffer(part.parquetFile, aes.BlockSize) + part.parquetFile, err = encrypt(unencrypted, c.encryptionInfo.encryptionKey, blobPath, 0) if err != nil { - return stats, err + return insertStats, fmt.Errorf("unable to encrypt output: %w", err) } + uploadStartTime := time.Now() - fileMD5Hash := md5.Sum(encrypted) uploaderResult := c.uploader.Load() if uploaderResult.err != nil { - return stats, fmt.Errorf("failed to acquire stage uploader: %w", uploaderResult.err) + return insertStats, fmt.Errorf("failed to acquire stage uploader: %w", uploaderResult.err) } uploader := uploaderResult.uploader + fullMD5Hash := md5.Sum(part.parquetFile) err = backoff.Retry(func() error { - return uploader.upload(ctx, blobPath, encrypted, fileMD5Hash[:]) + return uploader.upload(ctx, blobPath, part.parquetFile, fullMD5Hash[:]) }, backoff.WithMaxRetries(backoff.NewConstantBackOff(time.Second), 3)) if err != nil { - return stats, err + return insertStats, fmt.Errorf("unable to upload to storage: %w", err) } 
uploadFinishTime := time.Now() - columnEpInfo := computeColumnEpInfo(c.transformers) resp, err := c.client.registerBlob(ctx, registerBlobRequest{ RequestID: c.nextRequestID(), Role: c.role, Blobs: []blobMetadata{ { Path: blobPath, - MD5: hex.EncodeToString(fileMD5Hash[:]), + MD5: hex.EncodeToString(fullMD5Hash[:]), BDECVersion: 3, BlobStats: blobStats{ FlushStartMs: startTime.UnixMilli(), @@ -350,15 +412,15 @@ func (c *SnowflakeIngestionChannel) InsertRows(ctx context.Context, batch servic Schema: c.SchemaName, Table: c.TableName, ChunkStartOffset: 0, - ChunkLength: int32(unencryptedLen), - ChunkLengthUncompressed: totalUncompressedSize(metadata), - ChunkMD5: md5Hash(encrypted[:unencryptedLen]), + ChunkLength: int32(part.unencryptedLen), + ChunkLengthUncompressed: totalUncompressedSize(part.parquetMetadata), + ChunkMD5: md5Hash(part.parquetFile[:part.unencryptedLen]), EncryptionKeyID: c.encryptionInfo.encryptionKeyID, FirstInsertTimeInMillis: startTime.UnixMilli(), LastInsertTimeInMillis: startTime.UnixMilli(), EPS: &epInfo{ - Rows: metadata.NumRows, - Columns: columnEpInfo, + Rows: part.parquetMetadata.NumRows, + Columns: computeColumnEpInfo(c.transformers, part.stats), }, Channels: []channelMetadata{ { @@ -376,18 +438,18 @@ func (c *SnowflakeIngestionChannel) InsertRows(ctx context.Context, batch servic }, }) if err != nil { - return stats, err + return insertStats, fmt.Errorf("registering output failed: %w", err) } if len(resp.Blobs) != 1 { - return stats, fmt.Errorf("unexpected number of response blobs: %d", len(resp.Blobs)) + return insertStats, fmt.Errorf("unexpected number of response blobs: %d", len(resp.Blobs)) } status := resp.Blobs[0] if len(status.Chunks) != 1 { - return stats, fmt.Errorf("unexpected number of response blob chunks: %d", len(status.Chunks)) + return insertStats, fmt.Errorf("unexpected number of response blob chunks: %d", len(status.Chunks)) } chunk := status.Chunks[0] if len(chunk.Channels) != 1 { - return stats, fmt.Errorf("unexpected number of channels for blob chunk: %d", len(chunk.Channels)) + return insertStats, fmt.Errorf("unexpected number of channels for blob chunk: %d", len(chunk.Channels)) } channel := chunk.Channels[0] if channel.StatusCode != responseSuccess { @@ -395,14 +457,16 @@ func (c *SnowflakeIngestionChannel) InsertRows(ctx context.Context, batch servic if msg == "" { msg = "(no message)" } - return stats, fmt.Errorf("error response injesting data (%d): %s", channel.StatusCode, msg) + return insertStats, fmt.Errorf("error response injesting data (%d): %s", channel.StatusCode, msg) } c.rowSequencer++ c.clientSequencer = channel.ClientSequencer - stats.CompressedOutputSize = unencryptedLen - stats.BuildTime = uploadStartTime.Sub(startTime) - stats.UploadTime = uploadFinishTime.Sub(uploadStartTime) - return stats, err + insertStats.CompressedOutputSize = part.unencryptedLen + insertStats.BuildTime = uploadStartTime.Sub(startTime) + insertStats.UploadTime = uploadFinishTime.Sub(uploadStartTime) + insertStats.ConvertTime = part.convertTime + insertStats.SerializeTime = part.serializeTime + return insertStats, err } // WaitUntilCommitted waits until all the data in the channel has been committed diff --git a/internal/impl/snowflake/streaming/userdata_converter.go b/internal/impl/snowflake/streaming/userdata_converter.go index 48345694bb..d06cd289b9 100644 --- a/internal/impl/snowflake/streaming/userdata_converter.go +++ b/internal/impl/snowflake/streaming/userdata_converter.go @@ -11,12 +11,12 @@ package streaming import ( - "bytes" "encoding/json" 
"errors" "fmt" "time" "unicode/utf8" + "unsafe" "github.com/Jeffail/gabs/v2" "github.com/parquet-go/parquet-go" @@ -25,6 +25,8 @@ import ( "github.com/redpanda-data/connect/v4/internal/impl/snowflake/streaming/int128" ) +type typedBufferFactory func() typedBuffer + // typedBuffer is the buffer that holds columnar data before we write to the parquet file type typedBuffer interface { WriteNull() @@ -39,7 +41,6 @@ type typedBuffer interface { // the data that will be written - this buffer will not modify // the size of the data. Prepare(matrix []parquet.Value, columnIndex, rowWidth int) - Reset() } type typedBufferImpl struct { @@ -47,6 +48,13 @@ type typedBufferImpl struct { columnIndex int rowWidth int currentRow int + + // For int128 we don't make a bunch of small allocs, + // but append to this existing buffer a bunch, this + // saves GC pressure. We could optimize copies and + // reallocations, but this is simpler and seems to + // be effective for now. + scratch []byte } func (b *typedBufferImpl) WriteValue(v parquet.Value) { @@ -57,7 +65,8 @@ func (b *typedBufferImpl) WriteNull() { b.WriteValue(parquet.NullValue()) } func (b *typedBufferImpl) WriteInt128(v int128.Num) { - b.WriteValue(parquet.FixedLenByteArrayValue(v.ToBigEndian()).Level(0, 1, b.columnIndex)) + b.scratch = v.AppendBigEndian(b.scratch) + b.WriteValue(parquet.FixedLenByteArrayValue(b.scratch[len(b.scratch)-16:]).Level(0, 1, b.columnIndex)) } func (b *typedBufferImpl) WriteBool(v bool) { b.WriteValue(parquet.BooleanValue(v).Level(0, 1, b.columnIndex)) @@ -73,11 +82,13 @@ func (b *typedBufferImpl) Prepare(matrix []parquet.Value, columnIndex, rowWidth b.matrix = matrix b.columnIndex = columnIndex b.rowWidth = rowWidth -} -func (b *typedBufferImpl) Reset() { - b.Prepare(nil, 0, 0) + if b.scratch != nil { + b.scratch = b.scratch[:0] + } } +var defaultTypedBufferFactory = typedBufferFactory(func() typedBuffer { return &typedBufferImpl{} }) + type int64Buffer struct { typedBufferImpl } @@ -86,6 +97,8 @@ func (b *int64Buffer) WriteInt128(v int128.Num) { b.WriteValue(parquet.Int64Value(v.ToInt64()).Level(0, 1, b.columnIndex)) } +var int64TypedBufferFactory = typedBufferFactory(func() typedBuffer { return &int64Buffer{} }) + type int32Buffer struct { typedBufferImpl } @@ -98,6 +111,8 @@ type dataConverter interface { ValidateAndConvert(stats *statsBuffer, val any, buf typedBuffer) error } +var int32TypedBufferFactory = typedBufferFactory(func() typedBuffer { return &int32Buffer{} }) + var errNullValue = errors.New("unexpected null value") type boolConverter struct { @@ -121,14 +136,7 @@ func (c boolConverter) ValidateAndConvert(stats *statsBuffer, val any, buf typed if v { i = int128.FromUint64(1) } - if stats.first { - stats.minIntVal = i - stats.maxIntVal = i - stats.first = false - } else { - stats.minIntVal = int128.Min(stats.minIntVal, i) - stats.maxIntVal = int128.Max(stats.maxIntVal, i) - } + stats.UpdateIntStats(i) buf.WriteBool(v) return nil } @@ -202,14 +210,7 @@ func (c numberConverter) ValidateAndConvert(stats *statsBuffer, val any, buf typ if err != nil { return err } - if stats.first { - stats.minIntVal = v - stats.maxIntVal = v - stats.first = false - } else { - stats.minIntVal = int128.Min(stats.minIntVal, v) - stats.maxIntVal = int128.Max(stats.maxIntVal, v) - } + stats.UpdateIntStats(v) buf.WriteInt128(v) return nil } @@ -231,14 +232,7 @@ func (c doubleConverter) ValidateAndConvert(stats *statsBuffer, val any, buf typ if err != nil { return err } - if stats.first { - stats.minRealVal = v - stats.maxRealVal = v - 
stats.first = false - } else { - stats.minRealVal = min(stats.minRealVal, v) - stats.maxRealVal = max(stats.maxRealVal, v) - } + stats.UpdateFloat64Stats(v) buf.WriteFloat64(v) return nil } @@ -258,9 +252,26 @@ func (c binaryConverter) ValidateAndConvert(stats *statsBuffer, val any, buf typ buf.WriteNull() return nil } - v, err := bloblang.ValueAsBytes(val) - if err != nil { - return err + var v []byte + switch t := val.(type) { + case string: + if t != "" { + // We don't modify this byte slice at all, so this is safe to grab the bytes + // without making a copy. + // Also make sure this isn't an empty string because it's undefined what the + // value is. + v = unsafe.Slice(unsafe.StringData(t), len(t)) + } else { + v = []byte{} + } + case []byte: + v = t + default: + b, err := bloblang.ValueAsBytes(val) + if err != nil { + return err + } + v = b } if len(v) > c.maxLength { return fmt.Errorf("value too long, length: %d, max: %d", len(v), c.maxLength) @@ -268,20 +279,7 @@ func (c binaryConverter) ValidateAndConvert(stats *statsBuffer, val any, buf typ if c.utf8 && !utf8.Valid(v) { return errors.New("invalid UTF8") } - if stats.first { - stats.minStrVal = v - stats.maxStrVal = v - stats.maxStrLen = len(v) - stats.first = false - } else { - if bytes.Compare(v, stats.minStrVal) < 0 { - stats.minStrVal = v - } - if bytes.Compare(v, stats.maxStrVal) > 0 { - stats.maxStrVal = v - } - stats.maxStrLen = max(stats.maxStrLen, len(v)) - } + stats.UpdateBytesStats(v) buf.WriteBytes(v) return nil } @@ -304,20 +302,7 @@ func (c jsonConverter) ValidateAndConvert(stats *statsBuffer, val any, buf typed if len(v) > c.maxLength { return fmt.Errorf("value too long, length: %d, max: %d", len(v), c.maxLength) } - if stats.first { - stats.minStrVal = v - stats.maxStrVal = v - stats.maxStrLen = len(v) - stats.first = false - } else { - if bytes.Compare(v, stats.minStrVal) < 0 { - stats.minStrVal = v - } - if bytes.Compare(v, stats.maxStrVal) > 0 { - stats.maxStrVal = v - } - stats.maxStrLen = max(stats.maxStrLen, len(v)) - } + stats.UpdateBytesStats(v) buf.WriteBytes(v) return nil } @@ -398,14 +383,7 @@ func (c timestampConverter) ValidateAndConvert(stats *statsBuffer, val any, buf c.precision, ) } - if stats.first { - stats.minIntVal = v - stats.maxIntVal = v - stats.first = false - } else { - stats.minIntVal = int128.Min(stats.minIntVal, v) - stats.maxIntVal = int128.Max(stats.maxIntVal, v) - } + stats.UpdateIntStats(v) buf.WriteInt128(v) return nil } @@ -435,15 +413,7 @@ func (c timeConverter) ValidateAndConvert(stats *statsBuffer, val any, buf typed t.Second()*int(time.Second.Nanoseconds()) + t.Nanosecond() v := int128.FromInt64(int64(nanos) / pow10TableInt64[9-c.scale]) - if stats.first { - stats.minIntVal = v - stats.maxIntVal = v - stats.first = false - } else { - stats.minIntVal = int128.Min(stats.minIntVal, v) - stats.maxIntVal = int128.Max(stats.maxIntVal, v) - } - // TODO(perf): consider switching to int64 buffers so more stuff can fit in cache + stats.UpdateIntStats(v) buf.WriteInt128(v) return nil } @@ -470,15 +440,7 @@ func (c dateConverter) ValidateAndConvert(stats *statsBuffer, val any, buf typed return fmt.Errorf("DATE columns out of range, year: %d", t.Year()) } v := int128.FromInt64(t.Unix() / int64(24*60*60)) - if stats.first { - stats.minIntVal = v - stats.maxIntVal = v - stats.first = false - } else { - stats.minIntVal = int128.Min(stats.minIntVal, v) - stats.maxIntVal = int128.Max(stats.maxIntVal, v) - } - // TODO(perf): consider switching to int64 buffers so more stuff can fit in cache + 
stats.UpdateIntStats(v) buf.WriteInt128(v) return nil }
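The core of this change is `constructBdecPart`: the batch is split into row-group chunks, each chunk is converted to parquet rows concurrently on an errgroup bounded by `build_parallelism`, and the per-chunk rows and stats are then merged back in their original order before a single parquet file is serialized. The sketch below is a minimal, standalone illustration of that chunk-and-merge pattern under assumed names (`processChunked`, `convert` are hypothetical, not from the repo); it sizes the results slice up front so each goroutine writes only its own index and ordering stays deterministic.

```go
package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

// processChunked converts fixed-size chunks of items concurrently, with at most
// `parallelism` conversions in flight, then stitches the results back together
// in chunk order. parallelism must be >= 1.
func processChunked(items []int, chunkSize, parallelism int, convert func([]int) ([]string, error)) ([]string, error) {
	var wg errgroup.Group
	wg.SetLimit(parallelism) // bound the number of concurrent chunk conversions

	numChunks := (len(items) + chunkSize - 1) / chunkSize
	results := make([][]string, numChunks) // pre-sized: goroutines write distinct slots
	for i := 0; i < len(items); i += chunkSize {
		end := min(i+chunkSize, len(items))
		chunk := items[i:end]
		j := i / chunkSize
		wg.Go(func() error {
			out, err := convert(chunk)
			results[j] = out // only this goroutine touches slot j
			return err
		})
	}
	if err := wg.Wait(); err != nil {
		return nil, err
	}
	var merged []string
	for _, r := range results {
		merged = append(merged, r...)
	}
	return merged, nil
}

func main() {
	out, _ := processChunked([]int{1, 2, 3, 4, 5}, 2, 2, func(c []int) ([]string, error) {
		s := make([]string, len(c))
		for i, v := range c {
			s[i] = fmt.Sprint(v * 10)
		}
		return s, nil
	})
	fmt.Println(out) // [10 20 30 40 50]
}
```

Pre-allocating the slice of per-chunk results before launching the goroutines is the design choice worth noting: concurrent writes land in distinct elements of a stable backing array, so no lock is needed and the merged output is independent of goroutine scheduling. The new `snowflake_convert_latency_ns` and `snowflake_serialize_latency_ns` timers split the old build time into these two phases, which is what makes `build_parallelism` tunable from metrics alone.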