Skip to content

Commit

Permalink
Merge pull request #2888 from redpanda-data/mihaitodor-fix-panic-parq…
Browse files Browse the repository at this point in the history
…uet-encode-processor

Fix a panic in the `parquet_encode` processor
  • Loading branch information
Jeffail authored Sep 25, 2024
2 parents 02a9e11 + a385b7c commit ddc9c67
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 1 deletion.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ All notable changes to this project will be documented in this file.
- Fixed a bug in the `kafka_migrator` input which could lead to extra duplicate messages during a consumer group rebalance. (@mihaitodor)
- `kafka_migrator`, `kafka_migrator_offsets` and `kafka_migrator_bundle` components renamed to `redpanda_migrator`, `redpanda_migrator_offsets` and `redpanda_migrator_bundle` (@mihaitodor)

### Fixed

- Fixes a panic in the `parquet_encode` processor (@mihaitodor)

## 4.36.0 - 2024-09-11

### Added
Expand Down
3 changes: 2 additions & 1 deletion internal/impl/parquet/processor_encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,9 +270,10 @@ func (s *parquetEncodeProcessor) ProcessBatch(ctx context.Context, batch service
buf := bytes.NewBuffer(nil)
pWtr := parquet.NewGenericWriter[any](buf, s.schema, parquet.Compression(s.compressionType))

batch = batch.Copy()
rows := make([]any, len(batch))
for i, m := range batch {
ms, err := m.AsStructured()
ms, err := m.AsStructuredMut()
if err != nil {
return nil, err
}
Expand Down
57 changes: 57 additions & 0 deletions internal/impl/parquet/processor_encode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"sync"
"testing"

"github.com/parquet-go/parquet-go"
Expand Down Expand Up @@ -383,3 +385,58 @@ func TestParquetEncodeProcessor(t *testing.T) {
assert.JSONEq(t, string(expectedBytes), string(actualBytes))
})
}

func TestParquetEncodeParallel(t *testing.T) {
encodeConf, err := parquetEncodeProcessorConfig().ParseYAML(`
schema:
- { name: id, type: INT64 }
- { name: as, type: DOUBLE, repeated: true }
- { name: b, type: BYTE_ARRAY }
- { name: c, type: DOUBLE }
- { name: d, type: BOOLEAN }
- { name: e, type: INT64, optional: true }
- { name: f, type: INT64 }
- { name: g, type: UTF8 }
- name: nested_stuff
optional: true
fields:
- { name: a_stuff, type: BYTE_ARRAY }
- { name: b_stuff, type: BYTE_ARRAY }
`, nil)
require.NoError(t, err)

encodeProc, err := newParquetEncodeProcessorFromConfig(encodeConf, nil)
require.NoError(t, err)

inBatch := service.MessageBatch{
service.NewMessage([]byte(`{
"id": 3,
"as": [ 0.1, 0.2, 0.3, 0.4 ],
"b": "hello world basic values",
"c": 0.5,
"d": true,
"e": 6,
"f": 7,
"g": "logical string represent",
"nested_stuff": {
"a_stuff": "a value",
"b_stuff": "b value"
},
"canary":"not in schema"
}`)),
}

wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
t.Run(fmt.Sprintf("iteration %d", i), func(t *testing.T) {
defer wg.Done()

encodedBatches, err := encodeProc.ProcessBatch(context.Background(), inBatch)
require.NoError(t, err)
require.Len(t, encodedBatches, 1)
require.Len(t, encodedBatches[0], 1)
})
}
wg.Wait()
}

0 comments on commit ddc9c67

Please sign in to comment.