Skip to content

Commit 3cab91d

Browse files
committed
Add MAP type support to the ClickHouse connector
1 parent 42b7ca9 commit 3cab91d

File tree

2 files changed

+191
-41
lines changed

2 files changed

+191
-41
lines changed

plugin/trino-clickhouse/src/main/java/io/trino/plugin/clickhouse/ClickHouseClient.java

Lines changed: 146 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.clickhouse.data.ClickHouseColumn;
1717
import com.clickhouse.data.ClickHouseDataType;
1818
import com.clickhouse.data.ClickHouseVersion;
19+
import com.clickhouse.jdbc.JdbcTypeMapping;
1920
import com.google.common.base.Enums;
2021
import com.google.common.base.Splitter;
2122
import com.google.common.collect.ImmutableList;
@@ -61,7 +62,10 @@
6162
import io.trino.plugin.jdbc.expression.JdbcConnectorExpressionRewriterBuilder;
6263
import io.trino.plugin.jdbc.expression.ParameterizedExpression;
6364
import io.trino.plugin.jdbc.logging.RemoteQueryModifier;
65+
import io.trino.spi.StandardErrorCode;
6466
import io.trino.spi.TrinoException;
67+
import io.trino.spi.block.BlockBuilder;
68+
import io.trino.spi.block.SqlMap;
6569
import io.trino.spi.connector.AggregateFunction;
6670
import io.trino.spi.connector.ColumnHandle;
6771
import io.trino.spi.connector.ColumnMetadata;
@@ -74,9 +78,16 @@
7478
import io.trino.spi.type.DecimalType;
7579
import io.trino.spi.type.Decimals;
7680
import io.trino.spi.type.Int128;
81+
<<<<<<< Updated upstream
82+
=======
83+
import io.trino.spi.type.LongTimestamp;
84+
import io.trino.spi.type.LongTimestampWithTimeZone;
85+
import io.trino.spi.type.MapType;
86+
>>>>>>> Stashed changes
7787
import io.trino.spi.type.StandardTypes;
7888
import io.trino.spi.type.Type;
7989
import io.trino.spi.type.TypeManager;
90+
import io.trino.spi.type.TypeOperators;
8091
import io.trino.spi.type.TypeSignature;
8192
import io.trino.spi.type.VarbinaryType;
8293
import io.trino.spi.type.VarcharType;
@@ -99,6 +110,7 @@
99110
import java.time.LocalDateTime;
100111
import java.time.ZonedDateTime;
101112
import java.util.Collection;
113+
import java.util.HashMap;
102114
import java.util.List;
103115
import java.util.Map;
104116
import java.util.Map.Entry;
@@ -151,7 +163,6 @@
151163
import static io.trino.plugin.jdbc.StandardColumnMappings.shortDecimalWriteFunction;
152164
import static io.trino.plugin.jdbc.StandardColumnMappings.smallintColumnMapping;
153165
import static io.trino.plugin.jdbc.StandardColumnMappings.smallintWriteFunction;
154-
import static io.trino.plugin.jdbc.StandardColumnMappings.timestampColumnMapping;
155166
import static io.trino.plugin.jdbc.StandardColumnMappings.timestampReadFunction;
156167
import static io.trino.plugin.jdbc.StandardColumnMappings.tinyintColumnMapping;
157168
import static io.trino.plugin.jdbc.StandardColumnMappings.tinyintWriteFunction;
@@ -164,6 +175,7 @@
164175
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
165176
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
166177
import static io.trino.spi.StandardErrorCode.SCHEMA_NOT_EMPTY;
178+
import static io.trino.spi.block.MapValueBuilder.buildMapValue;
167179
import static io.trino.spi.connector.ConnectorMetadata.MODIFYING_ROWS_MESSAGE;
168180
import static io.trino.spi.type.BigintType.BIGINT;
169181
import static io.trino.spi.type.BooleanType.BOOLEAN;
@@ -212,6 +224,7 @@ public class ClickHouseClient
212224

