Skip to content

Commit 76808e8

Browse files
chenjian2664 authored and ebyhr committed
Fix reading mixed-type array in Mongodb
1 parent 6f93102 commit 76808e8

File tree

6 files changed

+83 additions, -15 deletions

plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoPageSource.java

Lines changed: 22 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -66,6 +66,7 @@
6666
import static io.trino.plugin.mongodb.MongoSession.DATABASE_NAME;
6767
import static io.trino.plugin.mongodb.MongoSession.ID;
6868
import static io.trino.plugin.mongodb.ObjectIdType.OBJECT_ID;
69+
import static io.trino.plugin.mongodb.TypeUtils.getImplicitRowFieldIndex;
6970
import static io.trino.plugin.mongodb.TypeUtils.isJsonType;
7071
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
7172
import static io.trino.spi.type.BigintType.BIGINT;
@@ -103,18 +104,21 @@ public class MongoPageSource
103104
private boolean finished;
104105

105106
private final PageBuilder pageBuilder;
107+
private final String implicitPrefix;
106108

107109
public MongoPageSource(
108110
MongoSession mongoSession,
109111
MongoTableHandle tableHandle,
110-
List<MongoColumnHandle> columns)
112+
List<MongoColumnHandle> columns,
113+
String implicitPrefix)
111114
{
112115
this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
113116
this.columnTypes = columns.stream().map(MongoColumnHandle::type).collect(toList());
114117
this.cursor = mongoSession.execute(tableHandle, columns);
115118
currentDoc = null;
116119

117120
pageBuilder = new PageBuilder(columnTypes);
121+
this.implicitPrefix = requireNonNull(implicitPrefix, "implicitPrefix is null");
118122
}
119123

120124
@Override
@@ -383,7 +387,7 @@ else if (type instanceof RowType rowType) {
383387
output.appendNull();
384388
}
385389

