Skip to content

Commit

Permalink
MongoDB: simpler bsonToYqlColumn
Browse files Browse the repository at this point in the history
  • Loading branch information
ninaiad committed Jan 19, 2025
1 parent 1503bab commit 31d54ea
Showing 1 changed file with 82 additions and 57 deletions.
139 changes: 82 additions & 57 deletions app/server/datasource/mongodb/type_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,87 +60,112 @@ func typeMap(v bson.RawValue, logger *zap.Logger) (*Ydb.Type, error) {
return nil, common.ErrDataTypeNotSupported
}

func bsonToYqlColumn(docs []bson.Raw, doSkipUnsupported bool, logger *zap.Logger) ([]*Ydb.Column, error) {
if len(docs) == 0 {
return []*Ydb.Column{}, nil
func bsonToYqlColumnSingleDoc(
doc bson.Raw,
deducedTypes map[string]*Ydb.Type,
ambiguousFields, ambiguousArrayFields map[string]struct{},
doSkipUnsupported bool,
logger *zap.Logger,
) error {
elements, err := doc.Elements()
if err != nil {
return fmt.Errorf("doc.Elements(): %w", err)
}

deducedTypes := make(map[string]*Ydb.Type)
ambiguousFields := make(map[string]struct{})

for _, doc := range docs {
elements, err := doc.Elements()
for _, elem := range elements {
key, err := elem.KeyErr()
if err != nil {
return nil, fmt.Errorf("doc.Elements(): %w", err)
return fmt.Errorf("elem.KeyErr(): %w", err)
}

for _, elem := range elements {
key, err := elem.KeyErr()
if err != nil {
return nil, fmt.Errorf("elem.KeyErr(): %w", err)
}
prevType, prevTypeExists := deducedTypes[key]

t, err := typeMap(elem.Value(), logger)
if err != nil {
if errors.Is(err, errNull) {
ambiguousFields[key] = struct{}{}

prevType, prevTypeExists := deducedTypes[key]
prevTypeString := prevType.String()
continue
} else if errors.Is(err, errEmptyArray) {
ambiguousArrayFields[key] = struct{}{}

t, err := typeMap(elem.Value(), logger)
if err != nil {
if errors.Is(err, errNull) {
if !prevTypeExists {
ambiguousFields[key] = struct{}{}
}

continue
} else if errors.Is(err, errEmptyArray) {
if prevTypeExists && prevType.GetListType() == nil {
deducedTypes[key] = common.MakePrimitiveType(Ydb.Type_UTF8)

logger.Debug(fmt.Sprintf("bsonToYqlColumn: keeping serialized %v. prev: %v curr: []", key, prevTypeString))
} else if !prevTypeExists {
ambiguousFields[key] = struct{}{}
}

continue
} else if errors.Is(err, common.ErrDataTypeNotSupported) {
logger.Debug(fmt.Sprintf("bsonToYqlColumn: data not supported: %v", key))

if !doSkipUnsupported {
deducedTypes[key] = common.MakePrimitiveType(Ydb.Type_UTF8)
}

continue
if prevTypeExists && prevType.GetListType() == nil {
deducedTypes[key] = common.MakePrimitiveType(Ydb.Type_UTF8)

logger.Debug(fmt.Sprintf("bsonToYqlColumnSingleDoc: keeping serialized %v. prev: %v curr: []", key, prevType.String()))
}

return nil, err
continue
} else if errors.Is(err, common.ErrDataTypeNotSupported) {
logger.Debug(fmt.Sprintf("bsonToYqlColumnSingleDoc: data not supported: %v", key))

if !doSkipUnsupported {
deducedTypes[key] = common.MakePrimitiveType(Ydb.Type_UTF8)
}

continue
}

tString := t.String()
return err
}

if !prevTypeExists {
deducedTypes[key] = t
tString := t.String()
_, prevIsArray := ambiguousArrayFields[key]

logger.Debug(fmt.Sprintf("bsonToYqlColumn: new column %v %v", key, tString))
} else if prevTypeString != tString {
deducedTypes[key] = common.MakePrimitiveType(Ydb.Type_UTF8)
if (prevTypeExists && prevType.String() != tString) || (prevIsArray && t.GetListType() == nil) {
deducedTypes[key] = common.MakePrimitiveType(Ydb.Type_UTF8)

logger.Debug(fmt.Sprintf("bsonToYqlColumn: keeping serialized %v. curr: %v prev: %v", key, prevTypeString, tString))
}
logger.Debug(fmt.Sprintf("bsonToYqlColumnSingleDoc: keeping serialized %v. prev: %v curr: %v", key, prevType.String(), tString))

continue
}

deducedTypes[key] = t

logger.Debug(fmt.Sprintf("bsonToYqlColumnSingleDoc: column %v of type %v", key, tString))
}

if !doSkipUnsupported {
for field := range ambiguousFields {
if _, ok := deducedTypes[field]; !ok {
deducedTypes[field] = common.MakePrimitiveType(Ydb.Type_UTF8)
}
return nil
}

func bsonToYqlColumn(docs []bson.Raw, doSkipUnsupported bool, logger *zap.Logger) ([]*Ydb.Column, error) {
if len(docs) == 0 {
return []*Ydb.Column{}, nil
}

deducedTypes := make(map[string]*Ydb.Type)
ambiguousFields := make(map[string]struct{})
ambiguousArrayFields := make(map[string]struct{})

for _, doc := range docs {
err := bsonToYqlColumnSingleDoc(
doc,
deducedTypes,
ambiguousFields,
ambiguousArrayFields,
doSkipUnsupported,
logger,
)
if err != nil {
return nil, fmt.Errorf("bsonToYqlColumn(): %w", err)
}
}

for field := range ambiguousArrayFields {
ambiguousFields[field] = struct{}{}
}

for field := range ambiguousFields {
if _, ok := deducedTypes[field]; !ok {
deducedTypes[field] = common.MakePrimitiveType(Ydb.Type_UTF8)
}
}

columns := make([]*Ydb.Column, 0, len(deducedTypes))

for columnName, deducedType := range deducedTypes {
if columnName == idColumn {
columns = append(columns, &Ydb.Column{Name: columnName, Type: deducedTypes[columnName]})
columns = append(columns, &Ydb.Column{Name: columnName, Type: deducedType})
} else {
columns = append(columns, &Ydb.Column{Name: columnName, Type: common.MakeOptionalType(deducedType)})
}
Expand Down

0 comments on commit 31d54ea

Please sign in to comment.