213225
private final ConnectorExpressionRewriter<ParameterizedExpression> connectorExpressionRewriter;
214226
private final AggregateFunctionRewriter<JdbcExpression, ?> aggregateFunctionRewriter;
227+
private final TypeOperators typeOperators;
215228
private final Type uuidType;
216229
private final Type ipAddressType;
217230
private final AtomicReference<ClickHouseVersion> clickHouseVersion = new AtomicReference<>();
@@ -226,6 +239,7 @@ public ClickHouseClient(
226239
RemoteQueryModifier queryModifier)
227240
{
228241
super("\"", connectionFactory, queryBuilder, config.getJdbcTypesMappedToVarchar(), identifierMapping, queryModifier, false);
242+
this.typeOperators = typeManager.getTypeOperators();
229243
this.uuidType = typeManager.getType(new TypeSignature(StandardTypes.UUID));
230244
this.ipAddressType = typeManager.getType(new TypeSignature(StandardTypes.IPADDRESS));
231245
JdbcTypeHandle bigintTypeHandle = new JdbcTypeHandle(Types.BIGINT, Optional.of("bigint"), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
@@ -630,14 +644,27 @@ public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connect
630644
{
631645
String jdbcTypeName = typeHandle.jdbcTypeName()
632646
.orElseThrow(() -> new TrinoException(JDBC_ERROR, "Type name is missing: " + typeHandle));
633-
634647
Optional<ColumnMapping> mapping = getForcedMappingToVarchar(typeHandle);
635648
if (mapping.isPresent()) {
636649
return mapping;
637650
}
638651

652+
Optional<ColumnMapping> columnMapping = toColumnMapping(session, jdbcTypeName, typeHandle.jdbcType(), typeHandle.decimalDigits(), typeHandle.columnSize());
653+
if (columnMapping.isEmpty() && getUnsupportedTypeHandling(session) == CONVERT_TO_VARCHAR) {
654+
return mapToUnboundedVarchar(typeHandle);
655+
}
656+
return columnMapping;
657+
}
658+
659+
private Optional<ColumnMapping> toColumnMapping(
660+
ConnectorSession session,
661+
String typeName,
662+
int jdbcType,
663+
Optional<Integer> decimalDigits,
664+
Optional<Integer> columnSize)
665+
{
639666
ClickHouseVersion version = getClickHouseServerVersion(session);
640-
ClickHouseColumn column = ClickHouseColumn.of("", jdbcTypeName);
667+
ClickHouseColumn column = ClickHouseColumn.of("", typeName);
641668
ClickHouseDataType columnDataType = column.getDataType();
642669
switch (columnDataType) {
643670
case Bool:
@@ -677,11 +704,13 @@ public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connect
677704
return Optional.of(varbinaryColumnMapping());
678705
case UUID:
679706
return Optional.of(uuidColumnMapping());
707+
case Map:
708+
return mapColumnMapping(session, column.getKeyInfo(), column.getValueInfo());
680709
default:
681710
// no-op
682711
}
683712

684-
switch (typeHandle.jdbcType()) {
713+
switch (jdbcType) {
685714
case Types.TINYINT:
686715
return Optional.of(tinyintColumnMapping());
687716

@@ -706,16 +735,18 @@ public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connect
706735
return Optional.of(doubleColumnMapping());
707736

708737
case Types.DECIMAL:
709-
int decimalDigits = typeHandle.requiredDecimalDigits();
710-
int precision = typeHandle.requiredColumnSize();
711-
712738
ColumnMapping decimalColumnMapping;
739+
<<<<<<< Updated upstream
713740
if (getDecimalRounding(session) == ALLOW_OVERFLOW && precision > Decimals.MAX_PRECISION) {
714741
int scale = Math.min(decimalDigits, getDecimalDefaultScale(session));
742+
=======
743+
if (getDecimalRounding(session) == ALLOW_OVERFLOW && columnSize.orElseThrow() > Decimals.MAX_PRECISION) {
744+
int scale = min(decimalDigits.orElseThrow(), getDecimalDefaultScale(session));
745+
>>>>>>> Stashed changes
715746
decimalColumnMapping = decimalColumnMapping(createDecimalType(Decimals.MAX_PRECISION, scale), getDecimalRoundingMode(session));
716747
}
717748
else {
718-
decimalColumnMapping = decimalColumnMapping(createDecimalType(precision, max(decimalDigits, 0)));
749+
decimalColumnMapping = decimalColumnMapping(createDecimalType(columnSize.orElseThrow(), max(decimalDigits.orElseThrow(), 0)));
719750
}
720751
return Optional.of(ColumnMapping.mapping(
721752
decimalColumnMapping.getType(),
@@ -730,28 +761,39 @@ public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connect
730761
case Types.TIMESTAMP:
731762
if (columnDataType == ClickHouseDataType.DateTime) {
732763
// ClickHouse DateTime does not have sub-second precision
733-
verify(typeHandle.requiredDecimalDigits() == 0, "Expected 0 as timestamp precision, but got %s", typeHandle.requiredDecimalDigits());
764+
verify(decimalDigits.orElseThrow() == 0, "Expected 0 as timestamp precision, but got %s", decimalDigits.orElseThrow());
734765
return Optional.of(ColumnMapping.longMapping(
735766
TIMESTAMP_SECONDS,
736767
timestampReadFunction(TIMESTAMP_SECONDS),
737768
timestampSecondsWriteFunction(version)));
738769
}
770+
<<<<<<< Updated upstream
739771
// TODO (https://github.com/trinodb/trino/issues/10537) Add support for Datetime64 type
740772
return Optional.of(timestampColumnMapping(TIMESTAMP_MILLIS));
773+
=======
774+
if (columnDataType == ClickHouseDataType.DateTime64) {
775+
return Optional.of(timestampColumnMapping(createTimestampType(decimalDigits.orElseThrow()), getClickHouseServerVersion(session)));
776+
}
777+
// TODO Add support for Datetime32 type
778+
return Optional.of(timestampColumnMapping(TIMESTAMP_MILLIS, version));
779+
>>>>>>> Stashed changes
741780

742781
case Types.TIMESTAMP_WITH_TIMEZONE:
743782
if (columnDataType == ClickHouseDataType.DateTime) {
744783
// ClickHouse DateTime does not have sub-second precision
745-
verify(typeHandle.requiredDecimalDigits() == 0, "Expected 0 as timestamp with time zone precision, but got %s", typeHandle.requiredDecimalDigits());
784+
verify(decimalDigits.orElseThrow() == 0, "Expected 0 as timestamp with time zone precision, but got %s", decimalDigits.orElseThrow());
746785
return Optional.of(ColumnMapping.longMapping(
747786
TIMESTAMP_TZ_SECONDS,
748787
shortTimestampWithTimeZoneReadFunction(),
788+
<<<<<<< Updated upstream
749789
shortTimestampWithTimeZoneWriteFunction(version, column.getTimeZone())));
790+
=======
791+
shortTimestampWithTimeZoneWriteFunction(DATETIME, version, column.getTimeZone())));
792+
}
793+
if (columnDataType == ClickHouseDataType.DateTime64) {
794+
return Optional.of(timestampWithTimeZoneColumnMapping(column, decimalDigits.orElseThrow(), version));
795+
>>>>>>> Stashed changes
750796
}
751-
}
752-
753-
if (getUnsupportedTypeHandling(session) == CONVERT_TO_VARCHAR) {
754-
return mapToUnboundedVarchar(typeHandle);
755797
}
756798

757799
return Optional.empty();
@@ -805,6 +847,12 @@ public WriteMapping toWriteMapping(ConnectorSession session, Type type)
805847
if (type.equals(uuidType)) {
806848
return WriteMapping.sliceMapping("UUID", uuidWriteFunction());
807849
}
850+
if (type instanceof MapType mapType) {
851+
WriteMapping keyMapping = toWriteMapping(session, mapType.getKeyType());
852+
WriteMapping valueMapping = toWriteMapping(session, mapType.getValueType());
853+
String dataType = "Map(%s, %s)".formatted(keyMapping.getDataType(), valueMapping.getDataType());
854+
return WriteMapping.objectMapping(dataType, mapWriteFunction());
855+
}
808856
throw new TrinoException(NOT_SUPPORTED, "Unsupported column type: " + type);
809857
}
810858

@@ -1003,6 +1051,90 @@ private static SliceWriteFunction uuidWriteFunction()
10031051
return (statement, index, value) -> statement.setObject(index, trinoUuidToJavaUuid(value), Types.OTHER);
10041052
}
10051053

1054+
private Optional<ColumnMapping> mapColumnMapping(ConnectorSession session, ClickHouseColumn keyColumn, ClickHouseColumn valueColumn)
1055+
{
1056+
JdbcTypeMapping typeMapping = JdbcTypeMapping.getDefaultMapping();
1057+
Optional<ColumnMapping> keyMapping = toColumnMapping(
1058+
session,
1059+
keyColumn.getDataType().name(),
1060+
typeMapping.toSqlType(keyColumn, Map.of()),
1061+
Optional.of(keyColumn.getPrecision()),
1062+
Optional.of(keyColumn.getScale()));
1063+
Optional<ColumnMapping> valueMapping = toColumnMapping(
1064+
session,
1065+
valueColumn.getDataType().name(),
1066+
typeMapping.toSqlType(valueColumn, Map.of()),
1067+
Optional.of(valueColumn.getPrecision()),
1068+
Optional.of(valueColumn.getScale()));
1069+
if (keyMapping.isEmpty() || valueMapping.isEmpty()) {
1070+
return Optional.empty();
1071+
}
1072+
1073+
MapType mapType = new MapType(
1074+
keyMapping.get().getType(),
1075+
valueMapping.get().getType(),
1076+
typeOperators);
1077+
return Optional.of(ColumnMapping.objectMapping(
1078+
mapType,
1079+
ObjectReadFunction.of(SqlMap.class, (resultSet, columnIndex) -> {
1080+
Object data = resultSet.getObject(columnIndex);
1081+
if (!(data instanceof Map<?, ?> mapData)) {
1082+
throw new TrinoException(StandardErrorCode.TYPE_MISMATCH, "Expected ClickHouse to return a Map");
1083+
}
1084+
1085+
return buildMapValue(
1086+
mapType,
1087+
mapData.size(),
1088+
(keyBuilder, valueBuilder) -> {
1089+
for (Object key : mapData.keySet()) {
1090+
writeValue(keyMapping.get().getType(), keyBuilder, key);
1091+
writeValue(valueMapping.get().getType(), valueBuilder, mapData.get(key));
1092+
}
1093+
});
1094+
}),
1095+
mapWriteFunction()));
1096+
}
1097+
1098+
private static ObjectWriteFunction mapWriteFunction()
1099+
{
1100+
return ObjectWriteFunction.of(SqlMap.class, (statement, index, value) -> {
1101+
MapType mapType = (MapType) value.getMapType();
1102+
Type keyType = mapType.getKeyType();
1103+
Type valueType = mapType.getValueType();
1104+
1105+
Map<Object, Object> mapValue = new HashMap<>();
1106+
for (int position = 0; position < value.getSize(); position++) {
1107+
Object keyEntry = keyType.getObjectValue(value.getRawKeyBlock(), position);
1108+
Object valueEntry = valueType.getObjectValue(value.getRawValueBlock(), position);
1109+
mapValue.put(keyEntry, valueEntry);
1110+
}
1111+
1112+
statement.setObject(index, mapValue);
1113+
});
1114+
}
1115+
1116+
private static void writeValue(Type type, BlockBuilder blockBuilder, Object value)
1117+
{
1118+
if (value == null) {
1119+
blockBuilder.appendNull();
1120+
}
1121+
else if (type.getJavaType() == long.class) {
1122+
type.writeLong(blockBuilder, ((Number) value).longValue());
1123+
}
1124+
else if (type.getJavaType() == double.class) {
1125+
type.writeDouble(blockBuilder, ((Number) value).doubleValue());
1126+
}
1127+
else if (type.getJavaType() == boolean.class) {
1128+
type.writeBoolean(blockBuilder, (boolean) value);
1129+
}
1130+
else if (type.getJavaType() == io.airlift.slice.Slice.class) {
1131+
type.writeSlice(blockBuilder, io.airlift.slice.Slices.utf8Slice(value.toString()));
1132+
}
1133+
else {
1134+
throw new UnsupportedOperationException("Unsupported type for map key or value: " + type);
1135+
}
1136+
}
1137+
10061138
public static boolean supportsPushdown(Variable variable, RewriteContext<ParameterizedExpression> context)
10071139
{
10081140
JdbcTypeHandle typeHandle = ((JdbcColumnHandle) context.getAssignment(variable.getName()))

0 commit comments

Comments
 (0)