diff --git a/src/it/scala/com/qubole/spark/hiveacid/ReadSuite.scala b/src/it/scala/com/qubole/spark/hiveacid/ReadSuite.scala
index 585edbd..6097441 100644
--- a/src/it/scala/com/qubole/spark/hiveacid/ReadSuite.scala
+++ b/src/it/scala/com/qubole/spark/hiveacid/ReadSuite.scala
@@ -26,7 +26,7 @@ import org.scalatest._
 
 import scala.util.control.NonFatal
 
-@Ignore
+//@Ignore
 class ReadSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll {
 
   val log: Logger = LogManager.getLogger(this.getClass)
diff --git a/src/main/scala/com/qubole/spark/hiveacid/HiveAcidDataSourceV2Reader.scala b/src/main/scala/com/qubole/spark/hiveacid/HiveAcidDataSourceV2Reader.scala
index 02c8587..27d09da 100644
--- a/src/main/scala/com/qubole/spark/hiveacid/HiveAcidDataSourceV2Reader.scala
+++ b/src/main/scala/com/qubole/spark/hiveacid/HiveAcidDataSourceV2Reader.scala
@@ -60,6 +60,7 @@ class HiveAcidDataSourceV2Reader
     if (dbName != null) {
       hiveAcidMetadata = HiveAcidMetadata.fromSparkSession(sparkSession, dbName + "." + tblName)
     } else {
+      // If the db name is null, the default db is chosen.
      hiveAcidMetadata = HiveAcidMetadata.fromSparkSession(sparkSession, tblName)
    }
 
diff --git a/src/main/scala/com/qubole/spark/hiveacid/reader/hive/HiveAcidReader.scala b/src/main/scala/com/qubole/spark/hiveacid/reader/hive/HiveAcidReader.scala
index 65cbe64..38afbeb 100644
--- a/src/main/scala/com/qubole/spark/hiveacid/reader/hive/HiveAcidReader.scala
+++ b/src/main/scala/com/qubole/spark/hiveacid/reader/hive/HiveAcidReader.scala
@@ -118,19 +118,20 @@ extends CastSupport with Reader with Logging {
       hiveAcidMetadata.hTable.getParameters, colNames, colTypes) _
 
-    //TODO : Need to cahce it with some unique id.
+    //TODO : Need to cache it with some unique id.
    val jobConf = new JobConf(_broadcastedHadoopConf.value.value)
    initializeJobConfFunc(jobConf)
 
+    //TODO: Can this be done in parallel across multiple threads?
    val partitionArray = new java.util.ArrayList[InputPartition[ColumnarBatch]]
    val inputSplits = HiveAcidCommon.getInputSplits(jobConf, validWriteIds, 0,
      hiveAcidMetadata.isFullAcidTable, ifc)
 
-    val reqFileds = hiveAcidMetadata.tableSchema.fields.filter(field =>
+    val reqFields = hiveAcidMetadata.tableSchema.fields.filter(field =>
      readerOptions.requiredNonPartitionedColumns.contains(field.name))
-    for (i <- 0 until inputSplits.size) {
+    for (i <- inputSplits.indices) {
      partitionArray.add(new HiveAcidInputPartitionV2(inputSplits(i).asInstanceOf[HiveAcidPartition],
        sparkSession.sparkContext.broadcast(new SerializableConfiguration(jobConf)),
-        partitionValues, reqFileds, hiveAcidMetadata.partitionSchema, hiveAcidMetadata.isFullAcidTable))
+        partitionValues, reqFields, hiveAcidMetadata.partitionSchema, hiveAcidMetadata.isFullAcidTable))
      logInfo("getPartitions : Input split: " + inputSplits(i))
    }
    partitionArray
@@ -215,14 +216,12 @@ extends CastSupport with Reader with Logging {
 
    val mutableRow = new SpecificInternalRow(hiveAcidMetadata.partitionSchema)
 
-    // Splits all attributes into two groups, partition key attributes and those
-    // that are not. Attached indices indicate the position of each attribute in
-    // the output schema.
-    val (partitionKeyAttrs, nonPartitionKeyAttrs) =
-      readerOptions.requiredAttributes.zipWithIndex.partition { case (attr, _) =>
+    val partitionKeyAttrs =
+      readerOptions.requiredAttributes.zipWithIndex.filter { case (attr, _) =>
        readerOptions.partitionAttributes.contains(attr)
      }
 
+    //TODO : The partition values can be filled directly using the Hive ACID batch reader.
    def fillPartitionKeys(rawPartValues: Array[String], row: InternalRow): Unit = {
      var offset = 0
      partitionKeyAttrs.foreach { case (attr, ordinal) =>
diff --git a/src/main/scala/com/qubole/spark/hiveacid/reader/v2/OrcColumnVector.java b/src/main/scala/com/qubole/spark/hiveacid/reader/v2/HiveAcidColumnVector.java
similarity index 95%
rename from src/main/scala/com/qubole/spark/hiveacid/reader/v2/OrcColumnVector.java
rename to src/main/scala/com/qubole/spark/hiveacid/reader/v2/HiveAcidColumnVector.java
index 3c087b6..fbe667c 100644
--- a/src/main/scala/com/qubole/spark/hiveacid/reader/v2/OrcColumnVector.java
+++ b/src/main/scala/com/qubole/spark/hiveacid/reader/v2/HiveAcidColumnVector.java
@@ -36,9 +36,9 @@
 /**
  * A column vector class wrapping Hive's ColumnVector. Because Spark ColumnarBatch only accepts
  * Spark's vectorized.ColumnVector, this column vector is used to adapt Hive ColumnVector with
- * Spark ColumnarVector.
+ * Spark ColumnarVector. This class is a copy of Spark's OrcColumnVector, which is internal to Spark.
  */
-public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVector {
+public class HiveAcidColumnVector extends org.apache.spark.sql.vectorized.ColumnVector {
  private ColumnVector baseData;
  private LongColumnVector longData;
  private DoubleColumnVector doubleData;
@@ -49,7 +49,7 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVecto
 
  private int batchSize;
 
-  OrcColumnVector(DataType type, ColumnVector vector) {
+  HiveAcidColumnVector(DataType type, ColumnVector vector) {
    super(type);
 
    if (type instanceof TimestampType) {
diff --git a/src/main/scala/com/qubole/spark/hiveacid/reader/v2/HiveAcidInputPartitionReaderV2.scala b/src/main/scala/com/qubole/spark/hiveacid/reader/v2/HiveAcidInputPartitionReaderV2.scala
index 4b9d5b2..042a166 100644
--- a/src/main/scala/com/qubole/spark/hiveacid/reader/v2/HiveAcidInputPartitionReaderV2.scala
+++ b/src/main/scala/com/qubole/spark/hiveacid/reader/v2/HiveAcidInputPartitionReaderV2.scala
@@ -35,9 +35,8 @@ private[v2] class HiveAcidInputPartitionReaderV2(split: HiveAcidPartition,
                                                  isFullAcidTable: Boolean)
  extends InputPartitionReader[ColumnarBatch] {
  //TODO : Need to get a unique id to cache the jobConf.
-  private val jobConf : JobConf = new JobConf(broadcastedConf.value.value)
-  private val sparkOrcColReader : OrcColumnarBatchReader =
-    new OrcColumnarBatchReader(1024)
+  private val jobConf = new JobConf(broadcastedConf.value.value)
+  private val orcColumnarBatchReader = new OrcColumnarBatchReader(1024)
 
  private def initReader() : Unit = {
    // Get the reader schema using the column names and types set in hive conf.
@@ -63,26 +62,27 @@ private[v2] class HiveAcidInputPartitionReaderV2(split: HiveAcidPartition,
    // Register the listener for closing the reader before init is done.
    val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
    val taskAttemptContext = new org.apache.hadoop.mapred.TaskAttemptContextImpl(jobConf, attemptId)
-    val iter = new org.apache.spark.sql.execution.datasources.RecordReaderIterator(sparkOrcColReader)
+    val iter = new org.apache.spark.sql.execution.datasources.RecordReaderIterator(orcColumnarBatchReader)
    Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
 
-    sparkOrcColReader.initialize(fileSplit, taskAttemptContext)
-    sparkOrcColReader.initBatch(readerLocal.getSchema, requestedColIds,
+    //TODO: Need to generalize this to support other kinds of file formats.
+    orcColumnarBatchReader.initialize(fileSplit, taskAttemptContext)
+    orcColumnarBatchReader.initBatch(readerLocal.getSchema, requestedColIds,
      requiredFields, partitionSchema, partitionValues, isFullAcidTable && !fileSplit.isOriginal)
  }
 
  initReader()
 
  @throws(classOf[IOException])
  override def next() : Boolean = {
-    sparkOrcColReader.nextKeyValue()
+    orcColumnarBatchReader.nextKeyValue()
  }
 
  override def get () : ColumnarBatch = {
-    sparkOrcColReader.getCurrentValue
+    orcColumnarBatchReader.getCurrentValue
  }
 
  @throws(classOf[IOException])
  override def close() : Unit = {
-    sparkOrcColReader.close()
+    orcColumnarBatchReader.close()
  }
 }
diff --git a/src/main/scala/com/qubole/spark/hiveacid/reader/v2/OrcColumnarBatchReader.java b/src/main/scala/com/qubole/spark/hiveacid/reader/v2/OrcColumnarBatchReader.java
index bb3f9b6..02c6cd3 100644
--- a/src/main/scala/com/qubole/spark/hiveacid/reader/v2/OrcColumnarBatchReader.java
+++ b/src/main/scala/com/qubole/spark/hiveacid/reader/v2/OrcColumnarBatchReader.java
@@ -21,7 +21,6 @@
 import java.util.List;
 import java.util.stream.IntStream;
 
-import com.qubole.spark.hiveacid.util.HiveAcidCommon;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -56,8 +55,8 @@ import com.qubole.shaded.hadoop.hive.ql.io.sarg.*;
 
 /**
- * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch.
- * After creating, `initialize` and `initBatch` should be called sequentially.
+ * After creation, `initialize` and `initBatch` should be called sequentially. This internally uses
+ * the Hive ACID vectorized ORC reader so that deleted and updated records are handled correctly.
  */
 public class OrcColumnarBatchReader extends RecordReader {
 
@@ -67,34 +66,45 @@ public class OrcColumnarBatchReader extends RecordReader {
  // Vectorized ORC Row Batch
  private VectorizedRowBatch batch;
 
-  /**
-   * The column IDs of the physical ORC file schema which are required by this reader.
-   * -1 means this required column doesn't exist in the ORC file.
-   */
+  // ROW schema. This has the columns that need to be projected. Even though we don't need
+  // them, the ACID-related columns like row id and write-id are also projected so that the
+  // ACID reader can use them to filter out deleted/updated records.
+  private TypeDescription schema;
+
+
+  // The column IDs of the physical ORC file schema which are required by this reader.
+  // -1 means this required column doesn't exist in the ORC file.
  private int[] requestedColIds;
 
+  private StructField[] requiredFields;
+
+  // Record reader for ORC row batches.
  private com.qubole.shaded.orc.RecordReader baseRecordReader;
+
+  // Wrapper reader over baseRecordReader for filtering out deleted/updated records.
  private VectorizedOrcAcidRowBatchReader fullAcidRecordReader;
 
-  private StructField[] requiredFields;
-
  // The result columnar batch for vectorized execution by whole-stage codegen.
  private ColumnarBatch columnarBatch;
 
  // Writable column vectors of the result columnar batch.
  private WritableColumnVector[] columnVectors;
 
-  // The wrapped ORC column vectors. It should be null if `copyToSpark` is true.
+  // The wrapped ORC column vectors.
  private org.apache.spark.sql.vectorized.ColumnVector[] orcVectorWrappers;
 
+  // File (split) to be read.
  private OrcSplit fileSplit;
 
  private Configuration conf;
 
-  private int rootCol;
+  // For a full ACID scan, the first 5 fields are transaction related. These fields are used by
+  // the fullAcidRecordReader. While forming the batch to emit, we skip the first 5 columns. For a
+  // normal scan, this value will be 0 as the ORC file will not have the transaction-related columns.
+  private int rootColIdx;
+
+  // Constructor.
  public OrcColumnarBatchReader(int capacity) {
    this.capacity = capacity;
  }
@@ -111,7 +121,11 @@ public ColumnarBatch getCurrentValue() {
 
  @Override
  public float getProgress() throws IOException {
-    return fullAcidRecordReader.getProgress();
+    if (fullAcidRecordReader != null) {
+      return fullAcidRecordReader.getProgress();
+    } else {
+      return baseRecordReader.getProgress();
+    }
  }
 
  @Override
@@ -135,12 +149,14 @@ public void close() throws IOException {
    }
  }
 
+  // The columns that are pushed as search arguments to the ORC file reader.
  private String[] getSargColumnNames(String[] originalColumnNames,
                                      List types,
                                      boolean[] includedColumns) {
-    String[] columnNames = new String[types.size() - rootCol];
+    // Skip ACID-related columns if present.
+    String[] columnNames = new String[types.size() - rootColIdx];
    int i = 0;
-    Iterator iterator = ((OrcProto.Type)types.get(rootCol)).getSubtypesList().iterator();
+    Iterator iterator = ((OrcProto.Type)types.get(rootColIdx)).getSubtypesList().iterator();
 
    while(true) {
      int columnId;
      do {
        if (!iterator.hasNext()) {
          return columnNames;
        }
        columnId = (Integer)iterator.next();
-      } while(includedColumns != null && !includedColumns[columnId - rootCol]);
-      columnNames[columnId - rootCol] = originalColumnNames[i++];
+      } while(includedColumns != null && !includedColumns[columnId - rootColIdx]);
+      columnNames[columnId - rootColIdx] = originalColumnNames[i++];
    }
  }
 
@@ -161,6 +177,7 @@ private void setSearchArgument(Reader.Options options,
    if (neededColumnNames == null) {
      options.searchArgument((SearchArgument)null, (String[])null);
    } else {
+      // The filters which are pushed down are set in the config using sarg.pushdown.
      SearchArgument sarg = ConvertAstToSearchArg.createFromConf(conf);
      if (sarg == null) {
        options.searchArgument((SearchArgument)null, (String[])null);
      } else {
@@ -175,7 +192,6 @@ private void setSearchArgumentForOption(Configuration conf,
                                          TypeDescription readerSchema,
                                          Reader.Options readerOptions) {
-    // TODO: Convert genIncludedColumns and setSearchArgument to use TypeDescription.
    final List schemaTypes = OrcUtils.getOrcTypes(readerSchema);
    setSearchArgument(readerOptions, schemaTypes, conf);
  }
@@ -191,6 +207,7 @@ public void initialize(InputSplit inputSplit,
    conf = taskAttemptContext.getConfiguration();
  }
 
+  // Wrapper ACID reader over the base ORC record reader.
  private VectorizedOrcAcidRowBatchReader initHiveAcidReader(Configuration conf,
                                                             OrcSplit orcSplit,
                                                             com.qubole.shaded.orc.RecordReader innerReader) {
@@ -206,6 +223,8 @@ private VectorizedOrcAcidRowBatchReader initHiveAcidReader(Configuration conf,
      @Override
      public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
+        // This is called internally by the ACID reader to fetch the next batch of records; it
+        // delegates to the baseRecordReader (innerReader).
        return innerReader.nextBatch(value);
      }
 
      @Override
      public NullWritable createKey() {
@@ -216,6 +235,9 @@ public NullWritable createKey() {
 
      @Override
      public VectorizedRowBatch createValue() {
+        // This column batch will be passed as the value by the ACID reader when calling next. So
+        // the baseRecordReader will populate the batch directly and we don't have to do any
+        // extra copy if selectedInUse is false.
        return batch;
      }
 
@@ -252,17 +274,18 @@ public void initBatch(
      StructField[] requiredFields,
      StructType partitionSchema,
      InternalRow partitionValues,
-      boolean isFullAcidTable) throws IOException {
+      boolean isAcidScan) throws IOException {
 
-    if (!isFullAcidTable) {
+    if (!isAcidScan) {
      //rootCol = org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getRootColumn(true);
-      rootCol = 0;
+      rootColIdx = 0;
    } else {
      //rootCol = org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getRootColumn(false) - 1;
-      // In ORC, for full ACID table, the first 5 fields stores the transaction metadata.
-      rootCol = 5;
+      // In ORC, for a full ACID scan, the first 5 fields store the transaction metadata.
+      rootColIdx = 5;
    }
 
+    // Create the baseRecordReader. This reader actually does the reading from the ORC file.
    Reader readerInner = OrcFile.createReader(
        fileSplit.getPath(), OrcFile.readerOptions(conf)
        .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))
@@ -272,6 +295,8 @@ public void initBatch(
    setSearchArgumentForOption(conf, orcSchema, options);
    baseRecordReader = readerInner.rows(options);
 
+    // This schema has both the required fields and the fields used by the ACID reader.
+    schema = orcSchema;
    batch = orcSchema.createRowBatch(capacity);
    assert(!batch.selectedInUse); // `selectedInUse` should be initialized with `false`.
@@ -279,20 +304,18 @@ public void initBatch(
    this.requestedColIds = requestedColIds;
    assert(requiredFields.length == requestedColIds.length);
 
+    // The result schema will have just those fields which are required to be projected.
    StructType resultSchema = new StructType(requiredFields);
    for (StructField f : partitionSchema.fields()) {
      resultSchema = resultSchema.add(f);
    }
 
-    ColumnVector[] fields;
-    orcVectorWrappers = new org.apache.spark.sql.vectorized.ColumnVector[resultSchema.length()];
-    if (rootCol == 0) {
-      fields = batch.cols;
-    } else {
-      fields = ((StructColumnVector)batch.cols[rootCol]).fields;
-    }
-
-    if (isFullAcidTable) {
+    // For an ACID scan, the ACID batch reader might filter out some of the records read from
+    // the ORC file, so we have to recreate the batch read from ORC. These columnVectors
+    // will be used at that time. Missing columns and partition columns are filled here;
+    // the other valid columns will be filled once a batch of records is read.
+    //TODO: We can set the config to let the ORC reader fill the partition values.
+    if (isAcidScan) {
      columnVectors = OnHeapColumnVector.allocateColumns(capacity, resultSchema);
 
      // Initialize the missing columns once.
@@ -312,7 +335,20 @@
      }
    }
 
-    // Just wrap the ORC column vector instead of copying it to Spark column vector.
+
+    // Just wrap the ORC column vectors instead of copying them to Spark column vectors. This
+    // wrapper will be used for insert-only table scans or for scanning original files (ACID V1)
+    // or compacted files. In those cases, the batch read from ORC will be emitted as it is, so
+    // there is no need to prepare a separate copy.
+
+    ColumnVector[] fields;
+    orcVectorWrappers = new org.apache.spark.sql.vectorized.ColumnVector[resultSchema.length()];
+    if (rootColIdx == 0) {
+      fields = batch.cols;
+    } else {
+      fields = ((StructColumnVector)batch.cols[rootColIdx]).fields;
+    }
+
    orcVectorWrappers = new org.apache.spark.sql.vectorized.ColumnVector[resultSchema.length()];
    //StructColumnVector dataCols = (StructColumnVector)batch.cols[5];
    for (int i = 0; i < requiredFields.length; i++) {
@@ -325,7 +361,7 @@ public void initBatch(
        missingCol.setIsConstant();
        orcVectorWrappers[i] = missingCol;
      } else {
-        orcVectorWrappers[i] = new OrcColumnVector(dt, fields[colId]);
+        orcVectorWrappers[i] = new HiveAcidColumnVector(dt, fields[colId]);
      }
    }
 
@@ -340,7 +376,7 @@ public void initBatch(
      }
    }
 
-    if (isFullAcidTable) {
+    if (isAcidScan) {
      fullAcidRecordReader = initHiveAcidReader(conf, fileSplit, baseRecordReader);
    } else {
      fullAcidRecordReader = null;
@@ -348,26 +384,39 @@
  }
 
  /**
-   * Return true if there exists more data in the next batch. If exists, prepare the next batch
-   * by copying from ORC VectorizedRowBatch columns to Spark ColumnarBatch columns.
+   * Return true if there exists more data in the next batch. For an ACID scan, the ACID batch
+   * reader is used. The ACID batch reader internally uses the baseRecordReader and then
+   * filters out the deleted/not-visible records. This filtering is propagated here using
+   * selectedInUse. If selectedInUse is false, no filtering happened and we can directly use
+   * the orcVectorWrappers. If selectedInUse is set to true, we have to recreate the column
+   * batch using the selected array.
   */
  private boolean nextBatch() throws IOException {
+    VectorizedRowBatch vrb;
    if (fullAcidRecordReader != null) {
-      fullAcidRecordReader.next(NullWritable.get(), batch);
+      vrb = schema.createRowBatch(capacity);
+      // Internally the ACID batch reader changes the batch schema, so vrb is passed instead of batch.
+      if (!fullAcidRecordReader.next(NullWritable.get(), vrb)) {
+        // Do not rely on the batch size for the fullAcidRecordReader; it may be 0 in cases
+        // where the whole batch of records is filtered out.
+        return false;
+      }
    } else {
-      baseRecordReader.nextBatch(batch);
+      if (!baseRecordReader.nextBatch(batch)) {
+        //TODO: Should we return false if batch size is 0?
+        return false;
+      }
+      vrb = batch;
    }
-    //recordReader.nextBatch(batch);
-    int batchSize = batch.size;
-    if (batchSize == 0) {
-      return false;
-    }
+    int batchSize = vrb.size;
 
-    if (!batch.selectedInUse) {
+    // If selectedInUse is false, no filtering was done, so we can use the wrappers directly.
+    // There is no need to recreate the column batch.
+    if (!vrb.selectedInUse) {
      for (int i = 0; i < requiredFields.length; i++) {
        if (requestedColIds[i] != -1) {
-          ((OrcColumnVector) orcVectorWrappers[i]).setBatchSize(batchSize);
+          ((HiveAcidColumnVector) orcVectorWrappers[i]).setBatchSize(batchSize);
        }
      }
      columnarBatch = new ColumnarBatch(orcVectorWrappers);
@@ -375,27 +424,33 @@ private boolean nextBatch() throws IOException {
      return true;
    }
 
-    for (WritableColumnVector vector : columnVectors) {
-      vector.reset();
-    }
-
-    StructColumnVector dataCols = (StructColumnVector)batch.cols[rootCol];
-    for (int i = 0; i < requiredFields.length; i++) {
-      StructField field = requiredFields[i];
-      WritableColumnVector toColumn = columnVectors[i];
-      if (requestedColIds[i] >= 0) {
-        ColumnVector fromColumn = dataCols.fields[requestedColIds[i]];
+    // Recreate the batch using the selected array. Records with selected[idx] == 0 are removed
+    // from the resultant batch. So it is possible that the batch size becomes 0, but we should
+    // still return true so that the caller calls next again. Before that, we should reset the
+    // column vectors to indicate that no data is present.
+    for (WritableColumnVector toColumn : columnVectors) {
+      toColumn.reset();
+    }
 
-        if (fromColumn.isRepeating) {
-          putRepeatingValues(batchSize, field, fromColumn, toColumn);
-        } else if (fromColumn.noNulls) {
-          putNonNullValues(batchSize, field, fromColumn, toColumn, batch.selected);
-        } else {
-          putValues(batchSize, field, fromColumn, toColumn, batch.selected);
+    if (batchSize > 0) {
+      StructColumnVector dataCols = (StructColumnVector)vrb.cols[rootColIdx];
+      for (int i = 0; i < requiredFields.length; i++) {
+        StructField field = requiredFields[i];
+        WritableColumnVector toColumn = columnVectors[i];
+        if (requestedColIds[i] >= 0) {
+          ColumnVector fromColumn = dataCols.fields[requestedColIds[i]];
+          if (fromColumn.isRepeating) {
+            putRepeatingValues(batchSize, field, fromColumn, toColumn);
+          } else if (fromColumn.noNulls) {
+            putNonNullValues(batchSize, field, fromColumn, toColumn, vrb.selected);
+          } else {
+            putValues(batchSize, field, fromColumn, toColumn, vrb.selected);
+          }
        }
      }
    }
+
    columnarBatch = new ColumnarBatch(columnVectors);
    columnarBatch.setNumRows(batchSize);
    return true;
diff --git a/src/main/scala/com/qubole/spark/hiveacid/util/HiveAcidCommon.scala b/src/main/scala/com/qubole/spark/hiveacid/util/HiveAcidCommon.scala
index 32cbdcb..8eef317 100644
--- a/src/main/scala/com/qubole/spark/hiveacid/util/HiveAcidCommon.scala
+++ b/src/main/scala/com/qubole/spark/hiveacid/util/HiveAcidCommon.scala
@@ -168,7 +168,7 @@ object HiveAcidCommon extends Logging {
      val array = new Array[Partition](inputSplits.size)
      for (i <- 0 until inputSplits.size) {
        array(i) = new HiveAcidPartition(rddId, i, inputSplits(i))
-        logWarning("getPartitions : Input split: " + inputSplits(i))
+        logDebug("getPartitions : Input split: " + inputSplits(i))
      }
      array
    } catch {
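
Usage note (not part of the patch): below is a minimal sketch of how the read path touched by this change is typically driven from Spark, assuming the `HiveAcid` format name registered by this connector and an illustrative transactional table `default.acidtbl`; the object and table names here are not taken from the diff.

```scala
// Sketch only: table name, app name, and shown columns are illustrative.
import org.apache.spark.sql.SparkSession

object AcidReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("acid-read-sketch")
      .enableHiveSupport()
      .getOrCreate()

    // HiveAcidDataSourceV2Reader resolves the table via HiveAcidMetadata (falling back to the
    // default db when none is supplied), plans one InputPartition per ORC split, and the
    // partition readers emit ColumnarBatches with deleted/updated rows already filtered out.
    val df = spark.read
      .format("HiveAcid")
      .options(Map("table" -> "default.acidtbl"))
      .load()

    println(df.count())
    df.show(10)

    spark.stop()
  }
}
```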