
Issue 86 : Add support for Datasource V2 : ORC #85

Open · wants to merge 5 commits into base: master
Conversation

maheshk114 (Contributor):
No description provided.

@maheshk114 maheshk114 changed the title Dv2 2 Issue 86 : Add support for Datasource V2 Jul 25, 2020
@maheshk114 maheshk114 changed the title Issue 86 : Add support for Datasource V2 Issue 86 : Add support for Datasource V2 : ORC Jul 25, 2020
@amoghmargoor amoghmargoor self-requested a review July 26, 2020 02:45
@amoghmargoor amoghmargoor self-assigned this Jul 26, 2020
colNames, colTypes) _

// TODO: It's an ugly hack, but it avoids lots of duplicate code.
val rdd = new HiveAcidRDD(
Collaborator:
Are we doing this only to get Splits?

Contributor Author:
Yes.

amoghmargoor (Collaborator) left a comment:
I have given it one pass and I see a lot of code lifted from the Spark code base. We would like to understand more about why that was required and why we couldn't have just extended those classes, composed them, or used them as-is.

}

def keyPrefix() : String = {
"HiveAcidV2"
Collaborator:
Would a table created using 'USING HiveAcid' never be able to use DSv2 until it is recreated?

Contributor Author:
Yes. Unless we make V2 the default reader.

*/
class HiveAcidMetadata(sparkSession: SparkSession,
Collaborator:
This is an open API, and changing it can break clients using it.

Contributor Author:
It looks like the changes are not required; I will revert them.

partitions)
}

def getReader(requiredColumns: Array[String],
Collaborator:
I think we are returning a list of partitions here. We should not call it getReader.

Contributor Author:
Makes sense; I shall change it to getPartitionV2.

*
* @return - Returns RDD on top of partitioned hive acid table
*/
def makeRDDForPartitionedTable(hiveAcidMetadata: HiveAcidMetadata,
Collaborator:
Any reason for removing partitions from the method and making it a member variable of the class? I think one HiveAcidReader can be used multiple times to read 1, 2, or more partitions. HiveAcidReader should map to just a table, not to <Table, partitions>.

Contributor Author:
I think the logic is the same. Earlier, getTableReader used to create the HiveAcidReader and then call getRDD; now I have just split it into two methods. The readerOptions is already created using the filters passed by the user, and the partitions list is generated from it. So I am not sure we should change the partitions list after the HiveAcidReader object is created.

* Spark's vectorized.ColumnVector, this column vector is used to adapt Hive ColumnVector with
* Spark ColumnarVector. This class is a copy of spark ColumnVector which is declared private.
*/
public class HiveAcidColumnVector extends org.apache.spark.sql.vectorized.ColumnVector {
Collaborator:
This looks similar to org.apache.spark.sql.execution.datasources.orc.OrcColumnVector. Should we not be extending it or using it directly here? We should also be format agnostic, but we can use something like a factory to get the ColumnVector based on the format.

Contributor Author:
OrcColumnVector in Spark is in private scope, so I had to copy it.

Contributor Author:
This file has been removed.

* After creating, `initialize` and `initBatch` should be called sequentially. This internally uses
* the Hive ACID Vectorized ORC reader to support reading of deleted and updated data.
*/
public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
Collaborator:
This looks similar to org.apache.spark.sql.execution.datasources.orc.OrcColumnarBatchReader. Should we use that instead? What are the extra changes here?

Contributor Author:
No, it shares some code with OrcColumnarBatchReader, but it also has logic to support SARGs (search arguments) and the ACID table reader.

Contributor:
@maheshk114: Can't we set the search arguments somewhere outside, i.e. before initializing this reader? There doesn't seem to be anything specific to this reader in the search arguments we are setting.

Contributor Author:
The search arguments are set in the reader options, so it is better to set them alongside the reader code.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._

private[hiveacid] class HiveAcidInputPartitionV2(split: HiveAcidPartition,
Collaborator:
Instead of calling it HiveAcidInputPartitionV2, we should call it ColumnarHiveAcidPartition.

Contributor Author:
I have kept the name in line with "org.apache.spark.sql.sources.v2".

val newRelation = new HiveAcidDataSource().createRelation(spark.sqlContext, options)
LogicalRelation(newRelation, isStreaming = false)
}

private def convertV2(relation: HiveTableRelation): LogicalPlan = {
val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
if (!serde.equals("org.apache.hadoop.hive.ql.io.orc.orcserde")) {
Contributor:
nit: Use the HiveSerDe.sourceToSerDe() API to do the comparison.

Contributor Author:
Done.

private def convertV2(relation: HiveTableRelation): LogicalPlan = {
val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
if (!serde.equals("org.apache.hadoop.hive.ql.io.orc.orcserde")) {
// Only ORC formatted is supported as of now. If its not ORC, then fallback to
Contributor:
nit: Add a log line stating the reason for the fallback to v1?

Contributor Author:
Done.

@@ -61,7 +75,11 @@ case class HiveAcidAutoConvert(spark: SparkSession) extends Rule[LogicalPlan] {
// Read path
case relation: HiveTableRelation
if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
convert(relation)
if (spark.conf.get("spark.acid.use.datasource.v2", "false").toBoolean) {
Contributor:
nit: Change the config to spark.hive-acid.read.datasource.version, accepting "v1" and "v2" as valid values.

Contributor Author:
Makes sense. This way the config can be extended if we later want to support a "v3" reader. Done.
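For illustration, the version dispatch implied by the renamed config could look roughly like this (a minimal, self-contained sketch: the Map stands in for SparkConf, and the returned class names merely label the two read paths rather than reproduce the PR's factory logic):

```java
import java.util.HashMap;
import java.util.Map;

public class ReaderVersionDispatch {
    // Sketch of the config discussed above: "spark.hive-acid.read.datasource.version"
    // accepts "v1" or "v2" and selects the reader implementation.
    static String chooseReader(Map<String, String> conf) {
        String version = conf.getOrDefault("spark.hive-acid.read.datasource.version", "v1");
        switch (version) {
            case "v1": return "HiveAcidRelation";      // existing DSv1 read path
            case "v2": return "HiveAcidDataSourceV2";  // new DSv2 read path
            default:
                throw new IllegalArgumentException("Unknown reader version: " + version);
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(chooseReader(conf)); // no config set: falls back to v1
        conf.put("spark.hive-acid.read.datasource.version", "v2");
        System.out.println(chooseReader(conf));
    }
}
```

A string-valued version config like this stays open to future values ("v3"), unlike the original boolean flag.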


class HiveAcidDataSourceV2 extends DataSourceV2 with ReadSupport with Logging {
override def createReader (options: DataSourceOptions) : DataSourceReader = {
logInfo("Using Datasource V2 for table" + options.tableName.get)
Contributor:
nit: Change the logging to "Creating datasource v2 reader for table ....."

Contributor Author:
Done.

options.databaseName.get, options.tableName.get)
}

def keyPrefix() : String = {
Contributor:
@maheshk114: Where is this method used? Since it is not an overridden method, I am not sure it is used anywhere in spark/acid.

Contributor Author:
I have kept it for future use. But I think we should use the same "HiveAcid" prefix for both readers and switch based on the config.

//TODO: Need to generalize it for supporting other kind of file format.
orcColumnarBatchReader.initialize(fileSplit, taskAttemptContext)
orcColumnarBatchReader.initBatch(readerLocal.getSchema, requestedColIds,
requiredFields, partitionSchema, partitionValues, isFullAcidTable && !fileSplit.isOriginal)
Contributor:
Shouldn't we create orcColumnarBatchReader only if these conditions are met: isFullAcidTable && !fileSplit.isOriginal?

Right now orcColumnarBatchReader works for both ACID and non-ACID reads, because we create a baseRecordReader but create an AcidRecordReader only if fullAcidScan is true.

Here we should only create the vectorized reader for ACID reads; for normal reads, we should simply throw an exception.

Contributor Author:
fullAcidScan is for full ACID tables. The base reader will be used for insert-only tables; in that case there is no filtering by the Hive ACID reader, since insert-only tables do not support update/delete. I think the Spark native reader does not support reading from insert-only Hive ACID tables. And if the user has an ACID table with files in the old format (original files), the HiveAcid reader should still be able to read it, so I don't think we should throw an exception.
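The reader-selection rule described in this exchange can be summarized in a small sketch (illustrative only; the string names stand in for the actual reader classes used by the PR):

```java
public class AcidReaderChoice {
    // Simplified sketch of the rule discussed above: full ACID splits need the
    // ACID record reader (so deleted/updated rows get filtered out); insert-only
    // tables and "original" files (written before the table became ACID) can be
    // read with the base reader, since they carry no delete deltas.
    static String pickRecordReader(boolean isFullAcidTable, boolean isOriginalFile) {
        if (isFullAcidTable && !isOriginalFile) {
            return "VectorizedAcidRecordReader"; // applies delete-delta filtering
        }
        return "BaseRecordReader"; // plain ORC read, no row filtering needed
    }

    public static void main(String[] args) {
        System.out.println(pickRecordReader(true, false));  // full ACID split
        System.out.println(pickRecordReader(false, false)); // insert-only table
        System.out.println(pickRecordReader(true, true));   // original file in an ACID table
    }
}
```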

import com.qubole.shaded.hadoop.hive.ql.io.sarg.*;

/**
* After creating, `initialize` and `initBatch` should be called sequentially. This internally uses
Contributor:
To avoid code duplication, we can extend Spark's OrcColumnarBatchReader and override the required methods. For example:

  1. initialize() - to initialize our vectorizedAcidRecordReader.
  2. nextKeyValue() - to read the next batch using vectorizedAcidRecordReader.
  3. getProgress() - to return the progress of the ACID reader.

Contributor Author:
I think most of the code is required. For example, putRepeatingValues and putNonNullValues are needed because the Spark reader does not take care of the "selected" rows: in a Hive ACID row batch, some of the rows need to be ignored, and that logic is not present in Spark currently. Anyway, I have extended it to avoid a few duplicate methods like putDecimalWritables.
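For context on the "selected" rows point: Hive's VectorizedRowBatch exposes a selected index array plus a selectedInUse flag, and when the flag is set only the listed row indices are live (for example, rows removed by ACID delete deltas are skipped). A minimal sketch of copying a long column under those semantics (not the PR's code; plain arrays stand in for the Hive and Spark column vectors):

```java
import java.util.Arrays;

public class SelectedRowsCopy {
    // Copy `size` rows from a Hive-style column vector, honoring the
    // VectorizedRowBatch contract: when selectedInUse is true, the live row
    // indices are selected[0..size); otherwise rows 0..size are taken directly.
    static long[] copyColumn(long[] column, boolean selectedInUse, int[] selected, int size) {
        long[] out = new long[size];
        for (int i = 0; i < size; i++) {
            int row = selectedInUse ? selected[i] : i;
            out[i] = column[row];
        }
        return out;
    }

    public static void main(String[] args) {
        long[] col = {10, 20, 30, 40, 50};
        int[] selected = {0, 2, 4}; // rows 1 and 3 were filtered out (e.g. deleted)
        System.out.println(Arrays.toString(copyColumn(col, true, selected, 3))); // [10, 30, 50]
    }
}
```

Spark's own OrcColumnVector reads rows positionally, which is why a copy that ignores this mapping would surface deleted rows.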

extends InputPartitionReader[ColumnarBatch] {
//TODO : Need to get a unique id to cache the jobConf.
private val jobConf = new JobConf(broadcastedConf.value.value)
private val orcColumnarBatchReader = new OrcColumnarBatchReader(1024)
Contributor:
Shouldn't we make the capacity configurable? Currently it is hard-coded to 1024.

Contributor Author:
I am not sure anybody will use this configuration, but for perf testing and the like I am making it configurable via "spark.hive.acid.default.row.batch.size".
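Resolving the capacity from that config could be sketched as follows (assuming a plain Map in place of SparkConf; the validation is illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class BatchCapacity {
    // Sketch: resolve the vectorized row-batch capacity from the config named
    // in the comment above ("spark.hive.acid.default.row.batch.size"),
    // defaulting to the previously hard-coded 1024.
    static int batchCapacity(Map<String, String> conf) {
        int capacity = Integer.parseInt(
            conf.getOrDefault("spark.hive.acid.default.row.batch.size", "1024"));
        if (capacity <= 0) {
            throw new IllegalArgumentException("Batch size must be positive: " + capacity);
        }
        return capacity;
    }

    public static void main(String[] args) {
        System.out.println(batchCapacity(new HashMap<>())); // 1024
    }
}
```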

@@ -41,9 +44,9 @@ private[hiveacid] class TableReader(sparkSession: SparkSession,
curTxn: HiveAcidTxn,
hiveAcidMetadata: HiveAcidMetadata) extends Logging {

def getRdd(requiredColumns: Array[String],
def getTableReader(requiredColumns: Array[String],
Contributor:
Make this private?

Contributor Author:
Done.


def getPartitionsV2(requiredColumns: Array[String],
Contributor:
nit: We can rename it to getInputPartitionsInBatch.

Contributor Author:
I think it is better to keep "V2" in the name, to stay in line with Spark's naming for the v2 reader.

sourabh912 (Contributor) commented Sep 30, 2020:

@maheshk114: I was testing the changes locally. I created a simple unpartitioned table and tried reading data in a range, and got wrong results. Here is the query I tried:

sql("select * from t1 where id > 15 and id < 25").show

the output was:

+----------+---+
|      name| id|
+----------+---+
|new_name11| 11|
|new_name12| 12|
|new_name13| 13|
|new_name14| 14|
|new_name15| 15|
|new_name16| 16|
|new_name17| 17|
|new_name18| 18|
|new_name19| 19|
|new_name20| 20|
|new_name21| 21|
|new_name22| 22|
|new_name23| 23|
|new_name24| 24|
|new_name25| 25|
|new_name26| 26|
|new_name27| 27|
|new_name28| 28|
|new_name29| 29|
|new_name30| 30|
+----------+---+

Table schema is:

org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(id,IntegerType,true))

I get the expected result if I disable the dsv2 reader. Let me know if you need more details regarding the above query.

Have you tested this PR in your environment?

maheshk114 (Contributor Author):

@sourabh912 thanks for pointing it out. The issue is that ORC does not support row-level filtering, so the filtering has to be done again in Spark. We have done some testing internally, though with different Hive/Spark/ORC versions. This specific issue was already fixed internally; I forgot to merge the fix into this PR.
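To illustrate the root cause: ORC search arguments prune whole stripes or row groups, never individual rows, so a row group whose min/max statistics overlap the predicate is returned in full, and Spark must re-apply the filter per row. A toy sketch with hypothetical data matching the example above:

```java
import java.util.ArrayList;
import java.util.List;

public class RowLevelFilter {
    // ORC SARGs can only skip whole stripes/row groups. A row group that
    // survives pruning still contains rows failing the predicate, so the
    // pushed-down filter (here: lower < id < upper) must be re-evaluated per
    // row on the Spark side; this re-evaluation is what the DSv2 path missed.
    static List<Integer> filterIds(List<Integer> rowGroup, int lower, int upper) {
        List<Integer> out = new ArrayList<>();
        for (int id : rowGroup) {
            if (id > lower && id < upper) {
                out.add(id);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The row group's min/max stats (11..30) overlap (15, 25), so the SARG
        // keeps the whole group; only ids 16..24 should reach the user.
        List<Integer> rowGroup = new ArrayList<>();
        for (int id = 11; id <= 30; id++) rowGroup.add(id);
        System.out.println(filterIds(rowGroup, 15, 25));
    }
}
```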
