Skip to content

Commit c98a03a

Browse files
committed
Add MAP type support to the ClickHouse connector
1 parent c37191d commit c98a03a

File tree

5 files changed

+309
-34
lines changed

5 files changed

+309
-34
lines changed

docs/src/main/sphinx/connector/clickhouse.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,9 @@ to the following table:
248248
* - `UUID`
249249
- `UUID`
250250
-
251+
* - `MAP(k, v)`
252+
- `MAP(k, v)`
253+
-
251254
:::
252255

253256
No other types are supported.
@@ -307,6 +310,9 @@ to the following table:
307310
* - `UUID`
308311
- `UUID`
309312
-
313+
* - `MAP(k, v)`
314+
- `MAP(k, v)`
315+
-
310316
:::
311317

312318
No other types are supported.

plugin/trino-clickhouse/src/main/java/io/trino/plugin/clickhouse/ClickHouseClient.java

Lines changed: 177 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import com.clickhouse.data.ClickHouseColumn;
1717
import com.clickhouse.data.ClickHouseDataType;
1818
import com.clickhouse.data.ClickHouseVersion;
19+
import com.clickhouse.data.value.UnsignedLong;
20+
import com.clickhouse.jdbc.JdbcTypeMapping;
1921
import com.google.common.base.Enums;
2022
import com.google.common.base.Splitter;
2123
import com.google.common.collect.ImmutableList;
@@ -44,6 +46,7 @@
4446
import io.trino.plugin.jdbc.JdbcTypeHandle;
4547
import io.trino.plugin.jdbc.LongReadFunction;
4648
import io.trino.plugin.jdbc.LongWriteFunction;
49+
import io.trino.plugin.jdbc.ObjectReadFunction;
4750
import io.trino.plugin.jdbc.ObjectWriteFunction;
4851
import io.trino.plugin.jdbc.QueryBuilder;
4952
import io.trino.plugin.jdbc.RemoteTableName;
@@ -61,7 +64,11 @@
6164
import io.trino.plugin.jdbc.expression.JdbcConnectorExpressionRewriterBuilder;
6265
import io.trino.plugin.jdbc.expression.ParameterizedExpression;
6366
import io.trino.plugin.jdbc.logging.RemoteQueryModifier;
67+
import io.trino.spi.StandardErrorCode;
6468
import io.trino.spi.TrinoException;
69+
import io.trino.spi.block.BlockBuilder;
70+
import io.trino.spi.block.MapHashTables;
71+
import io.trino.spi.block.SqlMap;
6572
import io.trino.spi.connector.AggregateFunction;
6673
import io.trino.spi.connector.ColumnHandle;
6774
import io.trino.spi.connector.ColumnMetadata;
@@ -74,9 +81,11 @@
7481
import io.trino.spi.type.DecimalType;
7582
import io.trino.spi.type.Decimals;
7683
import io.trino.spi.type.Int128;
84+
import io.trino.spi.type.MapType;
7785
import io.trino.spi.type.StandardTypes;
7886
import io.trino.spi.type.Type;
7987
import io.trino.spi.type.TypeManager;
88+
import io.trino.spi.type.TypeOperators;
8089
import io.trino.spi.type.TypeSignature;
8190
import io.trino.spi.type.VarbinaryType;
8291
import io.trino.spi.type.VarcharType;
@@ -99,6 +108,7 @@
99108
import java.time.LocalDateTime;
100109
import java.time.ZonedDateTime;
101110
import java.util.Collection;
111+
import java.util.HashMap;
102112
import java.util.List;
103113
import java.util.Map;
104114
import java.util.Map.Entry;
@@ -116,6 +126,7 @@
116126
import static com.google.common.base.Strings.isNullOrEmpty;
117127
import static com.google.common.base.Verify.verify;
118128
import static com.google.common.collect.ImmutableMap.toImmutableMap;
129+
import static io.airlift.slice.Slices.utf8Slice;
119130
import static io.airlift.slice.Slices.wrappedBuffer;
120131
import static io.trino.plugin.clickhouse.ClickHouseSessionProperties.isMapStringAsVarchar;
121132
import static io.trino.plugin.clickhouse.ClickHouseTableProperties.ENGINE_PROPERTY;
@@ -164,6 +175,7 @@
164175
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
165176
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
166177
import static io.trino.spi.StandardErrorCode.SCHEMA_NOT_EMPTY;
178+
import static io.trino.spi.block.MapValueBuilder.buildMapValue;
167179
import static io.trino.spi.connector.ConnectorMetadata.MODIFYING_ROWS_MESSAGE;
168180
import static io.trino.spi.type.BigintType.BIGINT;
169181
import static io.trino.spi.type.BooleanType.BOOLEAN;
@@ -184,6 +196,7 @@
184196
import static io.trino.spi.type.UuidType.javaUuidToTrinoUuid;
185197
import static io.trino.spi.type.UuidType.trinoUuidToJavaUuid;
186198
import static io.trino.spi.type.VarcharType.createUnboundedVarcharType;
199+
import static java.lang.Float.floatToIntBits;
187200
import static java.lang.Float.floatToRawIntBits;
188201
import static java.lang.Math.floorDiv;
189202
import static java.lang.Math.floorMod;
@@ -212,6 +225,7 @@ public class ClickHouseClient
212225