386-
private static Object getColumnValue(Document document, MongoColumnHandle mongoColumnHandle)
390+
private Object getColumnValue(Document document, MongoColumnHandle mongoColumnHandle)
387391
{
388392
Object value = document.get(mongoColumnHandle.baseName());
389393
if (mongoColumnHandle.isBaseColumn()) {
@@ -392,17 +396,27 @@ private static Object getColumnValue(Document document, MongoColumnHandle mongoC
392396
if (value instanceof DBRef dbRefValue) {
393397
return getDbRefValue(dbRefValue, mongoColumnHandle);
394398
}
395-
Document documentValue = (Document) value;
399+
396400
for (String dereferenceName : mongoColumnHandle.dereferenceNames()) {
397401
// When parent field itself is null
398-
if (documentValue == null) {
402+
if (value == null) {
399403
return null;
400404
}
401-
value = documentValue.get(dereferenceName);
402-
if (value instanceof Document nestedDocument) {
403-
documentValue = nestedDocument;
405+
if (value instanceof Document documentValue) {
406+
value = documentValue.get(dereferenceName);
407+
}
408+
else {
409+
checkArgument(value instanceof List<?>, "Unsupported dereference of %s in %s", value.getClass(), mongoColumnHandle);
410+
List<?> arrayValue = (List<?>) value;
411+
int arrayPosition = getImplicitRowFieldIndex(dereferenceName, implicitPrefix) - 1;
412+
checkArgument(arrayPosition >= 0, "Invalid array position %s in %s", dereferenceName, mongoColumnHandle);
413+
if (arrayPosition >= arrayValue.size()) {
414+
return null;
415+
}
416+
value = arrayValue.get(arrayPosition);
404417
}
405-
else if (value instanceof DBRef dbRefValue) {
418+
419+
if (value instanceof DBRef dbRefValue) {
406420
// Assuming DBRefField is the leaf field
407421
return getDbRefValue(dbRefValue, mongoColumnHandle);
408422
}

plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoPageSourceProvider.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,13 @@ public class MongoPageSourceProvider
3737
private static final int MONGO_DOMAIN_COMPACTION_THRESHOLD = 1000;
3838

3939
private final MongoSession mongoSession;
40+
private final String implicitPrefix;
4041

4142
@Inject
42-
public MongoPageSourceProvider(MongoSession mongoSession)
43+
public MongoPageSourceProvider(MongoSession mongoSession, MongoClientConfig config)
4344
{
4445
this.mongoSession = requireNonNull(mongoSession, "mongoSession is null");
46+
this.implicitPrefix = config.getImplicitRowFieldPrefix();
4547
}
4648

4749
@Override
@@ -80,6 +82,6 @@ public ConnectorPageSource createPageSource(
8082
return new EmptyPageSource();
8183
}
8284

83-
return new MongoPageSource(mongoSession, newTableHandle, handles.build());
85+
return new MongoPageSource(mongoSession, newTableHandle, handles.build(), implicitPrefix);
8486
}
8587
}

plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoSession.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -524,7 +524,7 @@ public MongoCursor<Document> execute(MongoTableHandle tableHandle, List<MongoCol
524524
Set<MongoColumnHandle> projectedColumns = tableHandle.projectedColumns();
525525
checkArgument(projectedColumns.isEmpty() || projectedColumns.containsAll(columns), "projectedColumns must be empty or equal to columns");
526526

527-
Document projection = buildProjection(columns);
527+
Document projection = buildProjection(columns, implicitPrefix);
528528

529529
MongoCollection<Document> collection = getCollection(tableHandle.remoteTableName());
530530
Document filter = buildFilter(tableHandle);
@@ -540,7 +540,7 @@ public MongoCursor<Document> execute(MongoTableHandle tableHandle, List<MongoCol
540540
}
541541

542542
@VisibleForTesting
543-
static Document buildProjection(List<MongoColumnHandle> columns)
543+
static Document buildProjection(List<MongoColumnHandle> columns, String implicitPrefix)
544544
{
545545
Document output = new Document();
546546

@@ -552,7 +552,13 @@ static Document buildProjection(List<MongoColumnHandle> columns)
552552
// Starting in MongoDB 4.4, it is illegal to project an embedded document with any of the embedded document's fields
553553
// (https://www.mongodb.com/docs/manual/reference/limits/#mongodb-limit-Projection-Restrictions). So, Project only sufficient columns.
554554
for (MongoColumnHandle column : projectSufficientColumns(columns)) {
555-
output.append(column.getQualifiedName(), 1);
555+
if (column.dereferenceNames().stream().anyMatch(columnName -> TypeUtils.isImplicitRowField(columnName, implicitPrefix))) {
556+
// Add parent field for implicit column
557+
output.append(column.baseName(), 1);
558+
}
559+
else {
560+
output.append(column.getQualifiedName(), 1);
561+
}
556562
}
557563

558564
return output;

plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/TypeUtils.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.util.Set;
2323

24+
import static com.google.common.base.Verify.verify;
2425
import static io.trino.spi.type.BigintType.BIGINT;
2526
import static io.trino.spi.type.BooleanType.BOOLEAN;
2627
import static io.trino.spi.type.DateType.DATE;
@@ -64,4 +65,25 @@ public static boolean isPushdownSupportedType(Type type)
6465
|| type instanceof ObjectIdType
6566
|| PUSHDOWN_SUPPORTED_PRIMITIVE_TYPES.contains(type);
6667
}
68+
69+
public static boolean isImplicitRowField(String fieldName, String implicitPrefix)
70+
{
71+
return getImplicitRowFieldIndex(fieldName, implicitPrefix) != -1;
72+
}
73+
74+
public static int getImplicitRowFieldIndex(String fieldName, String implicitPrefix)
75+
{
76+
if (fieldName.length() <= implicitPrefix.length() || !fieldName.startsWith(implicitPrefix)) {
77+
return -1;
78+
}
79+
80+
try {
81+
int fieldIndex = Integer.parseInt(fieldName.substring(implicitPrefix.length()));
82+
verify(fieldIndex >= 1, "Field index must be >= 1");
83+
return fieldIndex;
84+
}
85+
catch (NumberFormatException e) {
86+
return -1;
87+
}
88+
}
6789
}

plugin/trino-mongodb/src/test/java/io/trino/plugin/mongodb/TestMongoConnectorTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,30 @@ protected TestTable createTableWithDefaultColumns()
123123
return abort("MongoDB connector does not support column default values");
124124
}
125125

126+
@Test
127+
void testMongoMixedTypeArrayType()
128+
{
129+
String schema = getSession().getSchema().orElseThrow();
130+
String table = "test_mixed_array_" + randomNameSuffix();
131+
MongoDatabase db = client.getDatabase(schema);
132+
133+
db.createCollection(table);
134+
db.getCollection(table)
135+
.insertOne(new Document("mixed_array_col", ImmutableList.of(1, "two", 3.0, new Document("nested_arr", ImmutableList.of(4, 5)))));
136+
137+
assertThat(query("SHOW COLUMNS FROM " + table))
138+
.skippingTypesCheck()
139+
.matches("VALUES " +
140+
"('mixed_array_col', 'row(_pos1 bigint, _pos2 varchar, _pos3 double, _pos4 row(nested_arr array(bigint)))', '', '')");
141+
142+
assertThat(query("SELECT mixed_array_col._pos1, mixed_array_col._pos2, mixed_array_col._pos3 FROM " + table))
143+
.matches("VALUES (BIGINT '1', VARCHAR 'two', DOUBLE '3.0')");
144+
assertThat(query("SELECT mixed_array_col._pos4.nested_arr[1], mixed_array_col._pos4.nested_arr[2] FROM " + table))
145+
.matches("VALUES (BIGINT '4', BIGINT '5')");
146+
147+
assertUpdate("DROP TABLE " + table);
148+
}
149+
126150
@Test
127151
@Override
128152
public void testColumnName()

plugin/trino-mongodb/src/test/java/io/trino/plugin/mongodb/TestMongoSession.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public void testBuildProjectionWithoutId()
5555
{
5656
List<MongoColumnHandle> columns = ImmutableList.of(COL1, COL2);
5757

58-
Document output = MongoSession.buildProjection(columns);
58+
Document output = MongoSession.buildProjection(columns, "_pos");
5959
Document expected = new Document()
6060
.append(COL1.baseName(), 1)
6161
.append(COL2.baseName(), 1)
@@ -69,7 +69,7 @@ public void testBuildProjectionWithId()
6969
{
7070
List<MongoColumnHandle> columns = ImmutableList.of(COL1, COL2, ID_COL);
7171

72-
Document output = MongoSession.buildProjection(columns);
72+
Document output = MongoSession.buildProjection(columns, "_pos");
7373
Document expected = new Document()
7474
.append(COL1.baseName(), 1)
7575
.append(COL2.baseName(), 1)

0 commit comments

Comments
 (0)