Skip to content

Commit

Permalink
pqarrow/arrowutils: Add support for Struct and RunEndEncoded
Browse files Browse the repository at this point in the history
  • Loading branch information
metalmatze committed Nov 25, 2024
1 parent 2e79ac7 commit ab75300
Show file tree
Hide file tree
Showing 2 changed files with 245 additions and 9 deletions.
94 changes: 93 additions & 1 deletion pqarrow/arrowutils/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ func Take(ctx context.Context, r arrow.Record, indices *array.Int32) (arrow.Reco
// does not have these columns.
var customTake bool
for i := 0; i < int(r.NumCols()); i++ {
if r.Column(i).DataType().ID() == arrow.DICTIONARY || r.Column(i).DataType().ID() == arrow.LIST {
if r.Column(i).DataType().ID() == arrow.DICTIONARY ||
r.Column(i).DataType().ID() == arrow.RUN_END_ENCODED ||
r.Column(i).DataType().ID() == arrow.LIST ||
r.Column(i).DataType().ID() == arrow.STRUCT {
customTake = true
break
}
Expand Down Expand Up @@ -108,8 +111,12 @@ func Take(ctx context.Context, r arrow.Record, indices *array.Int32) (arrow.Reco
switch arr := r.Column(i).(type) {
case *array.Dictionary:
g.Go(func() error { return TakeDictColumn(ctx, arr, i, resArr, indices) })
case *array.RunEndEncoded:
g.Go(func() error { return TakeRunEndEncodedColumn(ctx, arr, i, resArr, indices) })
case *array.List:
g.Go(func() error { return TakeListColumn(ctx, arr, i, resArr, indices) })
case *array.Struct:
g.Go(func() error { return TakeStructColumn(ctx, arr, i, resArr, indices) })
default:
g.Go(func() error { return TakeColumn(ctx, col, i, resArr, indices) })
}
Expand Down Expand Up @@ -172,6 +179,7 @@ func TakeDictColumn(ctx context.Context, a *array.Dictionary, idx int, arr []arr
r.AppendNull()
continue
}
// TODO: Improve this by not copying actual values.
idxBuilder.Append(a.GetValueIndex(int(i)))
}

Expand All @@ -182,6 +190,51 @@ func TakeDictColumn(ctx context.Context, a *array.Dictionary, idx int, arr []arr
return nil
}

// TakeRunEndEncodedColumn reorders the run-end encoded column a according to
// indices and stores the newly built run-end encoded array in arr[idx].
//
// The values of a must be a dictionary array whose dictionary is a string
// array; both are obtained with unchecked type assertions, so any other
// encoding panics.
//
// The work happens in three steps:
//  1. every logical row of a is expanded into its dictionary index (or null),
//  2. the expanded indices are reordered with TakeColumn,
//  3. the reordered indices are resolved to their string values and appended
//     to a fresh run-end encoded builder of the same run-end and value types.
//
// The caller owns the array written to arr[idx]. All intermediate arrays and
// builders are released before returning.
func TakeRunEndEncodedColumn(ctx context.Context, a *array.RunEndEncoded, idx int, arr []arrow.Array, indices *array.Int32) error {
	mem := compute.GetAllocator(ctx)

	// Step 1: one dictionary index per logical row.
	logical := array.NewInt32Builder(mem)
	defer logical.Release()

	dict := a.Values().(*array.Dictionary)
	for row, n := 0, a.Len(); row < n; row++ {
		// Map the logical row to the run (physical position) that covers it.
		physical := a.GetPhysicalIndex(row)
		if dict.IsNull(physical) {
			logical.AppendNull()
			continue
		}
		logical.Append(int32(dict.GetValueIndex(physical)))
	}
	dictIndices := logical.NewInt32Array()
	defer dictIndices.Release()

	// Step 2: reorder the expanded dictionary indices.
	taken := make([]arrow.Array, 1)
	err := TakeColumn(ctx, dictIndices, 0, taken, indices)
	if err != nil {
		return err
	}
	reordered := taken[0].(*array.Int32)
	defer reordered.Release()

	// Step 3: rebuild a run-end encoded array from the reordered values.
	out := array.NewRunEndEncodedBuilder(mem, a.RunEndsArr().DataType(), a.Values().DataType())
	defer out.Release()
	out.Reserve(indices.Len())

	strs := dict.Dictionary().(*array.String)
	for row, n := 0, reordered.Len(); row < n; row++ {
		if !reordered.IsNull(row) {
			value := strs.Value(int(reordered.Value(row)))
			if err := out.AppendValueFromString(value); err != nil {
				return err
			}
			continue
		}
		out.AppendNull()
	}

	arr[idx] = out.NewRunEndEncodedArray()
	return nil
}

func TakeListColumn(ctx context.Context, a *array.List, idx int, arr []arrow.Array, indices *array.Int32) error {
r := array.NewBuilder(compute.GetAllocator(ctx), a.DataType()).(*array.ListBuilder)
valueBuilder, ok := r.ValueBuilder().(*array.BinaryDictionaryBuilder)
Expand Down Expand Up @@ -223,6 +276,45 @@ func TakeListColumn(ctx context.Context, a *array.List, idx int, arr []arrow.Arr
return nil
}

// TakeStructColumn reorders every field of the struct column a according to
// indices and stores a struct array assembled from the reordered fields in
// arr[idx].
//
// Run-end encoded fields are reordered with TakeRunEndEncodedColumn; every
// other field type is handed to TakeColumn. The first error encountered is
// returned and the fields reordered so far are released.
//
// NOTE(review): the result is assembled from field arrays and field names
// only; no validity information from a is passed to NewStructArray, so
// struct-level nulls and per-field nullability/metadata of a do not appear to
// be carried over — confirm this is acceptable for callers.
func TakeStructColumn(ctx context.Context, a *array.Struct, idx int, arr []arrow.Array, indices *array.Int32) error {
	structType := a.Data().DataType().(*arrow.StructType)
	numFields := a.NumField()

	taken := make([]arrow.Array, numFields)
	fieldNames := make([]string, numFields)
	// The struct array built below holds its own references to the fields, so
	// ours are dropped on every exit path; slots never filled stay nil.
	defer func() {
		for _, child := range taken {
			if child == nil {
				continue
			}
			child.Release()
		}
	}()

	for pos := 0; pos < numFields; pos++ {
		fieldNames[pos] = structType.Field(pos).Name

		field := a.Field(pos)
		var err error
		if ree, isREE := field.(*array.RunEndEncoded); isREE {
			err = TakeRunEndEncodedColumn(ctx, ree, pos, taken, indices)
		} else {
			err = TakeColumn(ctx, field, pos, taken, indices)
		}
		if err != nil {
			return err
		}
	}

	result, err := array.NewStructArray(taken, fieldNames)
	if err != nil {
		return err
	}

	arr[idx] = result
	return nil
}

type multiColSorter struct {
indices *builder.OptInt32Builder
comparisons []comparator
Expand Down
160 changes: 152 additions & 8 deletions pqarrow/arrowutils/sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sort"
"strings"
"testing"

"github.com/apache/arrow/go/v16/arrow"
Expand Down Expand Up @@ -281,8 +282,26 @@ func TestSortRecordBuilderReuse(t *testing.T) {
}

func TestReorderRecord(t *testing.T) {
// readRunEndEncodedDictionary renders the logical (fully expanded) content of
// a run-end encoded array of dictionary-encoded strings as "[v0 v1 ...]",
// printing array.NullValueStr for null entries.
readRunEndEncodedDictionary := func(arr *array.RunEndEncoded) string {
	dict := arr.Values().(*array.Dictionary)
	strs := dict.Dictionary().(*array.String)

	rendered := make([]string, 0, arr.Len())
	for row := 0; row < arr.Len(); row++ {
		// Resolve the logical row to the run that covers it.
		physical := arr.GetPhysicalIndex(row)
		cell := array.NullValueStr
		if !dict.IsNull(physical) {
			cell = strs.Value(dict.GetValueIndex(physical))
		}
		rendered = append(rendered, cell)
	}
	return "[" + strings.Join(rendered, " ") + "]"
}

t.Run("Simple", func(t *testing.T) {
mem := memory.NewGoAllocator()
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer mem.AssertSize(t, 0)
b := array.NewRecordBuilder(mem, arrow.NewSchema(
[]arrow.Field{
{
Expand All @@ -299,16 +318,17 @@ func TestReorderRecord(t *testing.T) {
indices := array.NewInt32Builder(mem)
indices.AppendValues([]int32{2, 1, 0}, nil)
by := indices.NewInt32Array()
result, err := Take(
compute.WithAllocator(context.Background(), mem), r, by)
defer by.Release()
result, err := Take(compute.WithAllocator(context.Background(), mem), r, by)
require.Nil(t, err)
defer result.Release()

want := []int64{1, 2, 3}
require.Equal(t, want, result.Column(0).(*array.Int64).Int64Values())
})
t.Run("WithStringDict", func(t *testing.T) {
mem := memory.NewGoAllocator()
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer mem.AssertSize(t, 0)
b := array.NewRecordBuilder(mem, arrow.NewSchema(
[]arrow.Field{
{
Expand All @@ -332,7 +352,9 @@ func TestReorderRecord(t *testing.T) {

indices := array.NewInt32Builder(mem)
indices.AppendValues([]int32{2, 1, 4, 0, 3}, nil)
result, err := Take(compute.WithAllocator(context.Background(), mem), r, indices.NewInt32Array())
by := indices.NewInt32Array()
defer by.Release()
result, err := Take(compute.WithAllocator(context.Background(), mem), r, by)
require.NoError(t, err)
defer result.Release()

Expand All @@ -347,8 +369,56 @@ func TestReorderRecord(t *testing.T) {
require.Equal(t, want[i], got.ValueStr(i))
}
})
t.Run("RunEndEncoded", func(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer mem.AssertSize(t, 0)

b := array.NewRecordBuilder(mem, arrow.NewSchema(
[]arrow.Field{
{
Name: "ree",
Type: arrow.RunEndEncodedOf(
arrow.PrimitiveTypes.Int32,
&arrow.DictionaryType{
IndexType: arrow.PrimitiveTypes.Uint32,
ValueType: arrow.BinaryTypes.String,
}),
},
}, nil,
))
defer b.Release()

ree := b.Field(0).(*array.RunEndEncodedBuilder)
require.NoError(t, ree.AppendValueFromString("3"))
require.NoError(t, ree.AppendValueFromString("2"))
require.NoError(t, ree.AppendValueFromString("1"))
ree.AppendNull()
require.NoError(t, ree.AppendValueFromString("3"))
r := b.NewRecord()
defer r.Release()

indices := array.NewInt32Builder(mem)
indices.AppendValues([]int32{2, 1, 4, 0, 3}, nil)
by := indices.NewInt32Array()
defer by.Release()

// Reordering

result, err := Take(compute.WithAllocator(context.Background(), mem), r, by)
require.NoError(t, err)
defer result.Release()

// Testing

sorted := result.Column(0).(*array.RunEndEncoded)
sortedEnds := sorted.RunEndsArr().(*array.Int32)
// Note how the two consecutive "3" values are collapsed into a single run ending at 4.
require.Equal(t, "[1 2 4 5]", sortedEnds.String())
require.Equal(t, "[1 2 3 3 (null)]", readRunEndEncodedDictionary(sorted))
})
t.Run("WithFixedSizeBinaryDict", func(t *testing.T) {
mem := memory.NewGoAllocator()
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer mem.AssertSize(t, 0)
b := array.NewRecordBuilder(mem, arrow.NewSchema(
[]arrow.Field{
{
Expand All @@ -372,7 +442,9 @@ func TestReorderRecord(t *testing.T) {

indices := array.NewInt32Builder(mem)
indices.AppendValues([]int32{2, 1, 4, 0, 3}, nil)
result, err := Take(compute.WithAllocator(context.Background(), mem), r, indices.NewInt32Array())
by := indices.NewInt32Array()
defer by.Release()
result, err := Take(compute.WithAllocator(context.Background(), mem), r, by)
require.NoError(t, err)
defer result.Release()

Expand All @@ -389,6 +461,8 @@ func TestReorderRecord(t *testing.T) {
})
t.Run("List", func(t *testing.T) {
mem := memory.NewGoAllocator()
// mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
// defer mem.AssertSize(t, 0)
b := array.NewRecordBuilder(mem, arrow.NewSchema(
[]arrow.Field{
{
Expand Down Expand Up @@ -420,8 +494,10 @@ func TestReorderRecord(t *testing.T) {

indices := array.NewInt32Builder(mem)
indices.AppendValues([]int32{2, 1, 0, 3}, nil)
by := indices.NewInt32Array()
defer by.Release()
result, err := Take(
compute.WithAllocator(context.Background(), mem), r, indices.NewInt32Array())
compute.WithAllocator(context.Background(), mem), r, by)
require.Nil(t, err)
defer result.Release()

Expand All @@ -441,6 +517,74 @@ func TestReorderRecord(t *testing.T) {
require.Equal(t, expected[i], got.ValueStr(i), "unexpected value at %d", i)
}
})
t.Run("Struct", func(t *testing.T) {
LabelArrowType := arrow.RunEndEncodedOf(
arrow.PrimitiveTypes.Int32,
&arrow.DictionaryType{
IndexType: arrow.PrimitiveTypes.Uint32,
ValueType: arrow.BinaryTypes.String,
},
)

mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer mem.AssertSize(t, 0)

b := array.NewRecordBuilder(mem, arrow.NewSchema(
[]arrow.Field{
{
Name: "struct",
Type: arrow.StructOf(
arrow.Field{Name: "first", Type: LabelArrowType, Nullable: true},
arrow.Field{Name: "second", Type: LabelArrowType, Nullable: true},
arrow.Field{Name: "third", Type: arrow.PrimitiveTypes.Int64, Nullable: true},
),
},
}, &arrow.Metadata{},
))
defer b.Release()

sb := b.Field(0).(*array.StructBuilder)
firstFieldBuilder := sb.FieldBuilder(0).(*array.RunEndEncodedBuilder)
secondFieldBuilder := sb.FieldBuilder(1).(*array.RunEndEncodedBuilder)
thirdFieldBuilder := sb.FieldBuilder(2).(*array.Int64Builder)

sb.Append(true)
require.NoError(t, firstFieldBuilder.AppendValueFromString("3"))
require.NoError(t, secondFieldBuilder.AppendValueFromString("1"))
thirdFieldBuilder.Append(1)
sb.Append(true)
require.NoError(t, firstFieldBuilder.AppendValueFromString("2"))
require.NoError(t, secondFieldBuilder.AppendValueFromString("2"))
thirdFieldBuilder.Append(2)
sb.Append(true)
require.NoError(t, firstFieldBuilder.AppendValueFromString("1"))
require.NoError(t, secondFieldBuilder.AppendValueFromString("3"))
thirdFieldBuilder.Append(3)
sb.Append(true)
firstFieldBuilder.AppendNull()
require.NoError(t, secondFieldBuilder.AppendValueFromString("4"))
thirdFieldBuilder.Append(4)
sb.Append(true)
require.NoError(t, firstFieldBuilder.AppendValueFromString("3"))
require.NoError(t, secondFieldBuilder.AppendValueFromString("5"))
thirdFieldBuilder.Append(5)

r := b.NewRecord()
defer r.Release()

indices := array.NewInt32Builder(mem)
indices.AppendValues([]int32{2, 1, 4, 0, 3}, nil)
by := indices.NewInt32Array()
defer by.Release()
result, err := Take(compute.WithAllocator(context.Background(), mem), r, by)
require.Nil(t, err)
defer result.Release()
resultStruct := result.Column(0).(*array.Struct)

require.Equal(t, "[1 2 3 3 (null)]", readRunEndEncodedDictionary(resultStruct.Field(0).(*array.RunEndEncoded)))
require.Equal(t, "[3 2 5 1 4]", readRunEndEncodedDictionary(resultStruct.Field(1).(*array.RunEndEncoded)))
require.Equal(t, "[3 2 5 1 4]", resultStruct.Field(2).(*array.Int64).String())
})
}

// Use all supported sort field.
Expand Down

0 comments on commit ab75300

Please sign in to comment.