From 55ad669ce10ffbb05f9269bc94f9d3eab370ed61 Mon Sep 17 00:00:00 2001
From: Mahesh Kumar Behera
Date: Sun, 26 Jul 2020 10:41:38 +0530
Subject: [PATCH] Refactor HiveAcidRDD: replace companion-object helpers with
 instance members

---
 .../spark/hiveacid/rdd/HiveAcidRDD.scala      | 177 ++++++++----------
 .../hiveacid/reader/hive/HiveAcidReader.scala |  23 ++-
 2 files changed, 98 insertions(+), 102 deletions(-)

diff --git a/src/main/scala/com/qubole/spark/hiveacid/rdd/HiveAcidRDD.scala b/src/main/scala/com/qubole/spark/hiveacid/rdd/HiveAcidRDD.scala
index 0bbea9a..84e2d1f 100644
--- a/src/main/scala/com/qubole/spark/hiveacid/rdd/HiveAcidRDD.scala
+++ b/src/main/scala/com/qubole/spark/hiveacid/rdd/HiveAcidRDD.scala
@@ -88,14 +88,14 @@ class HiveAcidPartition(rddId: Int, override val index: Int, s: InputSplit)
  * `org.apache.spark.SparkContext.HiveAcidRDD()`
  */
 private[hiveacid] class HiveAcidRDD[K, V](sc: SparkContext,
-                                          @transient val validWriteIds: ValidWriteIdList,
-                                          @transient val isFullAcidTable: Boolean,
-                                          broadcastedConf: Broadcast[SerializableConfiguration],
-                                          initLocalJobConfFuncOpt: Option[JobConf => Unit],
-                                          inputFormatClass: Class[_ <: InputFormat[K, V]],
-                                          keyClass: Class[K],
-                                          valueClass: Class[V],
-                                          minPartitions: Int)
+                                         @transient val validWriteIds: ValidWriteIdList,
+                                         @transient val isFullAcidTable: Boolean,
+                                         broadcastedConf: Broadcast[SerializableConfiguration],
+                                         initLocalJobConfFuncOpt: Option[JobConf => Unit],
+                                         inputFormatClass: Class[_ <: InputFormat[K, V]],
+                                         keyClass: Class[K],
+                                         valueClass: Class[V],
+                                         minPartitions: Int)
   extends RDD[(RecordIdentifier, V)](sc, Nil) with Logging {

   def this(
@@ -127,10 +127,22 @@ private[hiveacid] class HiveAcidRDD[K, V](sc: SparkContext,
   // used to build JobTracker ID
   private val createTime = new Date()

+  private val shouldCloneJobConf =
+    sparkContext.getConf.getBoolean("spark.hadoop.cloneConf", defaultValue = false)
+
+  private val ignoreCorruptFiles =
+    sparkContext.getConf.getBoolean("spark.files.ignoreCorruptFiles", defaultValue = false)
+
+  private val ignoreMissingFiles =
+    sparkContext.getConf.getBoolean("spark.files.ignoreMissingFiles", defaultValue = false)
+
+  private val ignoreEmptySplits =
+    sparkContext.getConf.getBoolean("spark.hadoopRDD.ignoreEmptySplits", defaultValue = false)
+
   // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
-  protected def getJobConf: JobConf = {
+  def getJobConf: JobConf = {
     val conf: Configuration = broadcastedConf.value.value
-    if (HiveAcidRDD.shouldCloneJobConf(sc)) {
+    if (shouldCloneJobConf) {
       // Hadoop Configuration objects are not thread-safe, which may lead to various problems if
       // one job modifies a configuration while another reads it (SPARK-2546). This problem occurs
       // somewhat rarely because most jobs treat the configuration as though it's immutable. One
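A note on the hunk above: the four new private vals resolve their SparkConf flags once, at RDD construction time, instead of re-reading the configuration through companion-object helpers on every call. A minimal, self-contained sketch of the underlying SparkConf lookup pattern (ConfFlagsDemo and its settings are illustrative, not part of the patch):

    import org.apache.spark.SparkConf

    object ConfFlagsDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().set("spark.files.ignoreMissingFiles", "true")
        // Resolved once, like the new private vals on HiveAcidRDD:
        val ignoreMissingFiles =
          conf.getBoolean("spark.files.ignoreMissingFiles", defaultValue = false)
        val ignoreCorruptFiles =
          conf.getBoolean("spark.files.ignoreCorruptFiles", defaultValue = false)
        println(s"missing=$ignoreMissingFiles corrupt=$ignoreCorruptFiles")  // missing=true corrupt=false
      }
    }

Because the vals are captured when the RDD is built, later changes to the flags do not affect an already-constructed RDD, whereas the old per-call helpers re-read them each time.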
@@ -177,13 +189,36 @@ private[hiveacid] class HiveAcidRDD[K, V](sc: SparkContext,

   def getHiveSplitsInfo: HiveSplitInfo = {
     HiveSplitInfo(id, broadcastedConf, validWriteIds.writeToString(), minPartitions,
-      inputFormatClass.getCanonicalName, isFullAcidTable, HiveAcidRDD.shouldCloneJobConf(sc),
+      inputFormatClass.getCanonicalName, isFullAcidTable, shouldCloneJobConf,
       initLocalJobConfFuncOpt)
   }

+  protected def getInputFormat(conf: JobConf): InputFormat[K, V] = {
+    val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
+      .asInstanceOf[InputFormat[K, V]]
+    newInputFormat match {
+      case c: Configurable => c.setConf(conf)
+      case _ =>
+    }
+    newInputFormat
+  }
+
   override def getPartitions: Array[Partition] = {
-    HiveAcidRDD.getPartitions(sc, Some(getJobConf), inputFormatClass, id, isFullAcidTable,
-      validWriteIds, broadcastedConf, initLocalJobConfFuncOpt, minPartitions)
+    val jobConf: JobConf = HiveAcidRDD.setInputPathToJobConf(Some(getJobConf), isFullAcidTable, validWriteIds,
+      broadcastedConf, shouldCloneJobConf, initLocalJobConfFuncOpt)
+
+    // Add the credentials here, as this can be called before the SparkContext is initialized.
+    SparkHadoopUtil.get.addCredentials(jobConf)
+    val paths = FileInputFormat.getInputPaths(jobConf)
+    val partitions = HiveAcidPartitionComputer.getFromSplitsCache(paths, validWriteIds)
+    if (partitions.isDefined) {
+      partitions.get.asInstanceOf[Array[Partition]]
+    } else {
+      logDebug(s"Splits info is not cached; computing it here. Paths: ${paths.mkString(",")}")
+      new HiveAcidPartitionComputer(ignoreEmptySplits, ignoreMissingFiles)
+        .getPartitions[K, V](id, jobConf, getInputFormat(jobConf), minPartitions)
+        .asInstanceOf[Array[Partition]]
+    }
   }

   override def compute(theSplit: Partition,
@@ -195,7 +230,7 @@ private[hiveacid] class HiveAcidRDD[K, V](sc: SparkContext,
       val jobConf: JobConf = getJobConf

       private var reader: RecordReader[K, V] = _
-      private val inputFormat = HiveAcidRDD.getInputFormat(jobConf, inputFormatClass)
+      private val inputFormat = getInputFormat(jobConf)
       HiveAcidRDD.addLocalConfiguration(
         new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(createTime),
         context.stageId, theSplit.index, context.attemptNumber, jobConf)
@@ -208,13 +243,13 @@ private[hiveacid] class HiveAcidRDD[K, V](sc: SparkContext,
           inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
         }
       } catch {
-        case e: FileNotFoundException if HiveAcidRDD.ignoreMissingFiles(sc) =>
+        case e: FileNotFoundException if ignoreMissingFiles =>
           logWarning(s"Skipped missing file: ${split.inputSplit}", e)
           finished = true
           null
         // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
-        case e: FileNotFoundException if !HiveAcidRDD.ignoreMissingFiles(sc) => throw e
-        case e: IOException if HiveAcidRDD.ignoreCorruptFiles(sc) =>
+        case e: FileNotFoundException if !ignoreMissingFiles => throw e
+        case e: IOException if ignoreCorruptFiles =>
           logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
           finished = true
           null
@@ -241,12 +276,12 @@ private[hiveacid] class HiveAcidRDD[K, V](sc: SparkContext,
             recordIdentifier = acidRecordReader.getRecordIdentifier
           }
         } catch {
-          case e: FileNotFoundException if HiveAcidRDD.ignoreMissingFiles(sc) =>
+          case e: FileNotFoundException if ignoreMissingFiles =>
             logWarning(s"Skipped missing file: ${split.inputSplit}", e)
             finished = true
           // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
-          case e: FileNotFoundException if !HiveAcidRDD.ignoreMissingFiles(sc) =>
-            throw e
-          case e: IOException if HiveAcidRDD.ignoreCorruptFiles(sc) =>
+          case e: FileNotFoundException if !ignoreMissingFiles => throw e
+          case e: IOException if ignoreCorruptFiles =>
             logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
             finished = true
         }
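A note on the catch blocks above: java.io.FileNotFoundException extends java.io.IOException, so the explicit rethrow case is what prevents a missing file from being silently swallowed by the corrupt-file case when only ignoreCorruptFiles is set. A runnable sketch of the same guard structure (ErrorToleranceSketch and its hard-coded flags are illustrative):

    import java.io.{FileNotFoundException, IOException}

    object ErrorToleranceSketch {
      val ignoreMissingFiles = true   // stand-ins for the cached HiveAcidRDD flags
      val ignoreCorruptFiles = true

      def readSplit(open: () => String): Option[String] =
        try Some(open()) catch {
          case e: FileNotFoundException if ignoreMissingFiles =>
            println(s"Skipped missing file: ${e.getMessage}")
            None
          // Without this rethrow, a FileNotFoundException would fall through to
          // the IOException case below whenever ignoreCorruptFiles is true.
          case e: FileNotFoundException if !ignoreMissingFiles => throw e
          case e: IOException if ignoreCorruptFiles =>
            println(s"Skipped corrupted file: ${e.getMessage}")
            None
        }

      def main(args: Array[String]): Unit = {
        println(readSplit(() => throw new FileNotFoundException("part-00000")))  // None
        println(readSplit(() => "row-1"))                                        // Some(row-1)
      }
    }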
@@ -473,54 +508,6 @@ object HiveAcidRDD extends Logging {
     }
     jobConf
   }
-
-  def shouldCloneJobConf(sparkContext : SparkContext) : Boolean =
-    sparkContext.getConf.getBoolean("spark.hadoop.cloneConf", defaultValue = false)
-
-  def ignoreCorruptFiles(sparkContext : SparkContext) : Boolean =
-    sparkContext.getConf.getBoolean("spark.files.ignoreCorruptFiles", defaultValue = false)
-
-  def ignoreMissingFiles(sparkContext : SparkContext) : Boolean =
-    sparkContext.getConf.getBoolean("spark.files.ignoreMissingFiles", defaultValue = false)
-
-  def ignoreEmptySplits(sparkContext : SparkContext) : Boolean =
-    sparkContext.getConf.getBoolean("spark.hadoopRDD.ignoreEmptySplits", defaultValue = false)
-
-  def getInputFormat[K, V](conf: JobConf, inputFormatClass: Class[_ <: InputFormat[K, V]]): InputFormat[K, V] = {
-    val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
-      .asInstanceOf[InputFormat[K, V]]
-    newInputFormat match {
-      case c: Configurable => c.setConf(conf)
-      case _ =>
-    }
-    newInputFormat
-  }
-
-  def getPartitions[K, V](sparkContext : SparkContext,
-                          jobConfOpt: Option[JobConf],
-                          inputFormatClass: Class[_ <: InputFormat[K, V]],
-                          id : scala.Int,
-                          isFullAcidTable: Boolean,
-                          validWriteIds: ValidWriteIdList,
-                          broadcastedConf: Broadcast[SerializableConfiguration],
-                          initLocalJobConfFuncOpt: Option[JobConf => Unit],
-                          minPartitions: Int): Array[Partition] = {
-    val jobConf: JobConf = HiveAcidRDD.setInputPathToJobConf(jobConfOpt, isFullAcidTable, validWriteIds,
-      broadcastedConf, shouldCloneJobConf(sparkContext), initLocalJobConfFuncOpt)
-
-    // add the credentials here as this can be called before SparkContext initialized
-    SparkHadoopUtil.get.addCredentials(jobConf)
-    val paths = FileInputFormat.getInputPaths(jobConf)
-    val partitions = HiveAcidPartitionComputer.getFromSplitsCache(paths, validWriteIds)
-    if (partitions.isDefined) {
-      partitions.get.asInstanceOf[Array[Partition]]
-    } else {
-      logDebug(s"Splits Info is not cached, hence computing it here. Paths: ${paths.mkString(",")}")
-      new HiveAcidPartitionComputer(ignoreEmptySplits(sparkContext), ignoreMissingFiles(sparkContext))
-        .getPartitions[K, V](id, jobConf, getInputFormat(jobConf, inputFormatClass), minPartitions)
-        .asInstanceOf[Array[Partition]]
-    }
-  }
 }

 /**
@@ -534,38 +521,38 @@ private abstract class NextIterator[U] extends Iterator[U] {
   protected var finished = false

   /**
-    * Method for subclasses to implement to provide the next element.
-    *
-    * If no next element is available, the subclass should set `finished`
-    * to `true` and may return any value (it will be ignored).
-    *
-    * This convention is required because `null` may be a valid value,
-    * and using `Option` seems like it might create unnecessary Some/None
-    * instances, given some iterators might be called in a tight loop.
-    *
-    * @return U, or set 'finished' when done
-    */
+   * Method for subclasses to implement to provide the next element.
+   *
+   * If no next element is available, the subclass should set `finished`
+   * to `true` and may return any value (it will be ignored).
+   *
+   * This convention is required because `null` may be a valid value,
+   * and using `Option` seems like it might create unnecessary Some/None
+   * instances, given some iterators might be called in a tight loop.
+   *
+   * @return U, or set 'finished' when done
+   */
   protected def getNext(): U

   /**
-    * Method for subclasses to implement when all elements have been successfully
-    * iterated, and the iteration is done.
-    *
-    * Note: `NextIterator` cannot guarantee that `close` will be
-    * called because it has no control over what happens when an exception
-    * happens in the user code that is calling hasNext/next.
-    *
-    * Ideally you should have another try/catch, as in HadoopRDD, that
-    * ensures any resources are closed should iteration fail.
-    */
+   * Method for subclasses to implement when all elements have been successfully
+   * iterated, and the iteration is done.
+   *
+   * Note: `NextIterator` cannot guarantee that `close` will be
+   * called because it has no control over what happens when an exception
+   * happens in the user code that is calling hasNext/next.
+   *
+   * Ideally you should have another try/catch, as in HadoopRDD, that
+   * ensures any resources are closed should iteration fail.
+   */
   protected def close()

   /**
-    * Calls the subclass-defined close method, but only once.
-    *
-    * Usually calling `close` multiple times should be fine, but historically
-    * there have been issues with some InputFormats throwing exceptions.
-    */
+   * Calls the subclass-defined close method, but only once.
+   *
+   * Usually calling `close` multiple times should be fine, but historically
+   * there have been issues with some InputFormats throwing exceptions.
+   */
   def closeIfNeeded() {
     if (!closed) {
       // Note: it's important that we set closed = true before calling close(), since setting it
@@ -595,4 +582,4 @@ private abstract class NextIterator[U] extends Iterator[U] {
     gotNext = false
     nextValue
   }
-}
\ No newline at end of file
+}
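The NextIterator scaladoc above documents a contract: getNext() either returns an element or sets finished (its return value is then ignored), and close() is invoked at most once via closeIfNeeded(). A hypothetical subclass showing that contract in use; since NextIterator is private to this file, imagine this living alongside it:

    import java.io.{BufferedReader, FileReader}

    // Iterates over the lines of a file, signalling exhaustion through
    // `finished` rather than through a sentinel return value.
    class LineIterator(path: String) extends NextIterator[String] {
      private val reader = new BufferedReader(new FileReader(path))

      override protected def getNext(): String = {
        val line = reader.readLine()
        if (line == null) {
          finished = true  // end of input; the returned null is ignored
          null
        } else {
          line
        }
      }

      override protected def close(): Unit = reader.close()
    }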
diff --git a/src/main/scala/com/qubole/spark/hiveacid/reader/hive/HiveAcidReader.scala b/src/main/scala/com/qubole/spark/hiveacid/reader/hive/HiveAcidReader.scala
index fce5949..1c72ad9 100644
--- a/src/main/scala/com/qubole/spark/hiveacid/reader/hive/HiveAcidReader.scala
+++ b/src/main/scala/com/qubole/spark/hiveacid/reader/hive/HiveAcidReader.scala
@@ -108,7 +108,7 @@ extends CastSupport with Reader with Logging {
     setReaderOptions(hiveAcidMetadata)

     val ifcName = hiveAcidMetadata.hTable.getInputFormatClass.getName
-    val ifc = Util.classForName(ifcName, loadShaded = true)
+    val inputFormatClass = Util.classForName(ifcName, loadShaded = true)
       .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]

     val colNames = getColumnNamesFromFieldSchema(fieldSchemas)
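A note on the rename above: inputFormatClass is the Class object that later reaches HiveAcidRDD.getInputFormat, which instantiates it reflectively and pushes the JobConf into formats that implement Configurable. A self-contained sketch of that instantiation pattern (InputFormatSketch and the TextInputFormat choice are illustrative):

    import org.apache.hadoop.conf.Configurable
    import org.apache.hadoop.mapred.{InputFormat, JobConf, TextInputFormat}
    import org.apache.hadoop.util.ReflectionUtils

    object InputFormatSketch {
      def instantiate[K, V](clazz: Class[_ <: InputFormat[K, V]],
                            conf: JobConf): InputFormat[K, V] = {
        // ReflectionUtils wires the conf into ordinary Hadoop classes...
        val format = ReflectionUtils.newInstance(clazz.asInstanceOf[Class[_]], conf)
          .asInstanceOf[InputFormat[K, V]]
        // ...but Configurable implementations also want setConf called explicitly.
        format match {
          case c: Configurable => c.setConf(conf)
          case _ =>
        }
        format
      }

      def main(args: Array[String]): Unit = {
        val format = instantiate(classOf[TextInputFormat], new JobConf())
        println(format.getClass.getName)  // org.apache.hadoop.mapred.TextInputFormat
      }
    }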
@@ -118,14 +118,23 @@ extends CastSupport with Reader with Logging {
       hiveAcidMetadata.hTable.getParameters,
       colNames, colTypes) _
+
+    val rdd = new HiveAcidRDD(
+      sparkSession.sparkContext,
+      validWriteIds,
+      hiveAcidOptions.isFullAcidTable,
+      _broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
+      Some(initializeJobConfFunc),
+      inputFormatClass,
+      classOf[Writable],
+      classOf[Writable],
+      _minSplitsPerRDD)
+
+    // TODO: Need to cache it with some unique id.
-    val jobConf = HiveAcidRDD.getJobConf(_broadcastedHadoopConf,
-      HiveAcidRDD.shouldCloneJobConf(sparkSession.sparkContext),
-      Some(initializeJobConfFunc))
+    val jobConf = rdd.getJobConf

-    val inputSplits = HiveAcidRDD.getPartitions(sparkSession.sparkContext, Some(jobConf), ifc, 0,
-      hiveAcidMetadata.isFullAcidTable, validWriteIds,
-      _broadcastedHadoopConf, Some(initializeJobConfFunc), _minSplitsPerRDD)
+    val inputSplits = rdd.getPartitions

     val reqFields = hiveAcidMetadata.tableSchema.fields.filter(field =>
       readerOptions.requiredNonPartitionedColumns.contains(field.name))
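A closing note on rdd.getPartitions and the TODO above: the new code path first consults HiveAcidPartitionComputer's splits cache and only computes splits on a miss, so the reader and the RDD can share one computation. A condensed, runnable sketch of that cache-or-compute shape (SplitsCacheSketch, the String key, and the Int splits are stand-ins):

    import scala.collection.concurrent.TrieMap

    object SplitsCacheSketch {
      // Maps an input-path key to previously computed "splits".
      private val cache = TrieMap.empty[String, Array[Int]]

      def getPartitions(key: String)(compute: => Array[Int]): Array[Int] =
        cache.get(key) match {
          case Some(cached) => cached      // hit: reuse the cached splits
          case None =>
            val computed = compute         // miss: compute once and remember
            cache.putIfAbsent(key, computed)
            computed
        }

      def main(args: Array[String]): Unit = {
        val first = getPartitions("hdfs://warehouse/t")(Array(0, 1, 2))
        val second = getPartitions("hdfs://warehouse/t")(sys.error("recomputed!"))
        println(first.sameElements(second))  // true: served from the cache
      }
    }

The TODO in the hunk flags that the cache key still needs to be made unique; the sketch's single String key has the same limitation.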