From d16e9e1e69be9512b3dc366b05642bbcf803d2b6 Mon Sep 17 00:00:00 2001 From: Mahesh Kumar Behera Date: Mon, 24 Aug 2020 12:35:22 +0530 Subject: [PATCH] review comment fix --- .../hiveacid/HiveAcidDataSourceV2Reader.scala | 2 +- .../hiveacid/hive/HiveAcidMetadata.scala | 41 ++++++++----------- .../spark/hiveacid/reader/TableReader.scala | 6 +-- 3 files changed, 22 insertions(+), 27 deletions(-) diff --git a/src/main/scala/com/qubole/spark/hiveacid/HiveAcidDataSourceV2Reader.scala b/src/main/scala/com/qubole/spark/hiveacid/HiveAcidDataSourceV2Reader.scala index f6c65bb..5eab6c3 100644 --- a/src/main/scala/com/qubole/spark/hiveacid/HiveAcidDataSourceV2Reader.scala +++ b/src/main/scala/com/qubole/spark/hiveacid/HiveAcidDataSourceV2Reader.scala @@ -99,7 +99,7 @@ class HiveAcidDataSourceV2Reader txn: HiveAcidTxn => { import scala.collection.JavaConversions._ val reader = new TableReader(sparkSession, txn, hiveAcidMetadata) - val hiveReader = reader.getReader(schema.fieldNames, + val hiveReader = reader.getPartitionsV2(schema.fieldNames, pushedFilterArray, new SparkAcidConf(sparkSession, options.toMap)) factories.addAll(hiveReader) } diff --git a/src/main/scala/com/qubole/spark/hiveacid/hive/HiveAcidMetadata.scala b/src/main/scala/com/qubole/spark/hiveacid/hive/HiveAcidMetadata.scala index 3a30f95..9895e39 100644 --- a/src/main/scala/com/qubole/spark/hiveacid/hive/HiveAcidMetadata.scala +++ b/src/main/scala/com/qubole/spark/hiveacid/hive/HiveAcidMetadata.scala @@ -36,26 +36,27 @@ import org.apache.hadoop.mapred.{InputFormat, OutputFormat} import org.apache.spark.internal.Logging import org.apache.spark.sql._ import org.apache.spark.sql.types._ -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan /** * Represents metadata for hive acid table and exposes API to perform operations on top of it - * @param database - name of the database - * @param identifier - table identifier - * @param hiveConf hiveConf - hive conf + * @param sparkSession - spark session object + * @param fullyQualifiedTableName - the fully qualified hive acid table name */ -class HiveAcidMetadata(database : Option[String], - identifier : String, - hiveConf: HiveConf, - caseSensitiveAnalysis : Boolean = false) extends Logging { +class HiveAcidMetadata(sparkSession: SparkSession, + fullyQualifiedTableName: String) extends Logging { + + // hive conf + private val hiveConf: HiveConf = HiveConverter.getHiveConf(sparkSession.sparkContext) + // a hive representation of the table val hTable: metadata.Table = { val hive: Hive = Hive.get(hiveConf) + val table = sparkSession.sessionState.sqlParser.parseTableIdentifier(fullyQualifiedTableName) val hTable = hive.getTable( - database match { + table.database match { case Some(database) => database case None => HiveAcidMetadata.DEFAULT_DATABASE - }, identifier) + }, table.identifier) Hive.closeCurrent() hTable } @@ -133,7 +134,7 @@ class HiveAcidMetadata(database : Option[String], } private def getColName(field: StructField): String = { - HiveAcidMetadata.getColName(caseSensitiveAnalysis, field) + HiveAcidMetadata.getColName(sparkSession, field) } } @@ -155,19 +156,13 @@ object HiveAcidMetadata { def fromSparkSession(sparkSession: SparkSession, fullyQualifiedTableName: String): HiveAcidMetadata = { - val logicalPlan = sparkSession.sessionState.sqlParser.parseTableIdentifier(fullyQualifiedTableName) - new HiveAcidMetadata(logicalPlan.database, - logicalPlan.table, - HiveConverter.getHiveConf(sparkSession.sparkContext), - sparkSession.sessionState.conf.caseSensitiveAnalysis) - } - - def fromTableName(database : Option[String], table : String, hiveConf : HiveConf): HiveAcidMetadata = { - new HiveAcidMetadata(database, table, hiveConf) + new HiveAcidMetadata( + sparkSession, + fullyQualifiedTableName) } - def getColName(caseSensitiveAnalysis : Boolean, field: StructField): String = { - if (caseSensitiveAnalysis) { + def getColName(sparkSession: SparkSession, field: StructField): String = { + if (sparkSession.sessionState.conf.caseSensitiveAnalysis) { field.name } else { field.name.toLowerCase(Locale.ROOT) @@ -175,6 +170,6 @@ object HiveAcidMetadata { } def getColNames(sparkSession: SparkSession, schema: StructType): Seq[String] = { - schema.map(getColName(sparkSession.sessionState.conf.caseSensitiveAnalysis, _)) + schema.map(getColName(sparkSession, _)) } } diff --git a/src/main/scala/com/qubole/spark/hiveacid/reader/TableReader.scala b/src/main/scala/com/qubole/spark/hiveacid/reader/TableReader.scala index 337e273..c5b879d 100644 --- a/src/main/scala/com/qubole/spark/hiveacid/reader/TableReader.scala +++ b/src/main/scala/com/qubole/spark/hiveacid/reader/TableReader.scala @@ -124,9 +124,9 @@ private[hiveacid] class TableReader(sparkSession: SparkSession, partitions) } - def getReader(requiredColumns: Array[String], - filters: Array[Filter], - readConf: SparkAcidConf): java.util.List[InputPartition[ColumnarBatch]] = { + def getPartitionsV2(requiredColumns: Array[String], + filters: Array[Filter], + readConf: SparkAcidConf): java.util.List[InputPartition[ColumnarBatch]] = { val reader = getTableReader(requiredColumns, filters, readConf) if (hiveAcidMetadata.isPartitioned) { logDebug("getReader for Partitioned table")