Commit 55ad669: Some code refactoring
maheshk114 committed Jul 26, 2020
1 parent 0fe9c79 commit 55ad669
Showing 2 changed files with 98 additions and 102 deletions.
177 changes: 82 additions & 95 deletions src/main/scala/com/qubole/spark/hiveacid/rdd/HiveAcidRDD.scala
@@ -88,14 +88,14 @@ class HiveAcidPartition(rddId: Int, override val index: Int, s: InputSplit)
* `org.apache.spark.SparkContext.HiveAcidRDD()`
*/
private[hiveacid] class HiveAcidRDD[K, V](sc: SparkContext,
@transient val validWriteIds: ValidWriteIdList,
@transient val isFullAcidTable: Boolean,
broadcastedConf: Broadcast[SerializableConfiguration],
initLocalJobConfFuncOpt: Option[JobConf => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int)
@transient val validWriteIds: ValidWriteIdList,
@transient val isFullAcidTable: Boolean,
broadcastedConf: Broadcast[SerializableConfiguration],
initLocalJobConfFuncOpt: Option[JobConf => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int)
extends RDD[(RecordIdentifier, V)](sc, Nil) with Logging {

def this(
@@ -127,10 +127,22 @@ private[hiveacid] class HiveAcidRDD[K, V](sc: SparkContext,
// used to build JobTracker ID
private val createTime = new Date()

private val shouldCloneJobConf =
sparkContext.getConf.getBoolean("spark.hadoop.cloneConf", defaultValue = false)

private val ignoreCorruptFiles =
sparkContext.getConf.getBoolean("spark.files.ignoreCorruptFiles", defaultValue = false)

private val ignoreMissingFiles =
sparkContext.getConf.getBoolean("spark.files.ignoreMissingFiles", defaultValue = false)

private val ignoreEmptySplits =
sparkContext.getConf.getBoolean("spark.hadoopRDD.ignoreEmptySplits", defaultValue = false)

// Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
protected def getJobConf: JobConf = {
def getJobConf: JobConf = {
val conf: Configuration = broadcastedConf.value.value
if (HiveAcidRDD.shouldCloneJobConf(sc)) {
if (shouldCloneJobConf) {
// Hadoop Configuration objects are not thread-safe, which may lead to various problems if
// one job modifies a configuration while another reads it (SPARK-2546). This problem occurs
// somewhat rarely because most jobs treat the configuration as though it's immutable. One
@@ -177,13 +189,36 @@ private[hiveacid] class HiveAcidRDD[K, V](sc: SparkContext,

def getHiveSplitsInfo: HiveSplitInfo = {
HiveSplitInfo(id, broadcastedConf, validWriteIds.writeToString(), minPartitions,
inputFormatClass.getCanonicalName, isFullAcidTable, HiveAcidRDD.shouldCloneJobConf(sc),
inputFormatClass.getCanonicalName, isFullAcidTable, shouldCloneJobConf,
initLocalJobConfFuncOpt)
}

protected def getInputFormat(conf: JobConf): InputFormat[K, V] = {
val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
.asInstanceOf[InputFormat[K, V]]
newInputFormat match {
case c: Configurable => c.setConf(conf)
case _ =>
}
newInputFormat
}

override def getPartitions: Array[Partition] = {
HiveAcidRDD.getPartitions(sc, Some(getJobConf), inputFormatClass, id, isFullAcidTable,
validWriteIds, broadcastedConf, initLocalJobConfFuncOpt, minPartitions)
val jobConf: JobConf = HiveAcidRDD.setInputPathToJobConf(Some(getJobConf), isFullAcidTable, validWriteIds,
broadcastedConf, shouldCloneJobConf, initLocalJobConfFuncOpt)

// Add the credentials here as this can be called before the SparkContext is initialized
SparkHadoopUtil.get.addCredentials(jobConf)
val paths = FileInputFormat.getInputPaths(jobConf)
val partitions = HiveAcidPartitionComputer.getFromSplitsCache(paths, validWriteIds)
if (partitions.isDefined) {
partitions.get.asInstanceOf[Array[Partition]]
} else {
logDebug(s"Splits Info is not cached, hence computing it here. Paths: ${paths.mkString(",")}")
new HiveAcidPartitionComputer(ignoreEmptySplits, ignoreMissingFiles)
.getPartitions[K, V](id, jobConf, getInputFormat(jobConf), minPartitions)
.asInstanceOf[Array[Partition]]
}
}

override def compute(theSplit: Partition,
@@ -195,7 +230,7 @@ private[hiveacid] class HiveAcidRDD[K, V](sc: SparkContext,
val jobConf: JobConf = getJobConf

private var reader: RecordReader[K, V] = _
private val inputFormat = HiveAcidRDD.getInputFormat(jobConf, inputFormatClass)
private val inputFormat = getInputFormat(jobConf)
HiveAcidRDD.addLocalConfiguration(
new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(createTime),
context.stageId, theSplit.index, context.attemptNumber, jobConf)
@@ -208,13 +243,13 @@ private[hiveacid] class HiveAcidRDD[K, V](sc: SparkContext,
inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
}
} catch {
case e: FileNotFoundException if HiveAcidRDD.ignoreMissingFiles(sc) =>
case e: FileNotFoundException if ignoreMissingFiles =>
logWarning(s"Skipped missing file: ${split.inputSplit}", e)
finished = true
null
// Throw FileNotFoundException even if `ignoreCorruptFiles` is true
case e: FileNotFoundException if !HiveAcidRDD.ignoreMissingFiles(sc) => throw e
case e: IOException if HiveAcidRDD.ignoreCorruptFiles(sc) =>
case e: FileNotFoundException if !ignoreMissingFiles => throw e
case e: IOException if ignoreCorruptFiles =>
logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
finished = true
null
@@ -241,12 +276,12 @@ private[hiveacid] class HiveAcidRDD[K, V](sc: SparkContext,
recordIdentifier = acidRecordReader.getRecordIdentifier
}
} catch {
case e: FileNotFoundException if HiveAcidRDD.ignoreMissingFiles(sc) =>
case e: FileNotFoundException if ignoreMissingFiles =>
logWarning(s"Skipped missing file: ${split.inputSplit}", e)
finished = true
// Throw FileNotFoundException even if `ignoreCorruptFiles` is true
case e: FileNotFoundException if !HiveAcidRDD.ignoreMissingFiles(sc) => throw e
case e: IOException if HiveAcidRDD.ignoreCorruptFiles(sc) =>
case e: FileNotFoundException if !ignoreMissingFiles => throw e
case e: IOException if ignoreCorruptFiles =>
logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
finished = true
}
@@ -473,54 +508,6 @@ object HiveAcidRDD extends Logging {
}
jobConf
}

def shouldCloneJobConf(sparkContext : SparkContext) : Boolean =
sparkContext.getConf.getBoolean("spark.hadoop.cloneConf", defaultValue = false)

def ignoreCorruptFiles(sparkContext : SparkContext) : Boolean =
sparkContext.getConf.getBoolean("spark.files.ignoreCorruptFiles", defaultValue = false)

def ignoreMissingFiles(sparkContext : SparkContext) : Boolean =
sparkContext.getConf.getBoolean("spark.files.ignoreMissingFiles", defaultValue = false)

def ignoreEmptySplits(sparkContext : SparkContext) : Boolean =
sparkContext.getConf.getBoolean("spark.hadoopRDD.ignoreEmptySplits", defaultValue = false)

def getInputFormat[K, V](conf: JobConf, inputFormatClass: Class[_ <: InputFormat[K, V]]): InputFormat[K, V] = {
val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
.asInstanceOf[InputFormat[K, V]]
newInputFormat match {
case c: Configurable => c.setConf(conf)
case _ =>
}
newInputFormat
}

def getPartitions[K, V](sparkContext : SparkContext,
jobConfOpt: Option[JobConf],
inputFormatClass: Class[_ <: InputFormat[K, V]],
id : scala.Int,
isFullAcidTable: Boolean,
validWriteIds: ValidWriteIdList,
broadcastedConf: Broadcast[SerializableConfiguration],
initLocalJobConfFuncOpt: Option[JobConf => Unit],
minPartitions: Int): Array[Partition] = {
val jobConf: JobConf = HiveAcidRDD.setInputPathToJobConf(jobConfOpt, isFullAcidTable, validWriteIds,
broadcastedConf, shouldCloneJobConf(sparkContext), initLocalJobConfFuncOpt)

// Add the credentials here as this can be called before the SparkContext is initialized
SparkHadoopUtil.get.addCredentials(jobConf)
val paths = FileInputFormat.getInputPaths(jobConf)
val partitions = HiveAcidPartitionComputer.getFromSplitsCache(paths, validWriteIds)
if (partitions.isDefined) {
partitions.get.asInstanceOf[Array[Partition]]
} else {
logDebug(s"Splits Info is not cached, hence computing it here. Paths: ${paths.mkString(",")}")
new HiveAcidPartitionComputer(ignoreEmptySplits(sparkContext), ignoreMissingFiles(sparkContext))
.getPartitions[K, V](id, jobConf, getInputFormat(jobConf, inputFormatClass), minPartitions)
.asInstanceOf[Array[Partition]]
}
}
}

/**
Expand All @@ -534,38 +521,38 @@ private abstract class NextIterator[U] extends Iterator[U] {
protected var finished = false

/**
* Method for subclasses to implement to provide the next element.
*
* If no next element is available, the subclass should set `finished`
* to `true` and may return any value (it will be ignored).
*
* This convention is required because `null` may be a valid value,
* and using `Option` seems like it might create unnecessary Some/None
* instances, given some iterators might be called in a tight loop.
*
* @return U, or set 'finished' when done
*/
* Method for subclasses to implement to provide the next element.
*
* If no next element is available, the subclass should set `finished`
* to `true` and may return any value (it will be ignored).
*
* This convention is required because `null` may be a valid value,
* and using `Option` seems like it might create unnecessary Some/None
* instances, given some iterators might be called in a tight loop.
*
* @return U, or set 'finished' when done
*/
protected def getNext(): U

/**
* Method for subclasses to implement when all elements have been successfully
* iterated, and the iteration is done.
*
* <b>Note:</b> `NextIterator` cannot guarantee that `close` will be
* called because it has no control over what happens when an exception
* happens in the user code that is calling hasNext/next.
*
* Ideally you should have another try/catch, as in HadoopRDD, that
* ensures any resources are closed should iteration fail.
*/
* Method for subclasses to implement when all elements have been successfully
* iterated, and the iteration is done.
*
* <b>Note:</b> `NextIterator` cannot guarantee that `close` will be
* called because it has no control over what happens when an exception
* happens in the user code that is calling hasNext/next.
*
* Ideally you should have another try/catch, as in HadoopRDD, that
* ensures any resources are closed should iteration fail.
*/
protected def close()

/**
* Calls the subclass-defined close method, but only once.
*
* Usually calling `close` multiple times should be fine, but historically
* there have been issues with some InputFormats throwing exceptions.
*/
* Calls the subclass-defined close method, but only once.
*
* Usually calling `close` multiple times should be fine, but historically
* there have been issues with some InputFormats throwing exceptions.
*/
def closeIfNeeded() {
if (!closed) {
// Note: it's important that we set closed = true before calling close(), since setting it
Expand Down Expand Up @@ -595,4 +582,4 @@ private abstract class NextIterator[U] extends Iterator[U] {
gotNext = false
nextValue
}
}
}
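The getNext()/finished contract documented in NextIterator above is easiest to see with a small subclass. The sketch below is illustrative only; it is not part of this commit, and the LineIterator name and its BufferedSource input are made up for the example. It also assumes the subclass sits where the file-private NextIterator is visible.

class LineIterator(source: scala.io.BufferedSource) extends NextIterator[String] {
  private val lines = source.getLines()

  // Return the next line, or set `finished` when the source is exhausted.
  // The value returned after setting `finished` is ignored by the iterator.
  override protected def getNext(): String = {
    if (lines.hasNext) {
      lines.next()
    } else {
      finished = true
      null
    }
  }

  // Invoked at most once through closeIfNeeded() when iteration completes or is aborted.
  override protected def close(): Unit = source.close()
}

Iteration itself goes through the inherited hasNext/next, and, per the comments above, closeIfNeeded() ensures close() runs only once even if cleanup code calls it again.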
Second changed file: 16 additions & 7 deletions
@@ -108,7 +108,7 @@ extends CastSupport with Reader with Logging {
setReaderOptions(hiveAcidMetadata)

val ifcName = hiveAcidMetadata.hTable.getInputFormatClass.getName
val ifc = Util.classForName(ifcName, loadShaded = true)
val inputFormatClass = Util.classForName(ifcName, loadShaded = true)
.asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]

val colNames = getColumnNamesFromFieldSchema(fieldSchemas)
@@ -118,14 +118,23 @@ extends CastSupport with Reader with Logging {
hiveAcidMetadata.hTable.getParameters,
colNames, colTypes) _


val rdd = new HiveAcidRDD(
sparkSession.sparkContext,
validWriteIds,
hiveAcidOptions.isFullAcidTable,
_broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
Some(initializeJobConfFunc),
inputFormatClass,
classOf[Writable],
classOf[Writable],
_minSplitsPerRDD)


//TODO : Need to cache it with some unique id.
val jobConf = HiveAcidRDD.getJobConf(_broadcastedHadoopConf,
HiveAcidRDD.shouldCloneJobConf(sparkSession.sparkContext),
Some(initializeJobConfFunc))
val jobConf = rdd.getJobConf

val inputSplits = HiveAcidRDD.getPartitions(sparkSession.sparkContext, Some(jobConf), ifc, 0,
hiveAcidMetadata.isFullAcidTable, validWriteIds,
_broadcastedHadoopConf, Some(initializeJobConfFunc), _minSplitsPerRDD)
val inputSplits = rdd.getPartitions

val reqFields = hiveAcidMetadata.tableSchema.fields.filter(field =>
readerOptions.requiredNonPartitionedColumns.contains(field.name))
