Skip to content

Commit

Permalink
AppendValueToArrowBuilder: leave single implementation (#235)
Browse files Browse the repository at this point in the history
  • Loading branch information
ninaiad authored Feb 7, 2025
1 parent 8fd57d0 commit 01510af
Show file tree
Hide file tree
Showing 8 changed files with 193 additions and 386 deletions.
83 changes: 21 additions & 62 deletions app/server/datasource/rdbms/clickhouse/acceptor_appender.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package clickhouse

import (
"errors"
"fmt"
"time"

"github.com/apache/arrow/go/v13/arrow/array"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"

"github.com/ydb-platform/fq-connector-go/app/server/conversion"
"github.com/ydb-platform/fq-connector-go/app/server/utils"
"github.com/ydb-platform/fq-connector-go/common"
)

Expand All @@ -28,41 +28,41 @@ func addAcceptorAppenderFromSQLTypeName(
switch {
case typeName == typeBool:
acceptors = append(acceptors, new(bool))
appenders = append(appenders, makeAppender[bool, uint8, *array.Uint8Builder](cc.Bool()))
appenders = append(appenders, utils.MakeAppender[bool, uint8, *array.Uint8Builder](cc.Bool()))
case typeName == typeInt8:
acceptors = append(acceptors, new(int8))
appenders = append(appenders, makeAppender[int8, int8, *array.Int8Builder](cc.Int8()))
appenders = append(appenders, utils.MakeAppender[int8, int8, *array.Int8Builder](cc.Int8()))
case typeName == typeInt16:
acceptors = append(acceptors, new(int16))
appenders = append(appenders, makeAppender[int16, int16, *array.Int16Builder](cc.Int16()))
appenders = append(appenders, utils.MakeAppender[int16, int16, *array.Int16Builder](cc.Int16()))
case typeName == typeInt32:
acceptors = append(acceptors, new(int32))
appenders = append(appenders, makeAppender[int32, int32, *array.Int32Builder](cc.Int32()))
appenders = append(appenders, utils.MakeAppender[int32, int32, *array.Int32Builder](cc.Int32()))
case typeName == typeInt64:
acceptors = append(acceptors, new(int64))
appenders = append(appenders, makeAppender[int64, int64, *array.Int64Builder](cc.Int64()))
appenders = append(appenders, utils.MakeAppender[int64, int64, *array.Int64Builder](cc.Int64()))
case typeName == typeUInt8:
acceptors = append(acceptors, new(uint8))
appenders = append(appenders, makeAppender[uint8, uint8, *array.Uint8Builder](cc.Uint8()))
appenders = append(appenders, utils.MakeAppender[uint8, uint8, *array.Uint8Builder](cc.Uint8()))
case typeName == typeUInt16:
acceptors = append(acceptors, new(uint16))
appenders = append(appenders, makeAppender[uint16, uint16, *array.Uint16Builder](cc.Uint16()))
appenders = append(appenders, utils.MakeAppender[uint16, uint16, *array.Uint16Builder](cc.Uint16()))
case typeName == typeUInt32:
acceptors = append(acceptors, new(uint32))
appenders = append(appenders, makeAppender[uint32, uint32, *array.Uint32Builder](cc.Uint32()))
appenders = append(appenders, utils.MakeAppender[uint32, uint32, *array.Uint32Builder](cc.Uint32()))
case typeName == typeUInt64:
acceptors = append(acceptors, new(uint64))
appenders = append(appenders, makeAppender[uint64, uint64, *array.Uint64Builder](cc.Uint64()))
appenders = append(appenders, utils.MakeAppender[uint64, uint64, *array.Uint64Builder](cc.Uint64()))
case typeName == typeFloat32:
acceptors = append(acceptors, new(float32))
appenders = append(appenders, makeAppender[float32, float32, *array.Float32Builder](cc.Float32()))
appenders = append(appenders, utils.MakeAppender[float32, float32, *array.Float32Builder](cc.Float32()))
case typeName == typeFload64:
acceptors = append(acceptors, new(float64))
appenders = append(appenders, makeAppender[float64, float64, *array.Float64Builder](cc.Float64()))
appenders = append(appenders, utils.MakeAppender[float64, float64, *array.Float64Builder](cc.Float64()))
case typeName == typeString, tm.isFixedString.MatchString(typeName):
// Looks like []byte would be a better option here, but clickhouse driver prefers string
acceptors = append(acceptors, new(string))
appenders = append(appenders, makeAppender[string, []byte, *array.BinaryBuilder](cc.StringToBytes()))
appenders = append(appenders, utils.MakeAppender[string, []byte, *array.BinaryBuilder](cc.StringToBytes()))
case typeName == typeDate:
acceptors = append(acceptors, new(time.Time))

Expand All @@ -74,9 +74,9 @@ func addAcceptorAppenderFromSQLTypeName(
switch ydbTypeID {
case Ydb.Type_UTF8:
appenders = append(appenders,
makeAppender[time.Time, string, *array.StringBuilder](dateToStringConverter{conv: cc.DateToString()}))
utils.MakeAppender[time.Time, string, *array.StringBuilder](dateToStringConverter{conv: cc.DateToString()}))
case Ydb.Type_DATE:
appenders = append(appenders, makeAppender[time.Time, uint16, *array.Uint16Builder](cc.Date()))
appenders = append(appenders, utils.MakeAppender[time.Time, uint16, *array.Uint16Builder](cc.Date()))
default:
return nil, nil, fmt.Errorf("unexpected ydb type %v with sql type %s: %w", ydbType, typeName, common.ErrDataTypeNotSupported)
}
Expand All @@ -91,9 +91,9 @@ func addAcceptorAppenderFromSQLTypeName(
switch ydbTypeID {
case Ydb.Type_UTF8:
appenders = append(appenders,
makeAppender[time.Time, string, *array.StringBuilder](date32ToStringConverter{conv: cc.DateToString()}))
utils.MakeAppender[time.Time, string, *array.StringBuilder](date32ToStringConverter{conv: cc.DateToString()}))
case Ydb.Type_DATE:
appenders = append(appenders, makeAppender[time.Time, uint16, *array.Uint16Builder](cc.Date()))
appenders = append(appenders, utils.MakeAppender[time.Time, uint16, *array.Uint16Builder](cc.Date()))
default:
return nil, nil, fmt.Errorf("unexpected ydb type %v with sql type %s: %w", ydbType, typeName, common.ErrDataTypeNotSupported)
}
Expand All @@ -108,9 +108,9 @@ func addAcceptorAppenderFromSQLTypeName(
switch ydbTypeID {
case Ydb.Type_UTF8:
appenders = append(appenders,
makeAppender[time.Time, string, *array.StringBuilder](dateTime64ToStringConverter{conv: cc.TimestampToString(true)}))
utils.MakeAppender[time.Time, string, *array.StringBuilder](dateTime64ToStringConverter{conv: cc.TimestampToString(true)}))
case Ydb.Type_TIMESTAMP:
appenders = append(appenders, makeAppender[time.Time, uint64, *array.Uint64Builder](cc.Timestamp()))
appenders = append(appenders, utils.MakeAppender[time.Time, uint64, *array.Uint64Builder](cc.Timestamp()))
default:
return nil, nil, fmt.Errorf("unexpected ydb type %v with sql type %s: %w", ydbType, typeName, common.ErrDataTypeNotSupported)
}
Expand All @@ -125,9 +125,9 @@ func addAcceptorAppenderFromSQLTypeName(
switch ydbTypeID {
case Ydb.Type_UTF8:
appenders = append(appenders,
makeAppender[time.Time, string, *array.StringBuilder](dateTimeToStringConverter{conv: cc.DatetimeToString()}))
utils.MakeAppender[time.Time, string, *array.StringBuilder](dateTimeToStringConverter{conv: cc.DatetimeToString()}))
case Ydb.Type_DATETIME:
appenders = append(appenders, makeAppender[time.Time, uint32, *array.Uint32Builder](cc.Datetime()))
appenders = append(appenders, utils.MakeAppender[time.Time, uint32, *array.Uint32Builder](cc.Datetime()))
default:
return nil, nil, fmt.Errorf("unexpected ydb type %v with sql type %s: %w", ydbType, typeName, common.ErrDataTypeNotSupported)
}
Expand All @@ -137,44 +137,3 @@ func addAcceptorAppenderFromSQLTypeName(

return acceptors, appenders, nil
}

func makeAppender[
IN common.ValueType,
OUT common.ValueType,
AB common.ArrowBuilder[OUT],
](conv conversion.ValuePtrConverter[IN, OUT]) func(acceptor any, builder array.Builder) error {
return func(acceptor any, builder array.Builder) error {
return appendValueToArrowBuilder[IN, OUT, AB](acceptor, builder, conv)
}
}

func appendValueToArrowBuilder[IN common.ValueType, OUT common.ValueType, AB common.ArrowBuilder[OUT]](
acceptor any,
builder array.Builder,
conv conversion.ValuePtrConverter[IN, OUT],
) error {
cast := acceptor.(*IN)

if cast == nil {
builder.AppendNull()

return nil
}

out, err := conv.Convert(cast)
if err != nil {
if errors.Is(err, common.ErrValueOutOfTypeBounds) {
// TODO: write warning to logger
builder.AppendNull()

return nil
}

return fmt.Errorf("convert value %v: %w", *cast, err)
}

//nolint:forcetypeassert
builder.(AB).Append(out)

return nil
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package clickhouse

import (
"errors"
"fmt"
"time"

"github.com/apache/arrow/go/v13/arrow/array"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"

"github.com/ydb-platform/fq-connector-go/app/server/conversion"
"github.com/ydb-platform/fq-connector-go/app/server/utils"
"github.com/ydb-platform/fq-connector-go/common"
)

Expand All @@ -28,41 +28,41 @@ func addAcceptorAppenderFromSQLTypeNameNullable(
switch {
case typeName == typeBool:
acceptors = append(acceptors, new(*bool))
appenders = append(appenders, makeAppenderNullable[bool, uint8, *array.Uint8Builder](cc.Bool()))
appenders = append(appenders, utils.MakeAppenderNullable[bool, uint8, *array.Uint8Builder](cc.Bool()))
case typeName == typeInt8:
acceptors = append(acceptors, new(*int8))
appenders = append(appenders, makeAppenderNullable[int8, int8, *array.Int8Builder](cc.Int8()))
appenders = append(appenders, utils.MakeAppenderNullable[int8, int8, *array.Int8Builder](cc.Int8()))
case typeName == typeInt16:
acceptors = append(acceptors, new(*int16))
appenders = append(appenders, makeAppenderNullable[int16, int16, *array.Int16Builder](cc.Int16()))
appenders = append(appenders, utils.MakeAppenderNullable[int16, int16, *array.Int16Builder](cc.Int16()))
case typeName == typeInt32:
acceptors = append(acceptors, new(*int32))
appenders = append(appenders, makeAppenderNullable[int32, int32, *array.Int32Builder](cc.Int32()))
appenders = append(appenders, utils.MakeAppenderNullable[int32, int32, *array.Int32Builder](cc.Int32()))
case typeName == typeInt64:
acceptors = append(acceptors, new(*int64))
appenders = append(appenders, makeAppenderNullable[int64, int64, *array.Int64Builder](cc.Int64()))
appenders = append(appenders, utils.MakeAppenderNullable[int64, int64, *array.Int64Builder](cc.Int64()))
case typeName == typeUInt8:
acceptors = append(acceptors, new(*uint8))
appenders = append(appenders, makeAppenderNullable[uint8, uint8, *array.Uint8Builder](cc.Uint8()))
appenders = append(appenders, utils.MakeAppenderNullable[uint8, uint8, *array.Uint8Builder](cc.Uint8()))
case typeName == typeUInt16:
acceptors = append(acceptors, new(*uint16))
appenders = append(appenders, makeAppenderNullable[uint16, uint16, *array.Uint16Builder](cc.Uint16()))
appenders = append(appenders, utils.MakeAppenderNullable[uint16, uint16, *array.Uint16Builder](cc.Uint16()))
case typeName == typeUInt32:
acceptors = append(acceptors, new(*uint32))
appenders = append(appenders, makeAppenderNullable[uint32, uint32, *array.Uint32Builder](cc.Uint32()))
appenders = append(appenders, utils.MakeAppenderNullable[uint32, uint32, *array.Uint32Builder](cc.Uint32()))
case typeName == typeUInt64:
acceptors = append(acceptors, new(*uint64))
appenders = append(appenders, makeAppenderNullable[uint64, uint64, *array.Uint64Builder](cc.Uint64()))
appenders = append(appenders, utils.MakeAppenderNullable[uint64, uint64, *array.Uint64Builder](cc.Uint64()))
case typeName == typeFloat32:
acceptors = append(acceptors, new(*float32))
appenders = append(appenders, makeAppenderNullable[float32, float32, *array.Float32Builder](cc.Float32()))
appenders = append(appenders, utils.MakeAppenderNullable[float32, float32, *array.Float32Builder](cc.Float32()))
case typeName == typeFload64:
acceptors = append(acceptors, new(*float64))
appenders = append(appenders, makeAppenderNullable[float64, float64, *array.Float64Builder](cc.Float64()))
appenders = append(appenders, utils.MakeAppenderNullable[float64, float64, *array.Float64Builder](cc.Float64()))
case typeName == typeString, tm.isFixedString.MatchString(typeName):
// Looks like []byte would be a better option here, but clickhouse driver prefers string
acceptors = append(acceptors, new(*string))
appenders = append(appenders, makeAppenderNullable[string, []byte, *array.BinaryBuilder](cc.StringToBytes()))
appenders = append(appenders, utils.MakeAppenderNullable[string, []byte, *array.BinaryBuilder](cc.StringToBytes()))
case typeName == typeDate:
acceptors = append(acceptors, new(*time.Time))

Expand All @@ -74,9 +74,9 @@ func addAcceptorAppenderFromSQLTypeNameNullable(
switch ydbTypeID {
case Ydb.Type_UTF8:
appenders = append(appenders,
makeAppenderNullable[time.Time, string, *array.StringBuilder](dateToStringConverter{conv: cc.DateToString()}))
utils.MakeAppenderNullable[time.Time, string, *array.StringBuilder](dateToStringConverter{conv: cc.DateToString()}))
case Ydb.Type_DATE:
appenders = append(appenders, makeAppenderNullable[time.Time, uint16, *array.Uint16Builder](cc.Date()))
appenders = append(appenders, utils.MakeAppenderNullable[time.Time, uint16, *array.Uint16Builder](cc.Date()))
default:
return nil, nil, fmt.Errorf("unexpected ydb type %v with sql type %s: %w", ydbType, typeName, common.ErrDataTypeNotSupported)
}
Expand All @@ -91,9 +91,9 @@ func addAcceptorAppenderFromSQLTypeNameNullable(
switch ydbTypeID {
case Ydb.Type_UTF8:
appenders = append(appenders,
makeAppenderNullable[time.Time, string, *array.StringBuilder](date32ToStringConverter{conv: cc.DateToString()}))
utils.MakeAppenderNullable[time.Time, string, *array.StringBuilder](date32ToStringConverter{conv: cc.DateToString()}))
case Ydb.Type_DATE:
appenders = append(appenders, makeAppenderNullable[time.Time, uint16, *array.Uint16Builder](cc.Date()))
appenders = append(appenders, utils.MakeAppenderNullable[time.Time, uint16, *array.Uint16Builder](cc.Date()))
default:
return nil, nil, fmt.Errorf("unexpected ydb type %v with sql type %s: %w", ydbType, typeName, common.ErrDataTypeNotSupported)
}
Expand All @@ -108,10 +108,10 @@ func addAcceptorAppenderFromSQLTypeNameNullable(
switch ydbTypeID {
case Ydb.Type_UTF8:
appenders = append(appenders,
makeAppenderNullable[time.Time, string, *array.StringBuilder](
utils.MakeAppenderNullable[time.Time, string, *array.StringBuilder](
dateTime64ToStringConverter{conv: cc.TimestampToString(true)}))
case Ydb.Type_TIMESTAMP:
appenders = append(appenders, makeAppenderNullable[time.Time, uint64, *array.Uint64Builder](cc.Timestamp()))
appenders = append(appenders, utils.MakeAppenderNullable[time.Time, uint64, *array.Uint64Builder](cc.Timestamp()))
default:
return nil, nil, fmt.Errorf("unexpected ydb type %v with sql type %s: %w", ydbType, typeName, common.ErrDataTypeNotSupported)
}
Expand All @@ -126,9 +126,9 @@ func addAcceptorAppenderFromSQLTypeNameNullable(
switch ydbTypeID {
case Ydb.Type_UTF8:
appenders = append(appenders,
makeAppenderNullable[time.Time, string, *array.StringBuilder](dateTimeToStringConverter{conv: cc.DatetimeToString()}))
utils.MakeAppenderNullable[time.Time, string, *array.StringBuilder](dateTimeToStringConverter{conv: cc.DatetimeToString()}))
case Ydb.Type_DATETIME:
appenders = append(appenders, makeAppenderNullable[time.Time, uint32, *array.Uint32Builder](cc.Datetime()))
appenders = append(appenders, utils.MakeAppenderNullable[time.Time, uint32, *array.Uint32Builder](cc.Datetime()))
default:
return nil, nil, fmt.Errorf("unexpected ydb type %v with sql type %s: %w", ydbType, typeName, common.ErrDataTypeNotSupported)
}
Expand All @@ -138,50 +138,3 @@ func addAcceptorAppenderFromSQLTypeNameNullable(

return acceptors, appenders, nil
}

func makeAppenderNullable[
IN common.ValueType,
OUT common.ValueType,
AB common.ArrowBuilder[OUT],
](conv conversion.ValuePtrConverter[IN, OUT]) func(acceptor any, builder array.Builder) error {
return func(acceptor any, builder array.Builder) error {
return appendValueToArrowBuilderNullable[IN, OUT, AB](acceptor, builder, conv)
}
}

func appendValueToArrowBuilderNullable[IN common.ValueType, OUT common.ValueType, AB common.ArrowBuilder[OUT]](
acceptor any,
builder array.Builder,
conv conversion.ValuePtrConverter[IN, OUT],
) error {
cast := acceptor.(**IN)

if *cast == nil {
builder.AppendNull()

return nil
}

value := *cast

out, err := conv.Convert(value)
if err != nil {
if errors.Is(err, common.ErrValueOutOfTypeBounds) {
// TODO: write warning to logger
builder.AppendNull()

return nil
}

return fmt.Errorf("convert value %v: %w", value, err)
}

//nolint:forcetypeassert
builder.(AB).Append(out)

// Without that ClickHouse native driver would return invalid values for NULLABLE(bool) columns;
// TODO: research it.
*cast = nil

return nil
}
Loading

0 comments on commit 01510af

Please sign in to comment.