213226
private final ConnectorExpressionRewriter<ParameterizedExpression> connectorExpressionRewriter;
214227
private final AggregateFunctionRewriter<JdbcExpression, ?> aggregateFunctionRewriter;
228+
private final TypeOperators typeOperators;
215229
private final Type uuidType;
216230
private final Type ipAddressType;
217231
private final AtomicReference<ClickHouseVersion> clickHouseVersion = new AtomicReference<>();
@@ -226,6 +240,7 @@ public ClickHouseClient(
226240
RemoteQueryModifier queryModifier)
227241
{
228242
super("\"", connectionFactory, queryBuilder, config.getJdbcTypesMappedToVarchar(), identifierMapping, queryModifier, false);
243+
this.typeOperators = typeManager.getTypeOperators();
229244
this.uuidType = typeManager.getType(new TypeSignature(StandardTypes.UUID));
230245
this.ipAddressType = typeManager.getType(new TypeSignature(StandardTypes.IPADDRESS));
231246
JdbcTypeHandle bigintTypeHandle = new JdbcTypeHandle(Types.BIGINT, Optional.of("bigint"), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
@@ -636,8 +651,22 @@ public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connect
636651
return mapping;
637652
}
638653

654+
Optional<ColumnMapping> columnMapping = toColumnMapping(session, jdbcTypeName, typeHandle.jdbcType(), typeHandle.decimalDigits(), typeHandle.columnSize());
655+
if (columnMapping.isEmpty() && getUnsupportedTypeHandling(session) == CONVERT_TO_VARCHAR) {
656+
return mapToUnboundedVarchar(typeHandle);
657+
}
658+
return columnMapping;
659+
}
660+
661+
private Optional<ColumnMapping> toColumnMapping(
662+
ConnectorSession session,
663+
String typeName,
664+
int jdbcType,
665+
Optional<Integer> decimalDigits,
666+
Optional<Integer> columnSize)
667+
{
639668
ClickHouseVersion version = getClickHouseServerVersion(session);
640-
ClickHouseColumn column = ClickHouseColumn.of("", jdbcTypeName);
669+
ClickHouseColumn column = ClickHouseColumn.of("", typeName);
641670
ClickHouseDataType columnDataType = column.getDataType();
642671
switch (columnDataType) {
643672
case Bool:
@@ -677,11 +706,13 @@ public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connect
677706
return Optional.of(varbinaryColumnMapping());
678707
case UUID:
679708
return Optional.of(uuidColumnMapping());
709+
case Map:
710+
return mapColumnMapping(session, column.getKeyInfo(), column.getValueInfo());
680711
default:
681712
// no-op
682713
}
683714

684-
switch (typeHandle.jdbcType()) {
715+
switch (jdbcType) {
685716
case Types.TINYINT:
686717
return Optional.of(tinyintColumnMapping());
687718

@@ -706,16 +737,13 @@ public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connect
706737
return Optional.of(doubleColumnMapping());
707738

708739
case Types.DECIMAL:
709-
int decimalDigits = typeHandle.requiredDecimalDigits();
710-
int precision = typeHandle.requiredColumnSize();
711-
712740
ColumnMapping decimalColumnMapping;
713-
if (getDecimalRounding(session) == ALLOW_OVERFLOW && precision > Decimals.MAX_PRECISION) {
714-
int scale = Math.min(decimalDigits, getDecimalDefaultScale(session));
741+
if (getDecimalRounding(session) == ALLOW_OVERFLOW && columnSize.orElseThrow() > Decimals.MAX_PRECISION) {
742+
int scale = Math.min(decimalDigits.orElseThrow(), getDecimalDefaultScale(session));
715743
decimalColumnMapping = decimalColumnMapping(createDecimalType(Decimals.MAX_PRECISION, scale), getDecimalRoundingMode(session));
716744
}
717745
else {
718-
decimalColumnMapping = decimalColumnMapping(createDecimalType(precision, max(decimalDigits, 0)));
746+
decimalColumnMapping = decimalColumnMapping(createDecimalType(columnSize.orElseThrow(), max(decimalDigits.orElseThrow(), 0)));
719747
}
720748
return Optional.of(ColumnMapping.mapping(
721749
decimalColumnMapping.getType(),
@@ -730,7 +758,7 @@ public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connect
730758
case Types.TIMESTAMP:
731759
if (columnDataType == ClickHouseDataType.DateTime) {
732760
// ClickHouse DateTime does not have sub-second precision
733-
verify(typeHandle.requiredDecimalDigits() == 0, "Expected 0 as timestamp precision, but got %s", typeHandle.requiredDecimalDigits());
761+
verify(decimalDigits.orElseThrow() == 0, "Expected 0 as timestamp precision, but got %s", decimalDigits.orElseThrow());
734762
return Optional.of(ColumnMapping.longMapping(
735763
TIMESTAMP_SECONDS,
736764
timestampReadFunction(TIMESTAMP_SECONDS),
@@ -742,18 +770,14 @@ public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connect
742770
case Types.TIMESTAMP_WITH_TIMEZONE:
743771
if (columnDataType == ClickHouseDataType.DateTime) {
744772
// ClickHouse DateTime does not have sub-second precision
745-
verify(typeHandle.requiredDecimalDigits() == 0, "Expected 0 as timestamp with time zone precision, but got %s", typeHandle.requiredDecimalDigits());
773+
verify(decimalDigits.orElseThrow() == 0, "Expected 0 as timestamp with time zone precision, but got %s", decimalDigits.orElseThrow());
746774
return Optional.of(ColumnMapping.longMapping(
747775
TIMESTAMP_TZ_SECONDS,
748776
shortTimestampWithTimeZoneReadFunction(),
749777
shortTimestampWithTimeZoneWriteFunction(version, column.getTimeZone())));
750778
}
751779
}
752780

753-
if (getUnsupportedTypeHandling(session) == CONVERT_TO_VARCHAR) {
754-
return mapToUnboundedVarchar(typeHandle);
755-
}
756-
757781
return Optional.empty();
758782
}
759783

@@ -805,6 +829,12 @@ public WriteMapping toWriteMapping(ConnectorSession session, Type type)
805829
if (type.equals(uuidType)) {
806830
return WriteMapping.sliceMapping("UUID", uuidWriteFunction());
807831
}
832+
if (type instanceof MapType mapType) {
833+
WriteMapping keyMapping = toWriteMapping(session, mapType.getKeyType());
834+
WriteMapping valueMapping = toWriteMapping(session, mapType.getValueType());
835+
String dataType = "Map(%s, %s)".formatted(keyMapping.getDataType(), valueMapping.getDataType());
836+
return WriteMapping.objectMapping(dataType, mapWriteFunction());
837+
}
808838
throw new TrinoException(NOT_SUPPORTED, "Unsupported column type: " + type);
809839
}
810840

@@ -946,26 +976,30 @@ private ColumnMapping ipAddressColumnMapping(String clickhouseType)
946976
(resultSet, columnIndex) -> {
947977
// copied from IpAddressOperators.castFromVarcharToIpAddress
948978
byte[] address = InetAddresses.forString(resultSet.getString(columnIndex)).getAddress();
949-
950-
byte[] bytes;
951-
if (address.length == 4) {
952-
bytes = new byte[16];
953-
bytes[10] = (byte) 0xff;
954-
bytes[11] = (byte) 0xff;
955-
arraycopy(address, 0, bytes, 12, 4);
956-
}
957-
else if (address.length == 16) {
958-
bytes = address;
959-
}
960-
else {
961-
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Invalid InetAddress length: " + address.length);
962-
}
963-
979+
byte[] bytes = parseIpAddressBytes(address);
964980
return wrappedBuffer(bytes);
965981
},
966982
ipAddressWriteFunction(clickhouseType));
967983
}
968984

985+
private static byte[] parseIpAddressBytes(byte[] address)
986+
{
987+
byte[] parsedBytes;
988+
if (address.length == 4) {
989+
parsedBytes = new byte[16];
990+
parsedBytes[10] = (byte) 0xff;
991+
parsedBytes[11] = (byte) 0xff;
992+
arraycopy(address, 0, parsedBytes, 12, 4);
993+
}
994+
else if (address.length == 16) {
995+
parsedBytes = address;
996+
}
997+
else {
998+
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Invalid InetAddress length: " + address.length);
999+
}
1000+
return parsedBytes;
1001+
}
1002+
9691003
private static SliceWriteFunction ipAddressWriteFunction(String clickhouseType)
9701004
{
9711005
return new SliceWriteFunction()
@@ -1003,6 +1037,120 @@ private static SliceWriteFunction uuidWriteFunction()
10031037
return (statement, index, value) -> statement.setObject(index, trinoUuidToJavaUuid(value), Types.OTHER);
10041038
}
10051039

1040+
private Optional<ColumnMapping> mapColumnMapping(ConnectorSession session, ClickHouseColumn keyColumn, ClickHouseColumn valueColumn)
1041+
{
1042+
JdbcTypeMapping typeMapping = JdbcTypeMapping.getDefaultMapping();
1043+
Optional<ColumnMapping> keyMapping = toColumnMapping(
1044+
session,
1045+
keyColumn.getOriginalTypeName(),
1046+
typeMapping.toSqlType(keyColumn, Map.of()),
1047+
Optional.of(keyColumn.getPrecision()),
1048+
Optional.of(keyColumn.getScale()));
1049+
Optional<ColumnMapping> valueMapping = toColumnMapping(
1050+
session,
1051+
valueColumn.getOriginalTypeName(),
1052+
typeMapping.toSqlType(valueColumn, Map.of()),
1053+
Optional.of(valueColumn.getPrecision()),
1054+
Optional.of(valueColumn.getScale()));
1055+
if (keyMapping.isEmpty() || valueMapping.isEmpty()) {
1056+
return Optional.empty();
1057+
}
1058+
1059+
MapType mapType = new MapType(
1060+
keyMapping.get().getType(),
1061+
valueMapping.get().getType(),
1062+
typeOperators);
1063+
return Optional.of(ColumnMapping.objectMapping(
1064+
mapType,
1065+
ObjectReadFunction.of(SqlMap.class, (resultSet, columnIndex) -> {
1066+
Object data = resultSet.getObject(columnIndex);
1067+
if (!(data instanceof Map<?, ?> mapData)) {
1068+
throw new TrinoException(StandardErrorCode.TYPE_MISMATCH, "Expected ClickHouse to return a Map");
1069+
}
1070+
1071+
return buildMapValue(
1072+
mapType,
1073+
mapData.size(),
1074+
(keyBuilder, valueBuilder) -> {
1075+
for (Object key : mapData.keySet()) {
1076+
writeValue(keyMapping.get().getType(), keyBuilder, key);
1077+
writeValue(valueMapping.get().getType(), valueBuilder, mapData.get(key));
1078+
}
1079+
});
1080+
}),
1081+
mapWriteFunction(),
1082+
DISABLE_PUSHDOWN));
1083+
}
1084+
1085+
private static ObjectWriteFunction mapWriteFunction()
1086+
{
1087+
return ObjectWriteFunction.of(SqlMap.class, (statement, index, value) -> {
1088+
MapType mapType = (MapType) value.getMapType();
1089+
Type keyType = mapType.getKeyType();
1090+
Type valueType = mapType.getValueType();
1091+
1092+
Map<Object, Object> mapValue = new HashMap<>();
1093+
for (int position = 0; position < value.getSize(); position++) {
1094+
Object keyEntry = keyType.getObjectValue(value.getRawKeyBlock(), position);
1095+
Object valueEntry = valueType.getObjectValue(value.getRawValueBlock(), position);
1096+
mapValue.put(keyEntry, valueEntry);
1097+
}
1098+
1099+
statement.setObject(index, mapValue);
1100+
});
1101+
}
1102+
1103+
private static void writeValue(Type type, BlockBuilder blockBuilder, Object value)
1104+
{
1105+
if (value == null) {
1106+
blockBuilder.appendNull();
1107+
}
1108+
else if (type.getJavaType() == long.class) {
1109+
switch (value) {
1110+
case Float floatValue -> type.writeLong(blockBuilder, floatToIntBits(floatValue));
1111+
case Double doubleValue -> type.writeLong(blockBuilder, Double.doubleToLongBits(doubleValue));
1112+
case Number numberValue -> type.writeLong(blockBuilder, numberValue.longValue());
1113+
case LocalDate dateValue -> type.writeLong(blockBuilder, dateValue.toEpochDay());
1114+
default -> throw new UnsupportedOperationException("Unsupported type for map key or value: " + type);
1115+
}
1116+
}
1117+
else if (type.getJavaType() == double.class) {
1118+
type.writeDouble(blockBuilder, ((Number) value).doubleValue());
1119+
}
1120+
else if (type.getJavaType() == boolean.class) {
1121+
type.writeBoolean(blockBuilder, (boolean) value);
1122+
}
1123+
else if (type.getJavaType() == io.airlift.slice.Slice.class) {
1124+
if (value instanceof InetAddress ipAddressValue) {
1125+
byte[] address = parseIpAddressBytes(ipAddressValue.getAddress());
1126+
type.writeSlice(blockBuilder, wrappedBuffer(address));
1127+
}
1128+
else if (value instanceof UUID uuidValue) {
1129+
type.writeSlice(blockBuilder, javaUuidToTrinoUuid(uuidValue));
1130+
}
1131+
else {
1132+
type.writeSlice(blockBuilder, utf8Slice(value.toString()));
1133+
}
1134+
}
1135+
else if (type.getJavaType() == Int128.class) {
1136+
type.writeObject(blockBuilder, Int128.valueOf(((UnsignedLong) value).bigIntegerValue()));
1137+
}
1138+
else if (type.getJavaType() == SqlMap.class) {
1139+
Map<Object, Object> mapValue = (Map<Object, Object>) value;
1140+
MapType mapType = (MapType) type;
1141+
BlockBuilder keyBuilder = mapType.getKeyType().createBlockBuilder(null, 0);
1142+
BlockBuilder valueBuilder = mapType.getValueType().createBlockBuilder(null, 0);
1143+
for (Map.Entry<Object, Object> mapEntry : mapValue.entrySet()) {
1144+
writeValue(mapType.getKeyType(), keyBuilder, mapEntry.getKey());
1145+
writeValue(mapType.getValueType(), valueBuilder, mapEntry.getValue());
1146+
}
1147+
type.writeObject(blockBuilder, new SqlMap(mapType, MapHashTables.HashBuildMode.DUPLICATE_NOT_CHECKED, keyBuilder.build(), valueBuilder.build()));
1148+
}
1149+
else {
1150+
throw new UnsupportedOperationException("Unsupported type for map key or value: " + type);
1151+
}
1152+
}
1153+
10061154
public static boolean supportsPushdown(Variable variable, RewriteContext<ParameterizedExpression> context)
10071155
{
10081156
JdbcTypeHandle typeHandle = ((JdbcColumnHandle) context.getAssignment(variable.getName()))

0 commit comments

Comments
 (0)