diff --git a/app/server/datasource/rdbms/clickhouse/acceptor_appender.go b/app/server/datasource/rdbms/clickhouse/acceptor_appender.go index 3b03ae31..ff71a44c 100644 --- a/app/server/datasource/rdbms/clickhouse/acceptor_appender.go +++ b/app/server/datasource/rdbms/clickhouse/acceptor_appender.go @@ -1,7 +1,6 @@ package clickhouse import ( - "errors" "fmt" "time" @@ -9,6 +8,7 @@ import ( "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" "github.com/ydb-platform/fq-connector-go/app/server/conversion" + "github.com/ydb-platform/fq-connector-go/app/server/utils" "github.com/ydb-platform/fq-connector-go/common" ) @@ -28,41 +28,41 @@ func addAcceptorAppenderFromSQLTypeName( switch { case typeName == typeBool: acceptors = append(acceptors, new(bool)) - appenders = append(appenders, makeAppender[bool, uint8, *array.Uint8Builder](cc.Bool())) + appenders = append(appenders, utils.MakeAppender[bool, uint8, *array.Uint8Builder](cc.Bool())) case typeName == typeInt8: acceptors = append(acceptors, new(int8)) - appenders = append(appenders, makeAppender[int8, int8, *array.Int8Builder](cc.Int8())) + appenders = append(appenders, utils.MakeAppender[int8, int8, *array.Int8Builder](cc.Int8())) case typeName == typeInt16: acceptors = append(acceptors, new(int16)) - appenders = append(appenders, makeAppender[int16, int16, *array.Int16Builder](cc.Int16())) + appenders = append(appenders, utils.MakeAppender[int16, int16, *array.Int16Builder](cc.Int16())) case typeName == typeInt32: acceptors = append(acceptors, new(int32)) - appenders = append(appenders, makeAppender[int32, int32, *array.Int32Builder](cc.Int32())) + appenders = append(appenders, utils.MakeAppender[int32, int32, *array.Int32Builder](cc.Int32())) case typeName == typeInt64: acceptors = append(acceptors, new(int64)) - appenders = append(appenders, makeAppender[int64, int64, *array.Int64Builder](cc.Int64())) + appenders = append(appenders, utils.MakeAppender[int64, int64, *array.Int64Builder](cc.Int64())) case typeName == typeUInt8: acceptors = append(acceptors, new(uint8)) - appenders = append(appenders, makeAppender[uint8, uint8, *array.Uint8Builder](cc.Uint8())) + appenders = append(appenders, utils.MakeAppender[uint8, uint8, *array.Uint8Builder](cc.Uint8())) case typeName == typeUInt16: acceptors = append(acceptors, new(uint16)) - appenders = append(appenders, makeAppender[uint16, uint16, *array.Uint16Builder](cc.Uint16())) + appenders = append(appenders, utils.MakeAppender[uint16, uint16, *array.Uint16Builder](cc.Uint16())) case typeName == typeUInt32: acceptors = append(acceptors, new(uint32)) - appenders = append(appenders, makeAppender[uint32, uint32, *array.Uint32Builder](cc.Uint32())) + appenders = append(appenders, utils.MakeAppender[uint32, uint32, *array.Uint32Builder](cc.Uint32())) case typeName == typeUInt64: acceptors = append(acceptors, new(uint64)) - appenders = append(appenders, makeAppender[uint64, uint64, *array.Uint64Builder](cc.Uint64())) + appenders = append(appenders, utils.MakeAppender[uint64, uint64, *array.Uint64Builder](cc.Uint64())) case typeName == typeFloat32: acceptors = append(acceptors, new(float32)) - appenders = append(appenders, makeAppender[float32, float32, *array.Float32Builder](cc.Float32())) + appenders = append(appenders, utils.MakeAppender[float32, float32, *array.Float32Builder](cc.Float32())) case typeName == typeFload64: acceptors = append(acceptors, new(float64)) - appenders = append(appenders, makeAppender[float64, float64, *array.Float64Builder](cc.Float64())) + appenders = append(appenders, utils.MakeAppender[float64, float64, *array.Float64Builder](cc.Float64())) case typeName == typeString, tm.isFixedString.MatchString(typeName): // Looks like []byte would be a better option here, but clickhouse driver prefers string acceptors = append(acceptors, new(string)) - appenders = append(appenders, makeAppender[string, []byte, *array.BinaryBuilder](cc.StringToBytes())) + appenders = append(appenders, utils.MakeAppender[string, []byte, *array.BinaryBuilder](cc.StringToBytes())) case typeName == typeDate: acceptors = append(acceptors, new(time.Time)) @@ -74,9 +74,9 @@ func addAcceptorAppenderFromSQLTypeName( switch ydbTypeID { case Ydb.Type_UTF8: appenders = append(appenders, - makeAppender[time.Time, string, *array.StringBuilder](dateToStringConverter{conv: cc.DateToString()})) + utils.MakeAppender[time.Time, string, *array.StringBuilder](dateToStringConverter{conv: cc.DateToString()})) case Ydb.Type_DATE: - appenders = append(appenders, makeAppender[time.Time, uint16, *array.Uint16Builder](cc.Date())) + appenders = append(appenders, utils.MakeAppender[time.Time, uint16, *array.Uint16Builder](cc.Date())) default: return nil, nil, fmt.Errorf("unexpected ydb type %v with sql type %s: %w", ydbType, typeName, common.ErrDataTypeNotSupported) } @@ -91,9 +91,9 @@ func addAcceptorAppenderFromSQLTypeName( switch ydbTypeID { case Ydb.Type_UTF8: appenders = append(appenders, - makeAppender[time.Time, string, *array.StringBuilder](date32ToStringConverter{conv: cc.DateToString()})) + utils.MakeAppender[time.Time, string, *array.StringBuilder](date32ToStringConverter{conv: cc.DateToString()})) case Ydb.Type_DATE: - appenders = append(appenders, makeAppender[time.Time, uint16, *array.Uint16Builder](cc.Date())) + appenders = append(appenders, utils.MakeAppender[time.Time, uint16, *array.Uint16Builder](cc.Date())) default: return nil, nil, fmt.Errorf("unexpected ydb type %v with sql type %s: %w", ydbType, typeName, common.ErrDataTypeNotSupported) } @@ -108,9 +108,9 @@ func addAcceptorAppenderFromSQLTypeName( switch ydbTypeID { case Ydb.Type_UTF8: appenders = append(appenders, - makeAppender[time.Time, string, *array.StringBuilder](dateTime64ToStringConverter{conv: cc.TimestampToString(true)})) + utils.MakeAppender[time.Time, string, *array.StringBuilder](dateTime64ToStringConverter{conv: cc.TimestampToString(true)})) case Ydb.Type_TIMESTAMP: - appenders = append(appenders, makeAppender[time.Time, uint64, *array.Uint64Builder](cc.Timestamp())) + appenders = append(appenders, utils.MakeAppender[time.Time, uint64, *array.Uint64Builder](cc.Timestamp())) default: return nil, nil, fmt.Errorf("unexpected ydb type %v with sql type %s: %w", ydbType, typeName, common.ErrDataTypeNotSupported) } @@ -125,9 +125,9 @@ func addAcceptorAppenderFromSQLTypeName( switch ydbTypeID { case Ydb.Type_UTF8: appenders = append(appenders, - makeAppender[time.Time, string, *array.StringBuilder](dateTimeToStringConverter{conv: cc.DatetimeToString()})) + utils.MakeAppender[time.Time, string, *array.StringBuilder](dateTimeToStringConverter{conv: cc.DatetimeToString()})) case Ydb.Type_DATETIME: - appenders = append(appenders, makeAppender[time.Time, uint32, *array.Uint32Builder](cc.Datetime())) + appenders = append(appenders, utils.MakeAppender[time.Time, uint32, *array.Uint32Builder](cc.Datetime())) default: return nil, nil, fmt.Errorf("unexpected ydb type %v with sql type %s: %w", ydbType, typeName, common.ErrDataTypeNotSupported) } @@ -137,44 +137,3 @@ func addAcceptorAppenderFromSQLTypeName( return acceptors, appenders, nil } - -func makeAppender[ - IN common.ValueType, - OUT common.ValueType, - AB common.ArrowBuilder[OUT], -](conv conversion.ValuePtrConverter[IN, OUT]) func(acceptor any, builder array.Builder) error { - return func(acceptor any, builder array.Builder) error { - return appendValueToArrowBuilder[IN, OUT, AB](acceptor, builder, conv) - } -} - -func appendValueToArrowBuilder[IN common.ValueType, OUT common.ValueType, AB common.ArrowBuilder[OUT]]( - acceptor any, - builder array.Builder, - conv conversion.ValuePtrConverter[IN, OUT], -) error { - cast := acceptor.(*IN) - - if cast == nil { - builder.AppendNull() - - return nil - } - - out, err := conv.Convert(cast) - if err != nil { - if errors.Is(err, common.ErrValueOutOfTypeBounds) { - // TODO: write warning to logger - builder.AppendNull() - - return nil - } - - return fmt.Errorf("convert value %v: %w", *cast, err) - } - - //nolint:forcetypeassert - builder.(AB).Append(out) - - return nil -} diff --git a/app/server/datasource/rdbms/clickhouse/acceptor_appender_nullable.go b/app/server/datasource/rdbms/clickhouse/acceptor_appender_nullable.go index a5963d5c..661a1ecb 100644 --- a/app/server/datasource/rdbms/clickhouse/acceptor_appender_nullable.go +++ b/app/server/datasource/rdbms/clickhouse/acceptor_appender_nullable.go @@ -1,7 +1,6 @@ package clickhouse import ( - "errors" "fmt" "time" @@ -9,6 +8,7 @@ import ( "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" "github.com/ydb-platform/fq-connector-go/app/server/conversion" + "github.com/ydb-platform/fq-connector-go/app/server/utils" "github.com/ydb-platform/fq-connector-go/common" ) @@ -28,41 +28,41 @@ func addAcceptorAppenderFromSQLTypeNameNullable( switch { case typeName == typeBool: acceptors = append(acceptors, new(*bool)) - appenders = append(appenders, makeAppenderNullable[bool, uint8, *array.Uint8Builder](cc.Bool())) + appenders = append(appenders, utils.MakeAppenderNullable[bool, uint8, *array.Uint8Builder](cc.Bool())) case typeName == typeInt8: acceptors = append(acceptors, new(*int8)) - appenders = append(appenders, makeAppenderNullable[int8, int8, *array.Int8Builder](cc.Int8())) + appenders = append(appenders, utils.MakeAppenderNullable[int8, int8, *array.Int8Builder](cc.Int8())) case typeName == typeInt16: acceptors = append(acceptors, new(*int16)) - appenders = append(appenders, makeAppenderNullable[int16, int16, *array.Int16Builder](cc.Int16())) + appenders = append(appenders, utils.MakeAppenderNullable[int16, int16, *array.Int16Builder](cc.Int16())) case typeName == typeInt32: acceptors = append(acceptors, new(*int32)) - appenders = append(appenders, makeAppenderNullable[int32, int32, *array.Int32Builder](cc.Int32())) + appenders = append(appenders, utils.MakeAppenderNullable[int32, int32, *array.Int32Builder](cc.Int32())) case typeName == typeInt64: acceptors = append(acceptors, new(*int64)) - appenders = append(appenders, makeAppenderNullable[int64, int64, *array.Int64Builder](cc.Int64())) + appenders = append(appenders, utils.MakeAppenderNullable[int64, int64, *array.Int64Builder](cc.Int64())) case typeName == typeUInt8: acceptors = append(acceptors, new(*uint8)) - appenders = append(appenders, makeAppenderNullable[uint8, uint8, *array.Uint8Builder](cc.Uint8())) + appenders = append(appenders, utils.MakeAppenderNullable[uint8, uint8, *array.Uint8Builder](cc.Uint8())) case typeName == typeUInt16: acceptors = append(acceptors, new(*uint16)) - appenders = append(appenders, makeAppenderNullable[uint16, uint16, *array.Uint16Builder](cc.Uint16())) + appenders = append(appenders, utils.MakeAppenderNullable[uint16, uint16, *array.Uint16Builder](cc.Uint16())) case typeName == typeUInt32: acceptors = append(acceptors, new(*uint32)) - appenders = append(appenders, makeAppenderNullable[uint32, uint32, *array.Uint32Builder](cc.Uint32())) + appenders = append(appenders, utils.MakeAppenderNullable[uint32, uint32, *array.Uint32Builder](cc.Uint32())) case typeName == typeUInt64: acceptors = append(acceptors, new(*uint64)) - appenders = append(appenders, makeAppenderNullable[uint64, uint64, *array.Uint64Builder](cc.Uint64())) + appenders = append(appenders, utils.MakeAppenderNullable[uint64, uint64, *array.Uint64Builder](cc.Uint64())) case typeName == typeFloat32: acceptors = append(acceptors, new(*float32)) - appenders = append(appenders, makeAppenderNullable[float32, float32, *array.Float32Builder](cc.Float32())) + appenders = append(appenders, utils.MakeAppenderNullable[float32, float32, *array.Float32Builder](cc.Float32())) case typeName == typeFload64: acceptors = append(acceptors, new(*float64)) - appenders = append(appenders, makeAppenderNullable[float64, float64, *array.Float64Builder](cc.Float64())) + appenders = append(appenders, utils.MakeAppenderNullable[float64, float64, *array.Float64Builder](cc.Float64())) case typeName == typeString, tm.isFixedString.MatchString(typeName): // Looks like []byte would be a better option here, but clickhouse driver prefers string acceptors = append(acceptors, new(*string)) - appenders = append(appenders, makeAppenderNullable[string, []byte, *array.BinaryBuilder](cc.StringToBytes())) + appenders = append(appenders, utils.MakeAppenderNullable[string, []byte, *array.BinaryBuilder](cc.StringToBytes())) case typeName == typeDate: acceptors = append(acceptors, new(*time.Time)) @@ -74,9 +74,9 @@ func addAcceptorAppenderFromSQLTypeNameNullable( switch ydbTypeID { case Ydb.Type_UTF8: appenders = append(appenders, - makeAppenderNullable[time.Time, string, *array.StringBuilder](dateToStringConverter{conv: cc.DateToString()})) + utils.MakeAppenderNullable[time.Time, string, *array.StringBuilder](dateToStringConverter{conv: cc.DateToString()})) case Ydb.Type_DATE: - appenders = append(appenders, makeAppenderNullable[time.Time, uint16, *array.Uint16Builder](cc.Date())) + appenders = append(appenders, utils.MakeAppenderNullable[time.Time, uint16, *array.Uint16Builder](cc.Date())) default: return nil, nil, fmt.Errorf("unexpected ydb type %v with sql type %s: %w", ydbType, typeName, common.ErrDataTypeNotSupported) } @@ -91,9 +91,9 @@ func addAcceptorAppenderFromSQLTypeNameNullable( switch ydbTypeID { case Ydb.Type_UTF8: appenders = append(appenders, - makeAppenderNullable[time.Time, string, *array.StringBuilder](date32ToStringConverter{conv: cc.DateToString()})) + utils.MakeAppenderNullable[time.Time, string, *array.StringBuilder](date32ToStringConverter{conv: cc.DateToString()})) case Ydb.Type_DATE: - appenders = append(appenders, makeAppenderNullable[time.Time, uint16, *array.Uint16Builder](cc.Date())) + appenders = append(appenders, utils.MakeAppenderNullable[time.Time, uint16, *array.Uint16Builder](cc.Date())) default: return nil, nil, fmt.Errorf("unexpected ydb type %v with sql type %s: %w", ydbType, typeName, common.ErrDataTypeNotSupported) } @@ -108,10 +108,10 @@ func addAcceptorAppenderFromSQLTypeNameNullable( switch ydbTypeID { case Ydb.Type_UTF8: appenders = append(appenders, - makeAppenderNullable[time.Time, string, *array.StringBuilder]( + utils.MakeAppenderNullable[time.Time, string, *array.StringBuilder]( dateTime64ToStringConverter{conv: cc.TimestampToString(true)})) case Ydb.Type_TIMESTAMP: - appenders = append(appenders, makeAppenderNullable[time.Time, uint64, *array.Uint64Builder](cc.Timestamp())) + appenders = append(appenders, utils.MakeAppenderNullable[time.Time, uint64, *array.Uint64Builder](cc.Timestamp())) default: return nil, nil, fmt.Errorf("unexpected ydb type %v with sql type %s: %w", ydbType, typeName, common.ErrDataTypeNotSupported) } @@ -126,9 +126,9 @@ func addAcceptorAppenderFromSQLTypeNameNullable( switch ydbTypeID { case Ydb.Type_UTF8: appenders = append(appenders, - makeAppenderNullable[time.Time, string, *array.StringBuilder](dateTimeToStringConverter{conv: cc.DatetimeToString()})) + utils.MakeAppenderNullable[time.Time, string, *array.StringBuilder](dateTimeToStringConverter{conv: cc.DatetimeToString()})) case Ydb.Type_DATETIME: - appenders = append(appenders, makeAppenderNullable[time.Time, uint32, *array.Uint32Builder](cc.Datetime())) + appenders = append(appenders, utils.MakeAppenderNullable[time.Time, uint32, *array.Uint32Builder](cc.Datetime())) default: return nil, nil, fmt.Errorf("unexpected ydb type %v with sql type %s: %w", ydbType, typeName, common.ErrDataTypeNotSupported) } @@ -138,50 +138,3 @@ func addAcceptorAppenderFromSQLTypeNameNullable( return acceptors, appenders, nil } - -func makeAppenderNullable[ - IN common.ValueType, - OUT common.ValueType, - AB common.ArrowBuilder[OUT], -](conv conversion.ValuePtrConverter[IN, OUT]) func(acceptor any, builder array.Builder) error { - return func(acceptor any, builder array.Builder) error { - return appendValueToArrowBuilderNullable[IN, OUT, AB](acceptor, builder, conv) - } -} - -func appendValueToArrowBuilderNullable[IN common.ValueType, OUT common.ValueType, AB common.ArrowBuilder[OUT]]( - acceptor any, - builder array.Builder, - conv conversion.ValuePtrConverter[IN, OUT], -) error { - cast := acceptor.(**IN) - - if *cast == nil { - builder.AppendNull() - - return nil - } - - value := *cast - - out, err := conv.Convert(value) - if err != nil { - if errors.Is(err, common.ErrValueOutOfTypeBounds) { - // TODO: write warning to logger - builder.AppendNull() - - return nil - } - - return fmt.Errorf("convert value %v: %w", value, err) - } - - //nolint:forcetypeassert - builder.(AB).Append(out) - - // Without that ClickHouse native driver would return invalid values for NULLABLE(bool) columns; - // TODO: research it. - *cast = nil - - return nil -} diff --git a/app/server/datasource/rdbms/ms_sql_server/type_mapper.go b/app/server/datasource/rdbms/ms_sql_server/type_mapper.go index daa22142..c3978c5e 100644 --- a/app/server/datasource/rdbms/ms_sql_server/type_mapper.go +++ b/app/server/datasource/rdbms/ms_sql_server/type_mapper.go @@ -1,7 +1,6 @@ package ms_sql_server import ( - "errors" "fmt" "time" @@ -12,6 +11,7 @@ import ( "github.com/ydb-platform/fq-connector-go/app/server/conversion" "github.com/ydb-platform/fq-connector-go/app/server/datasource" "github.com/ydb-platform/fq-connector-go/app/server/paging" + "github.com/ydb-platform/fq-connector-go/app/server/utils" "github.com/ydb-platform/fq-connector-go/common" ) @@ -96,25 +96,25 @@ func transformerFromSQLTypes(types []string, ydbTypes []*Ydb.Type, cc conversion switch typeName { case "BIT": acceptors = append(acceptors, new(*bool)) - appenders = append(appenders, makeAppender[bool, uint8, *array.Uint8Builder](cc.Bool())) + appenders = append(appenders, utils.MakeAppenderNullable[bool, uint8, *array.Uint8Builder](cc.Bool())) case "TINYINT": acceptors = append(acceptors, new(*int8)) - appenders = append(appenders, makeAppender[int8, int8, *array.Int8Builder](cc.Int8())) + appenders = append(appenders, utils.MakeAppenderNullable[int8, int8, *array.Int8Builder](cc.Int8())) case "SMALLINT": acceptors = append(acceptors, new(*int16)) - appenders = append(appenders, makeAppender[int16, int16, *array.Int16Builder](cc.Int16())) + appenders = append(appenders, utils.MakeAppenderNullable[int16, int16, *array.Int16Builder](cc.Int16())) case "INT": acceptors = append(acceptors, new(*int32)) - appenders = append(appenders, makeAppender[int32, int32, *array.Int32Builder](cc.Int32())) + appenders = append(appenders, utils.MakeAppenderNullable[int32, int32, *array.Int32Builder](cc.Int32())) case "BIGINT": acceptors = append(acceptors, new(*int64)) - appenders = append(appenders, makeAppender[int64, int64, *array.Int64Builder](cc.Int64())) + appenders = append(appenders, utils.MakeAppenderNullable[int64, int64, *array.Int64Builder](cc.Int64())) case "REAL": acceptors = append(acceptors, new(*float32)) - appenders = append(appenders, makeAppender[float32, float32, *array.Float32Builder](cc.Float32())) + appenders = append(appenders, utils.MakeAppenderNullable[float32, float32, *array.Float32Builder](cc.Float32())) case "FLOAT": acceptors = append(acceptors, new(*float64)) - appenders = append(appenders, makeAppender[float64, float64, *array.Float64Builder](cc.Float64())) + appenders = append(appenders, utils.MakeAppenderNullable[float64, float64, *array.Float64Builder](cc.Float64())) case "BINARY", "VARBINARY", "IMAGE": acceptors = append(acceptors, new(*[]byte)) appenders = append(appenders, func(acceptor any, builder array.Builder) error { @@ -129,7 +129,7 @@ func transformerFromSQLTypes(types []string, ydbTypes []*Ydb.Type, cc conversion }) case "CHAR", "VARCHAR", "TEXT", "NCHAR", "NVARCHAR", "NTEXT": acceptors = append(acceptors, new(*string)) - appenders = append(appenders, makeAppender[string, string, *array.StringBuilder](cc.String())) + appenders = append(appenders, utils.MakeAppenderNullable[string, string, *array.StringBuilder](cc.String())) case "DATE": acceptors = append(acceptors, new(*time.Time)) @@ -140,9 +140,9 @@ func transformerFromSQLTypes(types []string, ydbTypes []*Ydb.Type, cc conversion switch ydbTypeID { case Ydb.Type_UTF8: - appenders = append(appenders, makeAppender[time.Time, string, *array.StringBuilder](cc.DateToString())) + appenders = append(appenders, utils.MakeAppenderNullable[time.Time, string, *array.StringBuilder](cc.DateToString())) case Ydb.Type_DATE: - appenders = append(appenders, makeAppender[time.Time, uint16, *array.Uint16Builder](cc.Date())) + appenders = append(appenders, utils.MakeAppenderNullable[time.Time, uint16, *array.Uint16Builder](cc.Date())) default: return nil, fmt.Errorf( "unexpected ydb type %v for ms sql server type %v: %w", @@ -158,9 +158,9 @@ func transformerFromSQLTypes(types []string, ydbTypes []*Ydb.Type, cc conversion switch ydbTypeID { case Ydb.Type_UTF8: - appenders = append(appenders, makeAppender[time.Time, string, *array.StringBuilder](cc.DatetimeToString())) + appenders = append(appenders, utils.MakeAppenderNullable[time.Time, string, *array.StringBuilder](cc.DatetimeToString())) case Ydb.Type_DATETIME: - appenders = append(appenders, makeAppender[time.Time, uint32, *array.Uint32Builder](cc.Datetime())) + appenders = append(appenders, utils.MakeAppenderNullable[time.Time, uint32, *array.Uint32Builder](cc.Datetime())) default: return nil, fmt.Errorf( "unexpected ydb type %v for ms sql server type %v: %w", @@ -176,9 +176,10 @@ func transformerFromSQLTypes(types []string, ydbTypes []*Ydb.Type, cc conversion switch ydbTypeID { case Ydb.Type_UTF8: - appenders = append(appenders, makeAppender[time.Time, string, *array.StringBuilder](cc.TimestampToString(true))) + appenders = append(appenders, + utils.MakeAppenderNullable[time.Time, string, *array.StringBuilder](cc.TimestampToString(true))) case Ydb.Type_TIMESTAMP: - appenders = append(appenders, makeAppender[time.Time, uint64, *array.Uint64Builder](cc.Timestamp())) + appenders = append(appenders, utils.MakeAppenderNullable[time.Time, uint64, *array.Uint64Builder](cc.Timestamp())) default: return nil, fmt.Errorf( "unexpected ydb type %v for ms sql server type %v: %w", @@ -192,50 +193,4 @@ func transformerFromSQLTypes(types []string, ydbTypes []*Ydb.Type, cc conversion return paging.NewRowTransformer[any](acceptors, appenders, nil), nil } -func makeAppender[ - IN common.ValueType, - OUT common.ValueType, - AB common.ArrowBuilder[OUT], -](conv conversion.ValuePtrConverter[IN, OUT]) func(acceptor any, builder array.Builder) error { - return func(acceptor any, builder array.Builder) error { - return appendValueToArrowBuilder[IN, OUT, AB](acceptor, builder, conv) - } -} - -func appendValueToArrowBuilder[IN common.ValueType, OUT common.ValueType, AB common.ArrowBuilder[OUT]]( - acceptor any, - builder array.Builder, - conv conversion.ValuePtrConverter[IN, OUT], -) error { - cast := acceptor.(**IN) - - if *cast == nil { - builder.AppendNull() - - return nil - } - - value := *cast - - out, err := conv.Convert(value) - if err != nil { - if errors.Is(err, common.ErrValueOutOfTypeBounds) { - // TODO: write warning to logger - builder.AppendNull() - - return nil - } - - return fmt.Errorf("convert value %v: %w", value, err) - } - - //nolint:forcetypeassert - builder.(AB).Append(out) - - // it was copied from ClickHouse, not sure if it is necessary - *cast = nil - - return nil -} - func NewTypeMapper() datasource.TypeMapper { return typeMapper{} } diff --git a/app/server/datasource/rdbms/mysql/type_mapper.go b/app/server/datasource/rdbms/mysql/type_mapper.go index 637b2c72..e10e99cc 100644 --- a/app/server/datasource/rdbms/mysql/type_mapper.go +++ b/app/server/datasource/rdbms/mysql/type_mapper.go @@ -1,7 +1,6 @@ package mysql import ( - "errors" "fmt" "regexp" "strconv" @@ -16,6 +15,7 @@ import ( "github.com/ydb-platform/fq-connector-go/app/server/conversion" "github.com/ydb-platform/fq-connector-go/app/server/datasource" "github.com/ydb-platform/fq-connector-go/app/server/paging" + "github.com/ydb-platform/fq-connector-go/app/server/utils" "github.com/ydb-platform/fq-connector-go/common" ) @@ -178,10 +178,10 @@ func addAcceptorAppender( switch ydbTypeId { case Ydb.Type_UINT32: *acceptors = append(*acceptors, new(*uint32)) - *appenders = append(*appenders, makeAppender[uint32, uint32, *array.Uint32Builder](cc.Uint32())) + *appenders = append(*appenders, utils.MakeAppenderNullable[uint32, uint32, *array.Uint32Builder](cc.Uint32())) case Ydb.Type_INT32: *acceptors = append(*acceptors, new(*int32)) - *appenders = append(*appenders, makeAppender[int32, int32, *array.Int32Builder](cc.Int32())) + *appenders = append(*appenders, utils.MakeAppenderNullable[int32, int32, *array.Int32Builder](cc.Int32())) default: return fmt.Errorf("type mismatch: mysql '%d' vs ydb '%s': %w", mySQLType, ydbTypeId.String(), common.ErrDataTypeNotSupported) } @@ -189,30 +189,30 @@ func addAcceptorAppender( switch ydbTypeId { case Ydb.Type_UINT64: *acceptors = append(*acceptors, new(*uint64)) - *appenders = append(*appenders, makeAppender[uint64, uint64, *array.Uint64Builder](cc.Uint64())) + *appenders = append(*appenders, utils.MakeAppenderNullable[uint64, uint64, *array.Uint64Builder](cc.Uint64())) case Ydb.Type_INT64: *acceptors = append(*acceptors, new(*int64)) - *appenders = append(*appenders, makeAppender[int64, int64, *array.Int64Builder](cc.Int64())) + *appenders = append(*appenders, utils.MakeAppenderNullable[int64, int64, *array.Int64Builder](cc.Int64())) default: return fmt.Errorf("type mismatch: mysql '%d' vs ydb '%s': %w", mySQLType, ydbTypeId.String(), common.ErrDataTypeNotSupported) } case mysql.MYSQL_TYPE_FLOAT: *acceptors = append(*acceptors, new(*float32)) - *appenders = append(*appenders, makeAppender[float32, float32, *array.Float32Builder](cc.Float32())) + *appenders = append(*appenders, utils.MakeAppenderNullable[float32, float32, *array.Float32Builder](cc.Float32())) case mysql.MYSQL_TYPE_DOUBLE: *acceptors = append(*acceptors, new(*float64)) - *appenders = append(*appenders, makeAppender[float64, float64, *array.Float64Builder](cc.Float64())) + *appenders = append(*appenders, utils.MakeAppenderNullable[float64, float64, *array.Float64Builder](cc.Float64())) case mysql.MYSQL_TYPE_TINY: switch ydbTypeId { case Ydb.Type_BOOL: *acceptors = append(*acceptors, new(*bool)) - *appenders = append(*appenders, makeAppender[bool, uint8, *array.Uint8Builder](cc.Bool())) + *appenders = append(*appenders, utils.MakeAppenderNullable[bool, uint8, *array.Uint8Builder](cc.Bool())) case Ydb.Type_UINT8: *acceptors = append(*acceptors, new(*uint8)) - *appenders = append(*appenders, makeAppender[uint8, uint8, *array.Uint8Builder](cc.Uint8())) + *appenders = append(*appenders, utils.MakeAppenderNullable[uint8, uint8, *array.Uint8Builder](cc.Uint8())) case Ydb.Type_INT8: *acceptors = append(*acceptors, new(*int8)) - *appenders = append(*appenders, makeAppender[int8, int8, *array.Int8Builder](cc.Int8())) + *appenders = append(*appenders, utils.MakeAppenderNullable[int8, int8, *array.Int8Builder](cc.Int8())) default: return fmt.Errorf("type mismatch: mysql '%d' vs ydb '%s': %w", mySQLType, ydbTypeId.String(), common.ErrDataTypeNotSupported) } @@ -220,24 +220,24 @@ func addAcceptorAppender( switch ydbTypeId { case Ydb.Type_UINT16: *acceptors = append(*acceptors, new(*uint16)) - *appenders = append(*appenders, makeAppender[uint16, uint16, *array.Uint16Builder](cc.Uint16())) + *appenders = append(*appenders, utils.MakeAppenderNullable[uint16, uint16, *array.Uint16Builder](cc.Uint16())) case Ydb.Type_INT16: *acceptors = append(*acceptors, new(*int16)) - *appenders = append(*appenders, makeAppender[int16, int16, *array.Int16Builder](cc.Int16())) + *appenders = append(*appenders, utils.MakeAppenderNullable[int16, int16, *array.Int16Builder](cc.Int16())) default: return fmt.Errorf("type mismatch: mysql '%d' vs ydb '%s': %w", mySQLType, ydbTypeId.String(), common.ErrDataTypeNotSupported) } case mysql.MYSQL_TYPE_LONG_BLOB, mysql.MYSQL_TYPE_BLOB, mysql.MYSQL_TYPE_MEDIUM_BLOB, mysql.MYSQL_TYPE_TINY_BLOB: *acceptors = append(*acceptors, new(*[]byte)) - *appenders = append(*appenders, makeAppender[[]byte, []byte, *array.BinaryBuilder](cc.Bytes())) + *appenders = append(*appenders, utils.MakeAppenderNullable[[]byte, []byte, *array.BinaryBuilder](cc.Bytes())) case mysql.MYSQL_TYPE_VARCHAR, mysql.MYSQL_TYPE_STRING, mysql.MYSQL_TYPE_VAR_STRING: *acceptors = append(*acceptors, new(*string)) switch ydbTypeId { case Ydb.Type_UTF8: - *appenders = append(*appenders, makeAppender[string, string, *array.StringBuilder](cc.String())) + *appenders = append(*appenders, utils.MakeAppenderNullable[string, string, *array.StringBuilder](cc.String())) case Ydb.Type_STRING: - *appenders = append(*appenders, makeAppender[string, []byte, *array.BinaryBuilder](cc.StringToBytes())) + *appenders = append(*appenders, utils.MakeAppenderNullable[string, []byte, *array.BinaryBuilder](cc.StringToBytes())) default: return fmt.Errorf("type mismatch: mysql '%d' vs ydb '%s': %w", mySQLType, ydbTypeId.String(), common.ErrDataTypeNotSupported) } @@ -249,13 +249,13 @@ func addAcceptorAppender( *appenders = append(*appenders, func(acceptor any, builder array.Builder) error { cast := acceptor.(**time.Time) - return appendNullableToArrowBuilder[time.Time, string, *array.StringBuilder](cast, builder, cc.DateToString()) + return utils.MakeAppenderNullable[time.Time, string, *array.StringBuilder](cc.DateToString())(cast, builder) }) case Ydb.Type_DATE: *appenders = append(*appenders, func(acceptor any, builder array.Builder) error { cast := acceptor.(**time.Time) - return appendNullableToArrowBuilder[time.Time, uint16, *array.Uint16Builder](cast, builder, cc.Date()) + return utils.MakeAppenderNullable[time.Time, uint16, *array.Uint16Builder](cc.Date())(cast, builder) }) default: return fmt.Errorf("type mismatch: mysql '%d' vs ydb '%s': %w", mySQLType, ydbTypeId.String(), common.ErrDataTypeNotSupported) @@ -274,73 +274,23 @@ func addAcceptorAppender( // therefore, we can append 'Z' suffix only to TIMESTAMP column (not DATETIME) utc := mySQLType == mysql.MYSQL_TYPE_TIMESTAMP || mySQLType == mysql.MYSQL_TYPE_TIMESTAMP2 - return appendNullableToArrowBuilder[ - time.Time, - string, - *array.StringBuilder](cast, builder, cc.TimestampToString(utc)) + return utils.MakeAppenderNullable[time.Time, string, *array.StringBuilder](cc.TimestampToString(utc))(cast, builder) }) case Ydb.Type_TIMESTAMP: *appenders = append(*appenders, func(acceptor any, builder array.Builder) error { cast := acceptor.(**time.Time) - return appendNullableToArrowBuilder[time.Time, uint64, *array.Uint64Builder]( - cast, builder, cc.Timestamp()) + return utils.MakeAppenderNullable[time.Time, uint64, *array.Uint64Builder](cc.Timestamp())(cast, builder) }) default: return fmt.Errorf("type mismatch: mysql '%d' vs ydb '%s': %w", mySQLType, ydbTypeId.String(), common.ErrDataTypeNotSupported) } case mysql.MYSQL_TYPE_JSON: *acceptors = append(*acceptors, new(*string)) - *appenders = append(*appenders, makeAppender[string, string, *array.StringBuilder](cc.String())) + *appenders = append(*appenders, utils.MakeAppenderNullable[string, string, *array.StringBuilder](cc.String())) default: return fmt.Errorf("unexpected mysql type '%d': %w", mySQLType, common.ErrDataTypeNotSupported) } return nil } - -func makeAppender[ - IN common.ValueType, - OUT common.ValueType, - AB common.ArrowBuilder[OUT], -](conv conversion.ValuePtrConverter[IN, OUT]) func(acceptor any, builder array.Builder) error { - return func(acceptor any, builder array.Builder) error { - return appendNullableToArrowBuilder[IN, OUT, AB](acceptor, builder, conv) - } -} - -func appendNullableToArrowBuilder[IN common.ValueType, OUT common.ValueType, AB common.ArrowBuilder[OUT]]( - acceptor any, - builder array.Builder, - conv conversion.ValuePtrConverter[IN, OUT], -) error { - cast := acceptor.(**IN) - - if *cast == nil { - builder.AppendNull() - - return nil - } - - value := *cast - - out, err := conv.Convert(value) - if err != nil { - if errors.Is(err, common.ErrValueOutOfTypeBounds) { - builder.AppendNull() - - return nil - } - - return fmt.Errorf("convert value %v: %w", value, err) - } - - b, ok := builder.(AB) - if !ok { - return common.ErrDataTypeNotSupported - } - - b.Append(out) - - return nil -} diff --git a/app/server/datasource/rdbms/oracle/type_mapper.go b/app/server/datasource/rdbms/oracle/type_mapper.go index 283f0149..eeb6a9d8 100644 --- a/app/server/datasource/rdbms/oracle/type_mapper.go +++ b/app/server/datasource/rdbms/oracle/type_mapper.go @@ -1,7 +1,6 @@ package oracle import ( - "errors" "fmt" "regexp" "time" @@ -13,6 +12,7 @@ import ( "github.com/ydb-platform/fq-connector-go/app/server/conversion" "github.com/ydb-platform/fq-connector-go/app/server/datasource" "github.com/ydb-platform/fq-connector-go/app/server/paging" + "github.com/ydb-platform/fq-connector-go/app/server/utils" "github.com/ydb-platform/fq-connector-go/common" ) @@ -104,13 +104,13 @@ func transformerFromSQLTypes(types []string, ydbTypes []*Ydb.Type, cc conversion switch typeName { case "NUMBER": acceptors = append(acceptors, new(*int64)) - appenders = append(appenders, makeAppender[int64, int64, *array.Int64Builder](cc.Int64())) + appenders = append(appenders, utils.MakeAppenderNullable[int64, int64, *array.Int64Builder](cc.Int64())) case "NCHAR", "CHAR", "LongVarChar", "LONG", "ROWID", "UROWID": acceptors = append(acceptors, new(*string)) - appenders = append(appenders, makeAppender[string, string, *array.StringBuilder](cc.String())) + appenders = append(appenders, utils.MakeAppenderNullable[string, string, *array.StringBuilder](cc.String())) case "RAW", "LongRaw": acceptors = append(acceptors, new(*[]byte)) - appenders = append(appenders, makeAppender[[]byte, []byte, *array.BinaryBuilder](cc.Bytes())) + appenders = append(appenders, utils.MakeAppenderNullable[[]byte, []byte, *array.BinaryBuilder](cc.Bytes())) case "OCIBlobLocator": ydbType := ydbTypes[i] @@ -122,9 +122,9 @@ func transformerFromSQLTypes(types []string, ydbTypes []*Ydb.Type, cc conversion acceptors = append(acceptors, new(*[]byte)) if ydbTypeID == Ydb.Type_JSON { - appenders = append(appenders, makeAppender[[]byte, string, *array.StringBuilder](cc.BytesToString())) + appenders = append(appenders, utils.MakeAppenderNullable[[]byte, string, *array.StringBuilder](cc.BytesToString())) } else { - appenders = append(appenders, makeAppender[[]byte, []byte, *array.BinaryBuilder](cc.Bytes())) + appenders = append(appenders, utils.MakeAppenderNullable[[]byte, []byte, *array.BinaryBuilder](cc.Bytes())) } // YQ-3498: go-ora driver has a bug when reading BINARY_FLOAT -1.1, gives -1.2 @@ -134,7 +134,7 @@ func transformerFromSQLTypes(types []string, ydbTypes []*Ydb.Type, cc conversion // appenders = append(appenders, makeAppender[float32, float32, *array.Float32Builder](cc.Float32())) case "IBDouble": acceptors = append(acceptors, new(*float64)) - appenders = append(appenders, makeAppender[float64, float64, *array.Float64Builder](cc.Float64())) + appenders = append(appenders, utils.MakeAppenderNullable[float64, float64, *array.Float64Builder](cc.Float64())) case "DATE": // Oracle Date value range is much more wide than YDB's Datetime value range ydbType := ydbTypes[i] @@ -149,9 +149,9 @@ func transformerFromSQLTypes(types []string, ydbTypes []*Ydb.Type, cc conversion switch ydbTypeID { case Ydb.Type_UTF8: appenders = append(appenders, - makeAppender[time.Time, string, *array.StringBuilder](cc.DatetimeToString())) + utils.MakeAppenderNullable[time.Time, string, *array.StringBuilder](cc.DatetimeToString())) case Ydb.Type_DATETIME: - appenders = append(appenders, makeAppender[time.Time, uint32, *array.Uint32Builder](cc.Datetime())) + appenders = append(appenders, utils.MakeAppenderNullable[time.Time, uint32, *array.Uint32Builder](cc.Datetime())) default: return nil, fmt.Errorf("unexpected ydb type %v with sql type %s: %w", ydbType, typeName, common.ErrDataTypeNotSupported) } @@ -169,9 +169,9 @@ func transformerFromSQLTypes(types []string, ydbTypes []*Ydb.Type, cc conversion switch ydbTypeID { case Ydb.Type_UTF8: appenders = append(appenders, - makeAppender[time.Time, string, *array.StringBuilder](cc.TimestampToString(true))) + utils.MakeAppenderNullable[time.Time, string, *array.StringBuilder](cc.TimestampToString(true))) case Ydb.Type_TIMESTAMP: - appenders = append(appenders, makeAppender[time.Time, uint64, *array.Uint64Builder](cc.Timestamp())) + appenders = append(appenders, utils.MakeAppenderNullable[time.Time, uint64, *array.Uint64Builder](cc.Timestamp())) default: return nil, fmt.Errorf("unexpected ydb type %v with sql type %s: %w", ydbType, typeName, common.ErrDataTypeNotSupported) } @@ -183,49 +183,6 @@ func transformerFromSQLTypes(types []string, ydbTypes []*Ydb.Type, cc conversion return paging.NewRowTransformer[any](acceptors, appenders, nil), nil } -func makeAppender[ - IN common.ValueType, - OUT common.ValueType, - AB common.ArrowBuilder[OUT], -](conv conversion.ValuePtrConverter[IN, OUT]) func(acceptor any, builder array.Builder) error { - return func(acceptor any, builder array.Builder) error { - return appendValueToArrowBuilder[IN, OUT, AB](acceptor, builder, conv) - } -} - -func appendValueToArrowBuilder[IN common.ValueType, OUT common.ValueType, AB common.ArrowBuilder[OUT]]( - acceptor any, - builder array.Builder, - conv conversion.ValuePtrConverter[IN, OUT], -) error { - cast := acceptor.(**IN) - - if *cast == nil { - builder.AppendNull() - - return nil - } - - value := *cast - - out, err := conv.Convert(value) - if err != nil { - if errors.Is(err, common.ErrValueOutOfTypeBounds) { - // TODO: write warning to logger - builder.AppendNull() - - return nil - } - - return fmt.Errorf("convert value %v: %w", value, err) - } - - //nolint:forcetypeassert - builder.(AB).Append(out) - - return nil -} - func NewTypeMapper() datasource.TypeMapper { return typeMapper{ isTimestamp: regexp.MustCompile(`TIMESTAMP\((.+)\)$`), diff --git a/app/server/datasource/rdbms/postgresql/type_mapper.go b/app/server/datasource/rdbms/postgresql/type_mapper.go index e6b7123c..4229b375 100644 --- a/app/server/datasource/rdbms/postgresql/type_mapper.go +++ b/app/server/datasource/rdbms/postgresql/type_mapper.go @@ -1,7 +1,6 @@ package postgresql import ( - "errors" "fmt" "time" @@ -14,6 +13,7 @@ import ( "github.com/ydb-platform/fq-connector-go/app/server/conversion" "github.com/ydb-platform/fq-connector-go/app/server/datasource" "github.com/ydb-platform/fq-connector-go/app/server/paging" + "github.com/ydb-platform/fq-connector-go/app/server/utils" "github.com/ydb-platform/fq-connector-go/common" ) @@ -244,23 +244,7 @@ func appendValuePtrToArrowBuilder[ return nil } - cast := value.(*IN) - - out, err := conv.Convert(cast) - if err != nil { - if errors.Is(err, common.ErrValueOutOfTypeBounds) { - // TODO: logger ? - builder.AppendNull() - - return nil - } - - return fmt.Errorf("convert value: %w", err) - } - - builder.(AB).Append(out) - - return nil + return utils.AppendValueToArrowBuilder[IN, OUT, AB](value, builder, conv) } func NewTypeMapper() datasource.TypeMapper { return typeMapper{} } diff --git a/app/server/datasource/rdbms/ydb/type_mapper.go b/app/server/datasource/rdbms/ydb/type_mapper.go index 6846596f..38694e7a 100644 --- a/app/server/datasource/rdbms/ydb/type_mapper.go +++ b/app/server/datasource/rdbms/ydb/type_mapper.go @@ -1,7 +1,6 @@ package ydb import ( - "errors" "fmt" "regexp" "time" @@ -14,6 +13,7 @@ import ( "github.com/ydb-platform/fq-connector-go/app/server/conversion" "github.com/ydb-platform/fq-connector-go/app/server/datasource" "github.com/ydb-platform/fq-connector-go/app/server/paging" + "github.com/ydb-platform/fq-connector-go/app/server/utils" "github.com/ydb-platform/fq-connector-go/common" ) @@ -152,56 +152,6 @@ func makePrimitiveTypeFromString(typeName string) (*Ydb.Type, error) { } } -func appendToBuilderSinglePtr[ - IN common.ValueType, - OUT common.ValueType, - AB common.ArrowBuilder[OUT], -]( - conv conversion.ValuePtrConverter[IN, OUT], -) func(acceptor any, builder array.Builder) error { - return func(acceptor any, builder array.Builder) error { - ptr := acceptor.(*IN) - - out, err := conv.Convert(ptr) - if err != nil { - if errors.Is(err, common.ErrValueOutOfTypeBounds) { - // TODO: write warning to logger - builder.AppendNull() - - return nil - } - - return fmt.Errorf("convert value %v: %w", ptr, err) - } - - //nolint:forcetypeassert - builder.(AB).Append(out) - - return nil - } -} - -func appendToBuilderDoublePtr[ - IN common.ValueType, - OUT common.ValueType, - AB common.ArrowBuilder[OUT], -]( - conv conversion.ValuePtrConverter[IN, OUT], -) func(acceptor any, builder array.Builder) error { - return func(acceptor any, builder array.Builder) error { - doublePtr := acceptor.(**IN) - - ptr := *doublePtr - if ptr == nil { - builder.AppendNull() - - return nil - } - - return appendToBuilderSinglePtr[IN, OUT, AB](conv)(ptr, builder) - } -} - func transformerFromSQLTypes(typeNames []string, ydbTypes []*Ydb.Type, cc conversion.Collection) (paging.RowTransformer[any], error) { acceptors := make([]any, 0, len(typeNames)) appenders := make([]func(acceptor any, builder array.Builder) error, 0, len(typeNames)) @@ -302,10 +252,10 @@ func makeAcceptorAppenderCheckOptional[ AB common.ArrowBuilder[OUT], ](optional bool, conv conversion.ValuePtrConverter[IN, OUT]) (any, func(acceptor any, builder array.Builder) error, error) { if optional { - return new(*IN), appendToBuilderDoublePtr[IN, OUT, AB](conv), nil + return new(*IN), utils.MakeAppenderNullable[IN, OUT, AB](conv), nil } - return new(IN), appendToBuilderSinglePtr[IN, OUT, AB](conv), nil + return new(IN), utils.MakeAppender[IN, OUT, AB](conv), nil } func NewTypeMapper() datasource.TypeMapper { diff --git a/app/server/utils/arrow_builder.go b/app/server/utils/arrow_builder.go new file mode 100644 index 00000000..32b47979 --- /dev/null +++ b/app/server/utils/arrow_builder.go @@ -0,0 +1,99 @@ +package utils + +import ( + "errors" + "fmt" + + "github.com/apache/arrow/go/v13/arrow/array" + + "github.com/ydb-platform/fq-connector-go/app/server/conversion" + "github.com/ydb-platform/fq-connector-go/common" +) + +func MakeAppender[ + IN common.ValueType, + OUT common.ValueType, + AB common.ArrowBuilder[OUT], +](conv conversion.ValuePtrConverter[IN, OUT]) func(acceptor any, builder array.Builder) error { + return func(acceptor any, builder array.Builder) error { + return AppendValueToArrowBuilder[IN, OUT, AB](acceptor, builder, conv) + } +} + +func AppendValueToArrowBuilder[IN common.ValueType, OUT common.ValueType, AB common.ArrowBuilder[OUT]]( + acceptor any, + builder array.Builder, + conv conversion.ValuePtrConverter[IN, OUT], +) error { + cast := acceptor.(*IN) + + if cast == nil { + builder.AppendNull() + + return nil + } + + out, err := conv.Convert(cast) + if err != nil { + if errors.Is(err, common.ErrValueOutOfTypeBounds) { + // TODO: write warning to logger + builder.AppendNull() + + return nil + } + + return fmt.Errorf("convert value %v: %w", *cast, err) + } + + //nolint:forcetypeassert + builder.(AB).Append(out) + + return nil +} + +func MakeAppenderNullable[ + IN common.ValueType, + OUT common.ValueType, + AB common.ArrowBuilder[OUT], +](conv conversion.ValuePtrConverter[IN, OUT]) func(acceptor any, builder array.Builder) error { + return func(acceptor any, builder array.Builder) error { + return AppendValueToArrowBuilderNullable[IN, OUT, AB](acceptor, builder, conv) + } +} + +func AppendValueToArrowBuilderNullable[IN common.ValueType, OUT common.ValueType, AB common.ArrowBuilder[OUT]]( + acceptor any, + builder array.Builder, + conv conversion.ValuePtrConverter[IN, OUT], +) error { + cast := acceptor.(**IN) + + if *cast == nil { + builder.AppendNull() + + return nil + } + + value := *cast + + out, err := conv.Convert(value) + if err != nil { + if errors.Is(err, common.ErrValueOutOfTypeBounds) { + // TODO: write warning to logger + builder.AppendNull() + + return nil + } + + return fmt.Errorf("convert value %v: %w", value, err) + } + + //nolint:forcetypeassert + builder.(AB).Append(out) + + // Without that ClickHouse native driver would return invalid values for NULLABLE(bool) columns; + // TODO: research it. + *cast = nil + + return nil +}