From 880463c31132c58355eea71c56d24eac3bfa96e0 Mon Sep 17 00:00:00 2001 From: Mihai Todor Date: Fri, 20 Sep 2024 23:13:00 +0100 Subject: [PATCH 1/2] Fix a panic in the `parquet_encode` processor Fixes #2885. Signed-off-by: Mihai Todor --- CHANGELOG.md | 4 ++ internal/impl/parquet/processor_encode.go | 1 + .../impl/parquet/processor_encode_test.go | 57 +++++++++++++++++++ 3 files changed, 62 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1a0d91904d..2333aa79f9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,10 @@ All notable changes to this project will be documented in this file. - New experimental `questdb` output. (@sklarsa) - Field `metadata_max_age` added to the `kafka_franz` input. (@Scarjit) +### Fixed + +- Fixes a panic in the `parquet_encode` processor (@mihaitodor) + ## 4.36.0 - 2024-09-11 ### Added diff --git a/internal/impl/parquet/processor_encode.go b/internal/impl/parquet/processor_encode.go index 821ecc6b1e..bae51d289c 100644 --- a/internal/impl/parquet/processor_encode.go +++ b/internal/impl/parquet/processor_encode.go @@ -270,6 +270,7 @@ func (s *parquetEncodeProcessor) ProcessBatch(ctx context.Context, batch service buf := bytes.NewBuffer(nil) pWtr := parquet.NewGenericWriter[any](buf, s.schema, parquet.Compression(s.compressionType)) + batch = batch.Copy() rows := make([]any, len(batch)) for i, m := range batch { ms, err := m.AsStructured() diff --git a/internal/impl/parquet/processor_encode_test.go b/internal/impl/parquet/processor_encode_test.go index 4e9d64cb5e..45c9642082 100644 --- a/internal/impl/parquet/processor_encode_test.go +++ b/internal/impl/parquet/processor_encode_test.go @@ -18,7 +18,9 @@ import ( "bytes" "context" "encoding/json" + "fmt" "io" + "sync" "testing" "github.com/parquet-go/parquet-go" @@ -383,3 +385,58 @@ func TestParquetEncodeProcessor(t *testing.T) { assert.JSONEq(t, string(expectedBytes), string(actualBytes)) }) } + +func TestParquetEncodeParallel(t *testing.T) { + encodeConf, err := parquetEncodeProcessorConfig().ParseYAML(` +schema: + - { name: id, type: INT64 } + - { name: as, type: DOUBLE, repeated: true } + - { name: b, type: BYTE_ARRAY } + - { name: c, type: DOUBLE } + - { name: d, type: BOOLEAN } + - { name: e, type: INT64, optional: true } + - { name: f, type: INT64 } + - { name: g, type: UTF8 } + - name: nested_stuff + optional: true + fields: + - { name: a_stuff, type: BYTE_ARRAY } + - { name: b_stuff, type: BYTE_ARRAY } +`, nil) + require.NoError(t, err) + + encodeProc, err := newParquetEncodeProcessorFromConfig(encodeConf, nil) + require.NoError(t, err) + + inBatch := service.MessageBatch{ + service.NewMessage([]byte(`{ + "id": 3, + "as": [ 0.1, 0.2, 0.3, 0.4 ], + "b": "hello world basic values", + "c": 0.5, + "d": true, + "e": 6, + "f": 7, + "g": "logical string represent", + "nested_stuff": { + "a_stuff": "a value", + "b_stuff": "b value" + }, + "canary":"not in schema" +}`)), + } + + wg := sync.WaitGroup{} + for i := 0; i < 10; i++ { + wg.Add(1) + t.Run(fmt.Sprintf("iteration %d", i), func(t *testing.T) { + defer wg.Done() + + encodedBatches, err := encodeProc.ProcessBatch(context.Background(), inBatch) + require.NoError(t, err) + require.Len(t, encodedBatches, 1) + require.Len(t, encodedBatches[0], 1) + }) + } + wg.Wait() +} From a385b7cfa43ba5fd58997518124caaaed9b71d7d Mon Sep 17 00:00:00 2001 From: Mihai Todor Date: Mon, 23 Sep 2024 04:56:33 -0700 Subject: [PATCH 2/2] Update internal/impl/parquet/processor_encode.go Co-authored-by: Ashley Jeffs --- internal/impl/parquet/processor_encode.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/impl/parquet/processor_encode.go b/internal/impl/parquet/processor_encode.go index bae51d289c..0ef0a268f3 100644 --- a/internal/impl/parquet/processor_encode.go +++ b/internal/impl/parquet/processor_encode.go @@ -273,7 +273,7 @@ func (s *parquetEncodeProcessor) ProcessBatch(ctx context.Context, batch service batch = batch.Copy() rows := make([]any, len(batch)) for i, m := range batch { - ms, err := m.AsStructured() + ms, err := m.AsStructuredMut() if err != nil { return nil, err }