
Commit

Added comments
maheshk114 committed Jul 25, 2020
1 parent 383f86f commit 3fd63a3
Showing 7 changed files with 139 additions and 84 deletions.
src/it/scala/com/qubole/spark/hiveacid/ReadSuite.scala (2 changes: 1 addition & 1 deletion)
@@ -26,7 +26,7 @@ import org.scalatest._
 
 import scala.util.control.NonFatal
 
-@Ignore
+//@Ignore
 class ReadSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll {
 
   val log: Logger = LogManager.getLogger(this.getClass)
@@ -60,6 +60,7 @@ class HiveAcidDataSourceV2Reader
     if (dbName != null) {
       hiveAcidMetadata = HiveAcidMetadata.fromSparkSession(sparkSession, dbName + "." + tblName)
     } else {
+      // If db name is null, default db is chosen.
       hiveAcidMetadata = HiveAcidMetadata.fromSparkSession(sparkSession, tblName)
     }
 
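The new comment documents the fallback when no database qualifier is supplied: "db.table" resolves against the named database, while a bare table name resolves against the default database. A minimal usage sketch, assuming the connector's documented "HiveAcid" format name and "table" read option; the database and table names below are invented for illustration:

import org.apache.spark.sql.SparkSession

object HiveAcidReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hive-acid-read-sketch")
      .enableHiveSupport()
      .getOrCreate()

    // Qualified name: dbName = "sales_db", tblName = "orders".
    val qualified = spark.read.format("HiveAcid")
      .option("table", "sales_db.orders")
      .load()

    // Bare name: dbName is null, so the metadata lookup falls back to the default database.
    val unqualified = spark.read.format("HiveAcid")
      .option("table", "orders")
      .load()

    println(qualified.schema.treeString)
    println(unqualified.schema.treeString)
  }
}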
@@ -118,19 +118,20 @@ extends CastSupport with Reader with Logging {
       hiveAcidMetadata.hTable.getParameters,
       colNames, colTypes) _
 
-    //TODO : Need to cahce it with some unique id.
+    //TODO : Need to cache it with some unique id.
     val jobConf = new JobConf(_broadcastedHadoopConf.value.value)
     initializeJobConfFunc(jobConf)
 
+    //TODO:Can this be done parallely in multiple threads?
     val partitionArray = new java.util.ArrayList[InputPartition[ColumnarBatch]]
     val inputSplits = HiveAcidCommon.getInputSplits(jobConf, validWriteIds, 0,
       hiveAcidMetadata.isFullAcidTable, ifc)
-    val reqFileds = hiveAcidMetadata.tableSchema.fields.filter(field =>
+    val reqFields = hiveAcidMetadata.tableSchema.fields.filter(field =>
       readerOptions.requiredNonPartitionedColumns.contains(field.name))
-    for (i <- 0 until inputSplits.size) {
+    for (i <- inputSplits.indices) {
       partitionArray.add(new HiveAcidInputPartitionV2(inputSplits(i).asInstanceOf[HiveAcidPartition],
         sparkSession.sparkContext.broadcast(new SerializableConfiguration(jobConf)),
-        partitionValues, reqFileds, hiveAcidMetadata.partitionSchema, hiveAcidMetadata.isFullAcidTable))
+        partitionValues, reqFields, hiveAcidMetadata.partitionSchema, hiveAcidMetadata.isFullAcidTable))
       logInfo("getPartitions : Input split: " + inputSplits(i))
     }
     partitionArray
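The reqFields computation above prunes the table schema down to the non-partition columns the scan actually requires before each input split is wrapped in a HiveAcidInputPartitionV2. A standalone sketch of the same pruning step using Spark's StructType directly; the schema and column names are made up:

import org.apache.spark.sql.types._

object FieldPruningDemo {
  def main(args: Array[String]): Unit = {
    val tableSchema = StructType(Seq(
      StructField("id", LongType),
      StructField("name", StringType),
      StructField("amount", DoubleType)))

    // Columns the query actually reads (partition columns are handled separately).
    val requiredNonPartitionedColumns = Seq("id", "amount")

    val reqFields = tableSchema.fields.filter(field =>
      requiredNonPartitionedColumns.contains(field.name))

    println(reqFields.map(_.name).mkString(", ")) // prints: id, amount
  }
}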
@@ -215,14 +216,12 @@
 
     val mutableRow = new SpecificInternalRow(hiveAcidMetadata.partitionSchema)
 
-    // Splits all attributes into two groups, partition key attributes and those
-    // that are not. Attached indices indicate the position of each attribute in
-    // the output schema.
-    val (partitionKeyAttrs, nonPartitionKeyAttrs) =
-      readerOptions.requiredAttributes.zipWithIndex.partition { case (attr, _) =>
+    val partitionKeyAttrs =
+      readerOptions.requiredAttributes.zipWithIndex.filter { attr =>
         readerOptions.partitionAttributes.contains(attr)
       }
 
+    //TODO : The partition values can be filled directly using hive acid batch reader.
     def fillPartitionKeys(rawPartValues: Array[String], row: InternalRow): Unit = {
       var offset = 0
       partitionKeyAttrs.foreach { case (attr, ordinal) =>
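The change above keeps only the position-tagged partition-key attributes: zipWithIndex records each required attribute's ordinal in the output row, so fillPartitionKeys knows which slots of mutableRow to populate from the partition directory values. A self-contained sketch of that indexing pattern on plain strings; all names are invented for illustration:

object PartitionKeyIndexDemo {
  def main(args: Array[String]): Unit = {
    // Required output columns, in the order the reader must produce them.
    val requiredColumns = Seq("id", "amount", "year", "month")
    // Columns that are partition keys of the table.
    val partitionColumns = Set("year", "month")

    // Pair each column with its output ordinal, then keep only the partition keys.
    val partitionKeyAttrs: Seq[(String, Int)] =
      requiredColumns.zipWithIndex.filter { case (col, _) => partitionColumns.contains(col) }

    // The retained ordinals say which output positions come from partition values.
    partitionKeyAttrs.foreach { case (col, ordinal) =>
      println(s"partition column '$col' fills output position $ordinal")
    }
    // prints: year -> 2, month -> 3
  }
}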
@@ -36,9 +36,9 @@
 /**
  * A column vector class wrapping Hive's ColumnVector. Because Spark ColumnarBatch only accepts
  * Spark's vectorized.ColumnVector, this column vector is used to adapt Hive ColumnVector with
- * Spark ColumnarVector.
+ * Spark ColumnarVector. This class is a copy of spark ColumnVector which is declared private.
  */
-public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVector {
+public class HiveAcidColumnVector extends org.apache.spark.sql.vectorized.ColumnVector {
   private ColumnVector baseData;
   private LongColumnVector longData;
   private DoubleColumnVector doubleData;
@@ -49,7 +49,7 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVector {
 
   private int batchSize;
 
-  OrcColumnVector(DataType type, ColumnVector vector) {
+  HiveAcidColumnVector(DataType type, ColumnVector vector) {
     super(type);
 
     if (type instanceof TimestampType) {
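The renamed class adapts Hive's ColumnVector family (LongColumnVector, DoubleColumnVector, BytesColumnVector, ...) to the read-only, per-row accessors that Spark's ColumnarBatch expects. A toy Scala sketch of the same delegation idea against Hive's LongColumnVector only, not a real subclass of Spark's ColumnVector abstract class (which requires many more accessors); it assumes the hive-storage-api dependency is on the classpath:

import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector

// Toy read-only view over a Hive LongColumnVector, mirroring how
// HiveAcidColumnVector forwards per-row accessors to the wrapped Hive vector.
class LongVectorView(vector: LongColumnVector) {
  // Hive vectors collapse to a single physical slot when every row repeats the same value.
  private def physicalRow(rowId: Int): Int = if (vector.isRepeating) 0 else rowId

  def isNullAt(rowId: Int): Boolean =
    !vector.noNulls && vector.isNull(physicalRow(rowId))

  def getLong(rowId: Int): Long = vector.vector(physicalRow(rowId))
}

object LongVectorViewDemo extends App {
  val hiveVector = new LongColumnVector(4)
  hiveVector.vector(0) = 42L
  hiveVector.noNulls = true

  val view = new LongVectorView(hiveVector)
  println(view.getLong(0)) // prints 42
}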
@@ -35,9 +35,8 @@ private[v2] class HiveAcidInputPartitionReaderV2(split: HiveAcidPartition,
     isFullAcidTable: Boolean)
   extends InputPartitionReader[ColumnarBatch] {
   //TODO : Need to get a unique id to cache the jobConf.
-  private val jobConf : JobConf = new JobConf(broadcastedConf.value.value)
-  private val sparkOrcColReader : OrcColumnarBatchReader =
-    new OrcColumnarBatchReader(1024)
+  private val jobConf = new JobConf(broadcastedConf.value.value)
+  private val orcColumnarBatchReader = new OrcColumnarBatchReader(1024)
 
   private def initReader() : Unit = {
     // Get the reader schema using the column names and types set in hive conf.
@@ -63,26 +62,27 @@ private[v2] class HiveAcidInputPartitionReaderV2(split: HiveAcidPartition,
     // Register the listener for closing the reader before init is done.
     val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
     val taskAttemptContext = new org.apache.hadoop.mapred.TaskAttemptContextImpl(jobConf, attemptId)
-    val iter = new org.apache.spark.sql.execution.datasources.RecordReaderIterator(sparkOrcColReader)
+    val iter = new org.apache.spark.sql.execution.datasources.RecordReaderIterator(orcColumnarBatchReader)
     Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
 
-    sparkOrcColReader.initialize(fileSplit, taskAttemptContext)
-    sparkOrcColReader.initBatch(readerLocal.getSchema, requestedColIds,
+    //TODO: Need to generalize it for supporting other kind of file format.
+    orcColumnarBatchReader.initialize(fileSplit, taskAttemptContext)
+    orcColumnarBatchReader.initBatch(readerLocal.getSchema, requestedColIds,
       requiredFields, partitionSchema, partitionValues, isFullAcidTable && !fileSplit.isOriginal)
   }
   initReader()
 
   @throws(classOf[IOException])
   override def next() : Boolean = {
-    sparkOrcColReader.nextKeyValue()
+    orcColumnarBatchReader.nextKeyValue()
  }
 
   override def get () : ColumnarBatch = {
-    sparkOrcColReader.getCurrentValue
+    orcColumnarBatchReader.getCurrentValue
   }
 
   @throws(classOf[IOException])
   override def close() : Unit = {
-    sparkOrcColReader.close()
+    orcColumnarBatchReader.close()
   }
 }
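The three methods touched by this rename (next, get, close) are the hooks Spark's DataSourceV2 scan calls for each partition. A rough sketch of that consumption pattern, assuming the Spark 2.4 InputPartitionReader interface used by this connector; the consume helper itself is invented for illustration:

import org.apache.spark.sql.sources.v2.reader.InputPartitionReader
import org.apache.spark.sql.vectorized.ColumnarBatch

object PartitionReaderLoop {
  // How a ColumnarBatch-producing partition reader is typically driven on an executor.
  def consume(reader: InputPartitionReader[ColumnarBatch]): Long = {
    var rowCount = 0L
    try {
      while (reader.next()) {      // advance; returns false once the split is exhausted
        val batch = reader.get()   // current batch, only valid until the next call to next()
        rowCount += batch.numRows()
      }
    } finally {
      reader.close()               // always release the underlying reader and file handles
    }
    rowCount
  }
}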
