Skip to content

Commit

Permalink
MongoDB: DescribeTable typemapping + datasource config fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
ninaiad committed Jan 31, 2025
1 parent f90640b commit 0c511fe
Show file tree
Hide file tree
Showing 19 changed files with 319 additions and 299 deletions.
2 changes: 1 addition & 1 deletion app/client/connector/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func callServer(
case api_common.EGenericDataSourceKind_CLICKHOUSE, api_common.EGenericDataSourceKind_POSTGRESQL,
api_common.EGenericDataSourceKind_YDB, api_common.EGenericDataSourceKind_MS_SQL_SERVER,
api_common.EGenericDataSourceKind_MYSQL, api_common.EGenericDataSourceKind_GREENPLUM,
api_common.EGenericDataSourceKind_ORACLE, api_common.EGenericDataSourceKind_LOGGING, api_common.EGenericDataSourceKind_MONGODB:
api_common.EGenericDataSourceKind_ORACLE, api_common.EGenericDataSourceKind_LOGGING, api_common.EGenericDataSourceKind_MONGO_DB:
typeMappingSettings := &api_service_protos.TTypeMappingSettings{
DateTimeFormat: dateTimeFormat,
}
Expand Down
352 changes: 182 additions & 170 deletions app/config/server.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions app/config/server.proto
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ message TMongoDbConfig {
// Timeout for MongoDB connection pinging.
// Valid values should satisfy `time.ParseDuration` (e. g. '5s', '100ms', '3h').
string ping_connection_timeout = 2;
// Number of documents to process in DescribeTable method to deduce the table schema
uint32 count_docs_to_read = 3;

TExponentialBackoffConfig exponential_backoff = 10;
}
Expand Down
1 change: 1 addition & 0 deletions app/server/config/config.debug.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ datasources:

mongodb:
<<: *data_source_default_var
count_docs_to_read: 5

ydb:
<<: *data_source_default_var
Expand Down
2 changes: 2 additions & 0 deletions app/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,15 @@ func fillServerConfigDefaults(c *config.TServerConfig) {

if c.Datasources.Oracle.Pushdown == nil {
c.Datasources.Oracle.Pushdown = makeDefaultPushdownConfig()
}

// MongoDB

if c.Datasources.Mongodb == nil {
c.Datasources.Mongodb = &config.TMongoDbConfig{
OpenConnectionTimeout: "5s",
PingConnectionTimeout: "5s",
CountDocsToRead: 5,
}
}

Expand Down
1 change: 1 addition & 0 deletions app/server/config/config.prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ datasources:

mongodb:
<<: *data_source_default_var
count_docs_to_read: 5

ydb:
<<: *data_source_default_var
Expand Down
4 changes: 2 additions & 2 deletions app/server/data_source_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (dsc *DataSourceCollection) DescribeTable(
ds := s3.NewDataSource()

return ds.DescribeTable(ctx, logger, request)
case api_common.EGenericDataSourceKind_MONGODB:
case api_common.EGenericDataSourceKind_MONGO_DB:
mongoDbCfg := dsc.cfg.Datasources.Mongodb
ds := mongodb.NewDataSource(&retry.RetrierSet{
MakeConnection: retry.NewRetrierFromConfig(mongoDbCfg.ExponentialBackoff, retry.ErrorCheckerMakeConnectionCommon),
Expand Down Expand Up @@ -85,7 +85,7 @@ func (dsc *DataSourceCollection) DoReadSplit(
ds := s3.NewDataSource()

return readSplit[string](logger, stream, request, split, ds, dsc.memoryAllocator, dsc.readLimiterFactory, dsc.cfg)
case api_common.EGenericDataSourceKind_MONGODB:
case api_common.EGenericDataSourceKind_MONGO_DB:
mongoDbCfg := dsc.cfg.Datasources.Mongodb
ds := mongodb.NewDataSource(&retry.RetrierSet{
MakeConnection: retry.NewRetrierFromConfig(mongoDbCfg.ExponentialBackoff, retry.ErrorCheckerMakeConnectionCommon),
Expand Down
27 changes: 11 additions & 16 deletions app/server/datasource/mongodb/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"

"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
Expand All @@ -21,13 +20,6 @@ import (

var _ datasource.DataSource[string] = (*dataSource)(nil)

func getUnparsedDocSchema(tableName string) *api_service_protos.TSchema {
return &api_service_protos.TSchema{Columns: []*Ydb.Column{
{Name: idColumn, Type: common.MakePrimitiveType(Ydb.Type_STRING)},
{Name: tableName, Type: common.MakePrimitiveType(Ydb.Type_JSON)},
}}
}

type dataSource struct {
retrierSet *retry.RetrierSet
cfg *config.TMongoDbConfig
Expand Down Expand Up @@ -101,20 +93,20 @@ func (ds *dataSource) DescribeTable(
}
}()

if !mongoDbOptions.DoParse {
return &api_service_protos.TDescribeTableResponse{Schema: getUnparsedDocSchema(request.Table)}, nil
}

collection := conn.Database(dsi.Database).Collection(request.Table)

cursor, err := collection.Find(ctx, bson.D{}, options.Find().SetLimit(int64(mongoDbOptions.CountDocsToRead)))
cursor, err := collection.Find(ctx, bson.D{}, options.Find().SetLimit(int64(ds.cfg.GetCountDocsToRead())))
if err != nil {
return nil, fmt.Errorf("colection.Find: %w", err)
}

defer cursor.Close(ctx)
defer func() {
if err = cursor.Close(ctx); err != nil {
logger.Fatal(fmt.Sprintf("cursor.Close: %v", err))
}
}()

docs := make([]bson.Raw, 0, mongoDbOptions.CountDocsToRead)
docs := make([]bson.Raw, 0, ds.cfg.GetCountDocsToRead())
for cursor.Next(ctx) {
docs = append(docs, cursor.Current)
}
Expand All @@ -123,7 +115,10 @@ func (ds *dataSource) DescribeTable(
return nil, fmt.Errorf("cursor.Err(): %w", err)
}

columns, err := bsonToYqlColumn(docs, mongoDbOptions.SkipUnsupportedTypes, logger)
omitUnsupported :=
mongoDbOptions.UnsupportedTypeDisplayMode == api_common.TMongoDbDataSourceOptions_UNSUPPORTED_OMIT

columns, err := bsonToYql(logger, docs, omitUnsupported)
if err != nil {
return nil, fmt.Errorf("bsonToYqlColumn: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion app/server/datasource/mongodb/proposal.md
Original file line number Diff line number Diff line change
Expand Up @@ -311,5 +311,5 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name (
|Object|Json / Struct|STRUCT|
|Array|List\<T\>|LIST|
|Decimal128 |Decimal|DECIMAL128|
|ObjectId (12 bytes)|String|BINARY|
|ObjectId (12 bytes)|Tagged("ObjectId", String)|BINARY|
|Date (int64, milliseconds since epoch)|Interval|DATE64|
150 changes: 89 additions & 61 deletions app/server/datasource/mongodb/type_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@ var errEmptyArray = errors.New("can't determine field type for items in an empty
var errNull = errors.New("can't determine field type for null")

const idColumn string = "_id"
const objectIdTag string = "ObjectId"

func typeMap(v bson.RawValue, logger *zap.Logger) (*Ydb.Type, error) {
// typesEqual reports whether lhs and rhs produce identical String() output.
// NOTE(review): this compares textual renderings rather than message
// structure; if Ydb.Type is a protobuf message, proto.Equal may be the more
// robust comparison — confirm before changing.
func typesEqual(lhs, rhs *Ydb.Type) bool {
	lhsRepr := lhs.String()
	rhsRepr := rhs.String()

	return lhsRepr == rhsRepr
}

func typeMap(logger *zap.Logger, v bson.RawValue, omitUnsupported bool) (*Ydb.Type, error) {

Check failure on line 24 in app/server/datasource/mongodb/type_mapping.go

View workflow job for this annotation

GitHub Actions / lint

cyclomatic complexity 20 of func `typeMap` is high (> 15) (gocyclo)
switch v.Type {
case bson.TypeInt32:
return common.MakePrimitiveType(Ydb.Type_INT32), nil
Expand All @@ -28,26 +33,45 @@ func typeMap(v bson.RawValue, logger *zap.Logger) (*Ydb.Type, error) {
return common.MakePrimitiveType(Ydb.Type_DOUBLE), nil
case bson.TypeString:
return common.MakePrimitiveType(Ydb.Type_UTF8), nil
case bson.TypeBinary, bson.TypeObjectID:
case bson.TypeBinary:
return common.MakePrimitiveType(Ydb.Type_STRING), nil
case bson.TypeObjectID:
return common.MakeTaggedType(objectIdTag, common.MakePrimitiveType(Ydb.Type_STRING)), nil
case bson.TypeDateTime:
return common.MakePrimitiveType(Ydb.Type_INTERVAL), nil
case bson.TypeArray:
elements, err := v.Array().Elements()
if err != nil {
return nil, err
return nil, fmt.Errorf("v.Array().Elements: %w", err)
}

if len(elements) > 0 {
innerType, err := typeMap(elements[0].Value(), logger)
var innerType *Ydb.Type

for _, elem := range elements {
newInnerType, err := typeMap(logger, elem.Value(), omitUnsupported)
if errors.Is(err, common.ErrDataTypeNotSupported) && !omitUnsupported {

Check warning on line 52 in app/server/datasource/mongodb/type_mapping.go

View workflow job for this annotation

GitHub Actions / lint

optimize-operands-order: for better performance 'errors.Is(err, common.ErrDataTypeNotSupported) && !omitUnsupported' might be rewritten as '!omitUnsupported && errors.Is(err, common.ErrDataTypeNotSupported)' (revive)
return common.MakeListType(common.MakePrimitiveType(Ydb.Type_UTF8)), nil
}

if err != nil {
return nil, err
return nil, fmt.Errorf("typeMap inner value for array: %w", err)
}

return common.MakeListType(innerType), nil
if innerType == nil {
innerType = newInnerType
continue
}

if !typesEqual(newInnerType, innerType) {
return common.MakeListType(common.MakePrimitiveType(Ydb.Type_UTF8)), nil
}
}

return nil, errEmptyArray
if innerType == nil {
return nil, errEmptyArray
}

return common.MakeListType(innerType), nil

case bson.TypeEmbeddedDocument:
return common.MakePrimitiveType(Ydb.Type_JSON), nil
Expand All @@ -60,75 +84,71 @@ func typeMap(v bson.RawValue, logger *zap.Logger) (*Ydb.Type, error) {
return nil, common.ErrDataTypeNotSupported
}

func bsonToYqlColumnSingleDoc(
doc bson.Raw,
func bsonToYqlColumn(
logger *zap.Logger,
elem bson.RawElement,
deducedTypes map[string]*Ydb.Type,
ambiguousFields, ambiguousArrayFields map[string]struct{},
doSkipUnsupported bool,
logger *zap.Logger,
omitUnsupported bool,
) error {
elements, err := doc.Elements()
key, err := elem.KeyErr()
if err != nil {
return fmt.Errorf("doc.Elements(): %w", err)
return fmt.Errorf("elem.KeyErr: %w", err)
}

for _, elem := range elements {
key, err := elem.KeyErr()
if err != nil {
return fmt.Errorf("elem.KeyErr(): %w", err)
}
prevType, prevTypeExists := deducedTypes[key]

prevType, prevTypeExists := deducedTypes[key]

t, err := typeMap(elem.Value(), logger)
if err != nil {
if errors.Is(err, errNull) {
ambiguousFields[key] = struct{}{}

continue
} else if errors.Is(err, errEmptyArray) {
ambiguousArrayFields[key] = struct{}{}
t, err := typeMap(logger, elem.Value(), omitUnsupported)
if err != nil {
if errors.Is(err, errNull) {
ambiguousFields[key] = struct{}{}

if prevTypeExists && prevType.GetListType() == nil {
deducedTypes[key] = common.MakePrimitiveType(Ydb.Type_UTF8)
return nil
} else if errors.Is(err, errEmptyArray) {
ambiguousArrayFields[key] = struct{}{}

logger.Debug(fmt.Sprintf("bsonToYqlColumnSingleDoc: keeping serialized %v. prev: %v curr: []", key, prevType.String()))
}
if prevTypeExists && prevType.GetListType() == nil {
deducedTypes[key] = common.MakePrimitiveType(Ydb.Type_UTF8)

continue
} else if errors.Is(err, common.ErrDataTypeNotSupported) {
logger.Debug(fmt.Sprintf("bsonToYqlColumnSingleDoc: data not supported: %v", key))
logger.Debug(fmt.Sprintf("bsonToYqlColumn: keeping serialized %v. prev: %v curr: []", key, prevType.String()))
}

if !doSkipUnsupported {
deducedTypes[key] = common.MakePrimitiveType(Ydb.Type_UTF8)
}
return nil
} else if errors.Is(err, common.ErrDataTypeNotSupported) {
logger.Debug(fmt.Sprintf("bsonToYqlColumn: data not supported: %v", key))

continue
if !omitUnsupported {
deducedTypes[key] = common.MakePrimitiveType(Ydb.Type_UTF8)
}

return err
return nil
}

tString := t.String()
_, prevIsArray := ambiguousArrayFields[key]

if (prevTypeExists && prevType.String() != tString) || (prevIsArray && t.GetListType() == nil) {
deducedTypes[key] = common.MakePrimitiveType(Ydb.Type_UTF8)
return err
}

logger.Debug(fmt.Sprintf("bsonToYqlColumnSingleDoc: keeping serialized %v. prev: %v curr: %v", key, prevType.String(), tString))
tString := t.String()
_, prevIsArray := ambiguousArrayFields[key]

continue
}
// Leaving fields that have inconsistent types serialized
// Extra check for arrays because we might have encountered an empty one:
// we know it is an array, but prevType is not determined yet
if (prevTypeExists && !typesEqual(prevType, t)) || (prevIsArray && t.GetListType() == nil) {
deducedTypes[key] = common.MakePrimitiveType(Ydb.Type_UTF8)

deducedTypes[key] = t
logger.Debug(fmt.Sprintf("bsonToYqlColumn: keeping serialized %v. prev: %v curr: %v", key, prevType.String(), tString))

logger.Debug(fmt.Sprintf("bsonToYqlColumnSingleDoc: column %v of type %v", key, tString))
return nil
}

deducedTypes[key] = t

logger.Debug(fmt.Sprintf("bsonToYqlColumn: column %v of type %v", key, tString))

return nil
}

func bsonToYqlColumn(docs []bson.Raw, doSkipUnsupported bool, logger *zap.Logger) ([]*Ydb.Column, error) {
func bsonToYql(logger *zap.Logger, docs []bson.Raw, omitUnsupported bool) ([]*Ydb.Column, error) {
if len(docs) == 0 {
return []*Ydb.Column{}, nil
}
Expand All @@ -138,16 +158,24 @@ func bsonToYqlColumn(docs []bson.Raw, doSkipUnsupported bool, logger *zap.Logger
ambiguousArrayFields := make(map[string]struct{})

for _, doc := range docs {
err := bsonToYqlColumnSingleDoc(
doc,
deducedTypes,
ambiguousFields,
ambiguousArrayFields,
doSkipUnsupported,
logger,
)
elements, err := doc.Elements()
if err != nil {
return nil, fmt.Errorf("bsonToYqlColumn(): %w", err)
return nil, fmt.Errorf("doc.Elements: %w", err)
}

for _, elem := range elements {
err := bsonToYqlColumn(
logger,
elem,
deducedTypes,
ambiguousFields,
ambiguousArrayFields,
omitUnsupported,
)

if err != nil {
return nil, fmt.Errorf("bsonToYqlColumn: %w", err)
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion app/server/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func validateDataSourceOptions(dsi *api_common.TGenericDataSourceInstance) error
api_common.EGenericDataSourceKind_S3,
api_common.EGenericDataSourceKind_YDB,
api_common.EGenericDataSourceKind_MYSQL,
api_common.EGenericDataSourceKind_MONGODB:
api_common.EGenericDataSourceKind_MONGO_DB:
default:
return fmt.Errorf("unsupported data source %s: %w", dsi.GetKind().String(), common.ErrInvalidRequest)
}
Expand Down
2 changes: 1 addition & 1 deletion common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func NewAPIErrorFromStdError(err error, kind api_common.EGenericDataSourceKind)
apiError = newAPIErrorFromMsSQLServer(err)
case api_common.EGenericDataSourceKind_LOGGING:
apiError = newAPIErrorFromYdbError(err)
case api_common.EGenericDataSourceKind_MONGODB:
case api_common.EGenericDataSourceKind_MONGO_DB:
apiError = newAPIErrorFromMongoDbError(err)
default:
panic(fmt.Sprintf("Unexpected data source kind: %v", api_common.EGenericDataSourceKind_name[int32(kind)]))
Expand Down
4 changes: 4 additions & 0 deletions common/ydb_type_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ func MakeOptionalType(ydbType *Ydb.Type) *Ydb.Type {
return &Ydb.Type{Type: &Ydb.Type_OptionalType{OptionalType: &Ydb.OptionalType{Item: ydbType}}}
}

// MakeTaggedType returns a Ydb.Type whose variant is a TaggedType carrying
// the given tag and wrapping ydbType as its underlying type.
func MakeTaggedType(tag string, ydbType *Ydb.Type) *Ydb.Type {
	tagged := &Ydb.TaggedType{
		Tag:  tag,
		Type: ydbType,
	}

	return &Ydb.Type{
		Type: &Ydb.Type_TaggedType{TaggedType: tagged},
	}
}

// MakeListType returns a Ydb.Type whose variant is a ListType with ydbType
// as the item type.
func MakeListType(ydbType *Ydb.Type) *Ydb.Type {
	list := &Ydb.ListType{Item: ydbType}

	return &Ydb.Type{
		Type: &Ydb.Type_ListType{ListType: list},
	}
}
Expand Down
Loading

0 comments on commit 0c511fe

Please sign in to comment.