Skip to content

Commit aac585d

Browse files
committed
Add MAP type support to the ClickHouse connector
1 parent c37191d commit aac585d

File tree

5 files changed

+292
-34
lines changed

5 files changed

+292
-34
lines changed

docs/src/main/sphinx/connector/clickhouse.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,9 @@ to the following table:
248248
* - `UUID`
249249
- `UUID`
250250
-
251+
* - `MAP(k, v)`
252+
- `MAP(k, v)`
253+
-
251254
:::
252255

253256
No other types are supported.
@@ -307,6 +310,9 @@ to the following table:
307310
* - `UUID`
308311
- `UUID`
309312
-
313+
* - `MAP(k, v)`
314+
- `MAP(k, v)`
315+
-
310316
:::
311317

312318
No other types are supported.

plugin/trino-clickhouse/src/main/java/io/trino/plugin/clickhouse/ClickHouseClient.java

Lines changed: 165 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import com.clickhouse.data.ClickHouseColumn;
1717
import com.clickhouse.data.ClickHouseDataType;
1818
import com.clickhouse.data.ClickHouseVersion;
19+
import com.clickhouse.data.value.UnsignedLong;
20+
import com.clickhouse.jdbc.JdbcTypeMapping;
1921
import com.google.common.base.Enums;
2022
import com.google.common.base.Splitter;
2123
import com.google.common.collect.ImmutableList;
@@ -44,6 +46,7 @@
4446
import io.trino.plugin.jdbc.JdbcTypeHandle;
4547
import io.trino.plugin.jdbc.LongReadFunction;
4648
import io.trino.plugin.jdbc.LongWriteFunction;
49+
import io.trino.plugin.jdbc.ObjectReadFunction;
4750
import io.trino.plugin.jdbc.ObjectWriteFunction;
4851
import io.trino.plugin.jdbc.QueryBuilder;
4952
import io.trino.plugin.jdbc.RemoteTableName;
@@ -61,7 +64,10 @@
6164
import io.trino.plugin.jdbc.expression.JdbcConnectorExpressionRewriterBuilder;
6265
import io.trino.plugin.jdbc.expression.ParameterizedExpression;
6366
import io.trino.plugin.jdbc.logging.RemoteQueryModifier;
67+
import io.trino.spi.StandardErrorCode;
6468
import io.trino.spi.TrinoException;
69+
import io.trino.spi.block.BlockBuilder;
70+
import io.trino.spi.block.SqlMap;
6571
import io.trino.spi.connector.AggregateFunction;
6672
import io.trino.spi.connector.ColumnHandle;
6773
import io.trino.spi.connector.ColumnMetadata;
@@ -74,9 +80,11 @@
7480
import io.trino.spi.type.DecimalType;
7581
import io.trino.spi.type.Decimals;
7682
import io.trino.spi.type.Int128;
83+
import io.trino.spi.type.MapType;
7784
import io.trino.spi.type.StandardTypes;
7885
import io.trino.spi.type.Type;
7986
import io.trino.spi.type.TypeManager;
87+
import io.trino.spi.type.TypeOperators;
8088
import io.trino.spi.type.TypeSignature;
8189
import io.trino.spi.type.VarbinaryType;
8290
import io.trino.spi.type.VarcharType;
@@ -99,6 +107,7 @@
99107
import java.time.LocalDateTime;
100108
import java.time.ZonedDateTime;
101109
import java.util.Collection;
110+
import java.util.HashMap;
102111
import java.util.List;
103112
import java.util.Map;
104113
import java.util.Map.Entry;
@@ -116,6 +125,7 @@
116125
import static com.google.common.base.Strings.isNullOrEmpty;
117126
import static com.google.common.base.Verify.verify;
118127
import static com.google.common.collect.ImmutableMap.toImmutableMap;
128+
import static io.airlift.slice.Slices.utf8Slice;
119129
import static io.airlift.slice.Slices.wrappedBuffer;
120130
import static io.trino.plugin.clickhouse.ClickHouseSessionProperties.isMapStringAsVarchar;
121131
import static io.trino.plugin.clickhouse.ClickHouseTableProperties.ENGINE_PROPERTY;
@@ -164,6 +174,7 @@
164174
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
165175
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
166176
import static io.trino.spi.StandardErrorCode.SCHEMA_NOT_EMPTY;
177+
import static io.trino.spi.block.MapValueBuilder.buildMapValue;
167178
import static io.trino.spi.connector.ConnectorMetadata.MODIFYING_ROWS_MESSAGE;
168179
import static io.trino.spi.type.BigintType.BIGINT;
169180
import static io.trino.spi.type.BooleanType.BOOLEAN;
@@ -184,6 +195,7 @@
184195
import static io.trino.spi.type.UuidType.javaUuidToTrinoUuid;
185196
import static io.trino.spi.type.UuidType.trinoUuidToJavaUuid;
186197
import static io.trino.spi.type.VarcharType.createUnboundedVarcharType;
198+
import static java.lang.Float.floatToIntBits;
187199
import static java.lang.Float.floatToRawIntBits;
188200
import static java.lang.Math.floorDiv;
189201
import static java.lang.Math.floorMod;
@@ -212,6 +224,7 @@ public class ClickHouseClient
212224

