@@ -143,7 +143,8 @@ Type convertType(TypeInfo typeInfo) {
int listId = id++;
Type listType = convertType(listTypeInfo.getListElementTypeInfo());
return Types.ListType.ofOptional(listId, listType);
case UNION:
case VARIANT:
return Types.VariantType.get();
default:
throw new IllegalArgumentException("Unknown type " + typeInfo.getCategory());
}
@@ -323,6 +323,8 @@ public static String convertToTypeString(Type type) {
case MAP:
final Types.MapType mapType = type.asMapType();
return String.format("map<%s,%s>", convert(mapType.keyType()), convert(mapType.valueType()));
case VARIANT:
return "variant";
default:
throw new UnsupportedOperationException(type + " is not supported");
}
@@ -19,6 +19,8 @@

package org.apache.iceberg.mr.hive;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
@@ -27,6 +29,7 @@
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.variant.Variant;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
@@ -35,11 +38,13 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.schema.SchemaWithPartnerVisitor;
import org.apache.iceberg.types.Type.PrimitiveType;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.ListType;
import org.apache.iceberg.types.Types.MapType;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StructType;

import org.apache.iceberg.variants.VariantMetadata;
import org.apache.iceberg.variants.VariantValue;

class Deserializer {
private final FieldDeserializer fieldDeserializer;
@@ -164,6 +169,26 @@ public FieldDeserializer list(ListType listTypeInfo, ObjectInspectorPair pair, F
};
}

@Override
public FieldDeserializer variant(Types.VariantType variantType, ObjectInspectorPair pair) {
return variantObj -> {
if (variantObj == null) {
return null;
}
// The source inspector exposes the variant as a struct of binary fields; rebuild the Iceberg Variant from its metadata and value buffers
StructObjectInspector variantOI = (StructObjectInspector) pair.sourceInspector();
Variant variant = Variant.from(variantOI.getStructFieldsDataAsList(variantObj));

VariantMetadata metadata = VariantMetadata.from(
ByteBuffer.wrap(variant.getMetadata()).order(ByteOrder.LITTLE_ENDIAN));

VariantValue value = VariantValue.from(metadata,
ByteBuffer.wrap(variant.getValue()).order(ByteOrder.LITTLE_ENDIAN));

return org.apache.iceberg.variants.Variant.of(metadata, value);
};
}

@Override
public FieldDeserializer map(MapType mapType, ObjectInspectorPair pair, FieldDeserializer keyDeserializer,
FieldDeserializer valueDeserializer) {
@@ -152,4 +152,8 @@ public ObjectInspector struct(Types.StructType structType, List<ObjectInspector>
return new IcebergRecordObjectInspector(structType, fieldObjectInspectors);
}

@Override
public ObjectInspector variant(Types.VariantType variantType) {
return IcebergVariantObjectInspector.get();
}
}
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.mr.hive.serde.objectinspector;

import java.nio.ByteBuffer;
import java.util.List;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.VariantObjectInspector;
import org.apache.iceberg.variants.Variant;

/**
* ObjectInspector for Iceberg's Variant type in Hive.
* <p>
* This ObjectInspector enables Hive to work with Iceberg's Variant type, which stores
* polymorphic data in a single column. Variant types are particularly useful for
* semi-structured data like JSON where the actual type may vary per row.
* <p>
* The ObjectInspector exposes each Variant as a Hive struct with two binary fields:
* <ul>
* <li><strong>metadata</strong>: binary variant metadata (the encoding version and the dictionary of field names)</li>
* <li><strong>value</strong>: binary encoding of the actual value, including its type information</li>
* </ul>
*/
public final class IcebergVariantObjectInspector extends VariantObjectInspector {

private static final ObjectInspector INSTANCE = new IcebergVariantObjectInspector();

private IcebergVariantObjectInspector() {
}

public static ObjectInspector get() {
return INSTANCE;
}

@Override
public Object getStructFieldData(Object data, StructField fieldRef) {
if (data == null) {
return null;
}
Variant variant = (Variant) data;
MyField field = (MyField) fieldRef;

switch (field.getFieldID()) {
case 0: // "metadata" field (binary)
ByteBuffer metadata = ByteBuffer.allocate(variant.metadata().sizeInBytes());
variant.metadata().writeTo(metadata, 0);
return metadata.array();
case 1: // "value" field (binary)
ByteBuffer value = ByteBuffer.allocate(variant.value().sizeInBytes());
variant.value().writeTo(value, 0);
return value.array();
default:
throw new IllegalArgumentException("Unknown field position: " + field.getFieldID());
}
}

@Override
public List<Object> getStructFieldsDataAsList(Object data) {
if (data == null) {
return null;
}
Variant variant = (Variant) data;
ByteBuffer metadata = ByteBuffer.allocate(variant.metadata().sizeInBytes());
variant.metadata().writeTo(metadata, 0);

ByteBuffer value = ByteBuffer.allocate(variant.value().sizeInBytes());
variant.value().writeTo(value, 0);

// Return the data for our fields in the correct order: metadata, value
return List.of(metadata.array(), value.array());
}
}
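Side note, not part of this change set: the two byte arrays that IcebergVariantObjectInspector hands to Hive are the same buffers the Deserializer above turns back into an Iceberg Variant. A minimal round-trip sketch follows, using only the org.apache.iceberg.variants calls already present in this diff (writeTo, VariantMetadata.from, VariantValue.from, Variant.of); the class and method names here are illustrative.

// Illustrative sketch only, not part of this change set.
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.iceberg.variants.Variant;
import org.apache.iceberg.variants.VariantMetadata;
import org.apache.iceberg.variants.VariantValue;

class VariantRoundTripSketch {

  // Mirrors IcebergVariantObjectInspector#getStructFieldsDataAsList:
  // serialize the variant into the [metadata, value] binary pair exposed to Hive.
  static byte[][] toStructFields(Variant variant) {
    ByteBuffer metadata = ByteBuffer.allocate(variant.metadata().sizeInBytes());
    variant.metadata().writeTo(metadata, 0);
    ByteBuffer value = ByteBuffer.allocate(variant.value().sizeInBytes());
    variant.value().writeTo(value, 0);
    return new byte[][] {metadata.array(), value.array()};
  }

  // Mirrors Deserializer#variant: rebuild the Iceberg Variant from the binary pair.
  static Variant fromStructFields(byte[] metadataBytes, byte[] valueBytes) {
    VariantMetadata metadata =
        VariantMetadata.from(ByteBuffer.wrap(metadataBytes).order(ByteOrder.LITTLE_ENDIAN));
    VariantValue value =
        VariantValue.from(metadata, ByteBuffer.wrap(valueBytes).order(ByteOrder.LITTLE_ENDIAN));
    return Variant.of(metadata, value);
  }
}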
@@ -114,6 +114,17 @@ public Type list(Types.ListType iList, GroupType array, Type element) {
return array;
}

@Override
public Type variant(Types.VariantType iVariant, GroupType variant, Type result) {
if (variant.getId() != null) {
typesById.put(variant.getId().intValue(), variant);
}
// Add the variant field name to the column names list
appendToColNamesList(variant instanceof MessageType, variant.getName());

return variant;
}

@Override
public Type map(Types.MapType iMap, GroupType map, Type key, Type value) {
if (map.getId() != null) {
@@ -77,7 +77,7 @@ protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {

@Override
protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
builder.createWriterFunc(GenericParquetWriter::buildWriter);
builder.createWriterFunc(GenericParquetWriter::create);
}

@Override
@@ -87,7 +87,7 @@ protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) {

@Override
protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) {
builder.createWriterFunc(GenericParquetWriter::buildWriter);
builder.createWriterFunc(GenericParquetWriter::create);
}

@Override
@@ -0,0 +1,5 @@
CREATE EXTERNAL TABLE variant_test_partition (
id INT,
data VARIANT
) PARTITIONED BY spec (data)
STORED BY ICEBERG tblproperties('format-version'='3');