213225
private final ConnectorExpressionRewriter<ParameterizedExpression> connectorExpressionRewriter;
214226
private final AggregateFunctionRewriter<JdbcExpression, ?> aggregateFunctionRewriter;
227+
private final TypeOperators typeOperators;
215228
private final Type uuidType;
216229
private final Type ipAddressType;
217230
private final AtomicReference<ClickHouseVersion> clickHouseVersion = new AtomicReference<>();
@@ -226,6 +239,7 @@ public ClickHouseClient(
226239
RemoteQueryModifier queryModifier)
227240
{
228241
super("\"", connectionFactory, queryBuilder, config.getJdbcTypesMappedToVarchar(), identifierMapping, queryModifier, false);
242+
this.typeOperators = typeManager.getTypeOperators();
229243
this.uuidType = typeManager.getType(new TypeSignature(StandardTypes.UUID));
230244
this.ipAddressType = typeManager.getType(new TypeSignature(StandardTypes.IPADDRESS));
231245
JdbcTypeHandle bigintTypeHandle = new JdbcTypeHandle(Types.BIGINT, Optional.of("bigint"), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
@@ -636,8 +650,22 @@ public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connect
636650
return mapping;
637651
}
638652

653+
Optional<ColumnMapping> columnMapping = toColumnMapping(session, jdbcTypeName, typeHandle.jdbcType(), typeHandle.decimalDigits(), typeHandle.columnSize());
654+
if (columnMapping.isEmpty() && getUnsupportedTypeHandling(session) == CONVERT_TO_VARCHAR) {
655+
return mapToUnboundedVarchar(typeHandle);
656+
}
657+
return columnMapping;
658+
}
659+
660+
private Optional<ColumnMapping> toColumnMapping(
661+
ConnectorSession session,
662+
String typeName,
663+
int jdbcType,
664+
Optional<Integer> decimalDigits,
665+
Optional<Integer> columnSize)
666+
{
639667
ClickHouseVersion version = getClickHouseServerVersion(session);
640-
ClickHouseColumn column = ClickHouseColumn.of("", jdbcTypeName);
668+
ClickHouseColumn column = ClickHouseColumn.of("", typeName);
641669
ClickHouseDataType columnDataType = column.getDataType();
642670
switch (columnDataType) {
643671
case Bool:
@@ -677,11 +705,13 @@ public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connect
677705
return Optional.of(varbinaryColumnMapping());
678706
case UUID:
679707
return Optional.of(uuidColumnMapping());
708+
case Map:
709+
return mapColumnMapping(session, column.getKeyInfo(), column.getValueInfo());
680710
default:
681711
// no-op
682712
}
683713

684-
switch (typeHandle.jdbcType()) {
714+
switch (jdbcType) {
685715
case Types.TINYINT:
686716
return Optional.of(tinyintColumnMapping());
687717

@@ -706,16 +736,13 @@ public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connect
706736
return Optional.of(doubleColumnMapping());
707737

708738
case Types.DECIMAL:
709-
int decimalDigits = typeHandle.requiredDecimalDigits();
710-
int precision = typeHandle.requiredColumnSize();
711-
712739
ColumnMapping decimalColumnMapping;
713-
if (getDecimalRounding(session) == ALLOW_OVERFLOW && precision > Decimals.MAX_PRECISION) {
714-
int scale = Math.min(decimalDigits, getDecimalDefaultScale(session));
740+
if (getDecimalRounding(session) == ALLOW_OVERFLOW && columnSize.orElseThrow() > Decimals.MAX_PRECISION) {
741+
int scale = Math.min(decimalDigits.orElseThrow(), getDecimalDefaultScale(session));
715742
decimalColumnMapping = decimalColumnMapping(createDecimalType(Decimals.MAX_PRECISION, scale), getDecimalRoundingMode(session));
716743
}
717744
else {
718-
decimalColumnMapping = decimalColumnMapping(createDecimalType(precision, max(decimalDigits, 0)));
745+
decimalColumnMapping = decimalColumnMapping(createDecimalType(columnSize.orElseThrow(), max(decimalDigits.orElseThrow(), 0)));
719746
}
720747
return Optional.of(ColumnMapping.mapping(
721748
decimalColumnMapping.getType(),
@@ -730,7 +757,7 @@ public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connect
730757
case Types.TIMESTAMP:
731758
if (columnDataType == ClickHouseDataType.DateTime) {
732759
// ClickHouse DateTime does not have sub-second precision
733-
verify(typeHandle.requiredDecimalDigits() == 0, "Expected 0 as timestamp precision, but got %s", typeHandle.requiredDecimalDigits());
760+
verify(decimalDigits.orElseThrow() == 0, "Expected 0 as timestamp precision, but got %s", decimalDigits.orElseThrow());
734761
return Optional.of(ColumnMapping.longMapping(
735762
TIMESTAMP_SECONDS,
736763
timestampReadFunction(TIMESTAMP_SECONDS),
@@ -742,18 +769,14 @@ public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connect
742769
case Types.TIMESTAMP_WITH_TIMEZONE:
743770
if (columnDataType == ClickHouseDataType.DateTime) {
744771
// ClickHouse DateTime does not have sub-second precision
745-
verify(typeHandle.requiredDecimalDigits() == 0, "Expected 0 as timestamp with time zone precision, but got %s", typeHandle.requiredDecimalDigits());
772+
verify(decimalDigits.orElseThrow() == 0, "Expected 0 as timestamp with time zone precision, but got %s", decimalDigits.orElseThrow());
746773
return Optional.of(ColumnMapping.longMapping(
747774
TIMESTAMP_TZ_SECONDS,
748775
shortTimestampWithTimeZoneReadFunction(),
749776
shortTimestampWithTimeZoneWriteFunction(version, column.getTimeZone())));
750777
}
751778
}
752779

753-
if (getUnsupportedTypeHandling(session) == CONVERT_TO_VARCHAR) {
754-
return mapToUnboundedVarchar(typeHandle);
755-
}
756-
757780
return Optional.empty();
758781
}
759782

@@ -805,6 +828,12 @@ public WriteMapping toWriteMapping(ConnectorSession session, Type type)
805828
if (type.equals(uuidType)) {
806829
return WriteMapping.sliceMapping("UUID", uuidWriteFunction());
807830
}
831+
if (type instanceof MapType mapType) {
832+
WriteMapping keyMapping = toWriteMapping(session, mapType.getKeyType());
833+
WriteMapping valueMapping = toWriteMapping(session, mapType.getValueType());
834+
String dataType = "Map(%s, %s)".formatted(keyMapping.getDataType(), valueMapping.getDataType());
835+
return WriteMapping.objectMapping(dataType, mapWriteFunction());
836+
}
808837
throw new TrinoException(NOT_SUPPORTED, "Unsupported column type: " + type);
809838
}
810839

@@ -946,26 +975,30 @@ private ColumnMapping ipAddressColumnMapping(String clickhouseType)
946975
(resultSet, columnIndex) -> {
947976
// copied from IpAddressOperators.castFromVarcharToIpAddress
948977
byte[] address = InetAddresses.forString(resultSet.getString(columnIndex)).getAddress();
949-
950-
byte[] bytes;
951-
if (address.length == 4) {
952-
bytes = new byte[16];
953-
bytes[10] = (byte) 0xff;
954-
bytes[11] = (byte) 0xff;
955-
arraycopy(address, 0, bytes, 12, 4);
956-
}
957-
else if (address.length == 16) {
958-
bytes = address;
959-
}
960-
else {
961-
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Invalid InetAddress length: " + address.length);
962-
}
963-
978+
byte[] bytes = parseIpAddressBytes(address);
964979
return wrappedBuffer(bytes);
965980
},
966981
ipAddressWriteFunction(clickhouseType));
967982
}
968983

984+
private static byte[] parseIpAddressBytes(byte[] address)
985+
{
986+
byte[] parsedBytes;
987+
if (address.length == 4) {
988+
parsedBytes = new byte[16];
989+
parsedBytes[10] = (byte) 0xff;
990+
parsedBytes[11] = (byte) 0xff;
991+
arraycopy(address, 0, parsedBytes, 12, 4);
992+
}
993+
else if (address.length == 16) {
994+
parsedBytes = address;
995+
}
996+
else {
997+
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Invalid InetAddress length: " + address.length);
998+
}
999+
return parsedBytes;
1000+
}
1001+
9691002
private static SliceWriteFunction ipAddressWriteFunction(String clickhouseType)
9701003
{
9711004
return new SliceWriteFunction()
@@ -1003,6 +1036,109 @@ private static SliceWriteFunction uuidWriteFunction()
10031036
return (statement, index, value) -> statement.setObject(index, trinoUuidToJavaUuid(value), Types.OTHER);
10041037
}
10051038

1039+
private Optional<ColumnMapping> mapColumnMapping(ConnectorSession session, ClickHouseColumn keyColumn, ClickHouseColumn valueColumn)
1040+
{
1041+
JdbcTypeMapping typeMapping = JdbcTypeMapping.getDefaultMapping();
1042+
Optional<ColumnMapping> keyMapping = toColumnMapping(
1043+
session,
1044+
keyColumn.getDataType().name(),
1045+
typeMapping.toSqlType(keyColumn, Map.of()),
1046+
Optional.of(keyColumn.getPrecision()),
1047+
Optional.of(keyColumn.getScale()));
1048+
Optional<ColumnMapping> valueMapping = toColumnMapping(
1049+
session,
1050+
valueColumn.getDataType().name(),
1051+
typeMapping.toSqlType(valueColumn, Map.of()),
1052+
Optional.of(valueColumn.getPrecision()),
1053+
Optional.of(valueColumn.getScale()));
1054+
if (keyMapping.isEmpty() || valueMapping.isEmpty()) {
1055+
return Optional.empty();
1056+
}
1057+
1058+
MapType mapType = new MapType(
1059+
keyMapping.get().getType(),
1060+
valueMapping.get().getType(),
1061+
typeOperators);
1062+
return Optional.of(ColumnMapping.objectMapping(
1063+
mapType,
1064+
ObjectReadFunction.of(SqlMap.class, (resultSet, columnIndex) -> {
1065+
Object data = resultSet.getObject(columnIndex);
1066+
if (!(data instanceof Map<?, ?> mapData)) {
1067+
throw new TrinoException(StandardErrorCode.TYPE_MISMATCH, "Expected ClickHouse to return a Map");
1068+
}
1069+
1070+
return buildMapValue(
1071+
mapType,
1072+
mapData.size(),
1073+
(keyBuilder, valueBuilder) -> {
1074+
for (Object key : mapData.keySet()) {
1075+
writeValue(keyMapping.get().getType(), keyBuilder, key);
1076+
writeValue(valueMapping.get().getType(), valueBuilder, mapData.get(key));
1077+
}
1078+
});
1079+
}),
1080+
mapWriteFunction(),
1081+
DISABLE_PUSHDOWN));
1082+
}
1083+
1084+
private static ObjectWriteFunction mapWriteFunction()
1085+
{
1086+
return ObjectWriteFunction.of(SqlMap.class, (statement, index, value) -> {
1087+
MapType mapType = (MapType) value.getMapType();
1088+
Type keyType = mapType.getKeyType();
1089+
Type valueType = mapType.getValueType();
1090+
1091+
Map<Object, Object> mapValue = new HashMap<>();
1092+
for (int position = 0; position < value.getSize(); position++) {
1093+
Object keyEntry = keyType.getObjectValue(value.getRawKeyBlock(), position);
1094+
Object valueEntry = valueType.getObjectValue(value.getRawValueBlock(), position);
1095+
mapValue.put(keyEntry, valueEntry);
1096+
}
1097+
1098+
statement.setObject(index, mapValue);
1099+
});
1100+
}
1101+
1102+
private static void writeValue(Type type, BlockBuilder blockBuilder, Object value)
1103+
{
1104+
if (value == null) {
1105+
blockBuilder.appendNull();
1106+
}
1107+
else if (type.getJavaType() == long.class) {
1108+
switch (value) {
1109+
case Float floatValue -> type.writeLong(blockBuilder, floatToIntBits(floatValue));
1110+
case Double doubleValue -> type.writeLong(blockBuilder, Double.doubleToLongBits(doubleValue));
1111+
case Number numberValue -> type.writeLong(blockBuilder, numberValue.longValue());
1112+
case LocalDate dateValue -> type.writeLong(blockBuilder, dateValue.toEpochDay());
1113+
default -> throw new UnsupportedOperationException("Unsupported type for map key or value: " + type);
1114+
}
1115+
}
1116+
else if (type.getJavaType() == double.class) {
1117+
type.writeDouble(blockBuilder, ((Number) value).doubleValue());
1118+
}
1119+
else if (type.getJavaType() == boolean.class) {
1120+
type.writeBoolean(blockBuilder, (boolean) value);
1121+
}
1122+
else if (type.getJavaType() == io.airlift.slice.Slice.class) {
1123+
if (value instanceof InetAddress ipAddressValue) {
1124+
byte[] address = parseIpAddressBytes(ipAddressValue.getAddress());
1125+
type.writeSlice(blockBuilder, wrappedBuffer(address));
1126+
}
1127+
else if (value instanceof UUID uuidValue) {
1128+
type.writeSlice(blockBuilder, javaUuidToTrinoUuid(uuidValue));
1129+
}
1130+
else {
1131+
type.writeSlice(blockBuilder, utf8Slice(value.toString()));
1132+
}
1133+
}
1134+
else if (type.getJavaType() == Int128.class) {
1135+
type.writeObject(blockBuilder, Int128.valueOf(((UnsignedLong) value).bigIntegerValue()));
1136+
}
1137+
else {
1138+
throw new UnsupportedOperationException("Unsupported type for map key or value: " + type);
1139+
}
1140+
}
1141+
10061142
public static boolean supportsPushdown(Variable variable, RewriteContext<ParameterizedExpression> context)
10071143
{
10081144
JdbcTypeHandle typeHandle = ((JdbcColumnHandle) context.getAssignment(variable.getName()))

0 commit comments

Comments
 (0)