Fixed issue with range filter
Mahesh Kumar Behera committed Oct 16, 2020
1 parent 1c9cf82 commit 5a84118
Showing 3 changed files with 21 additions and 14 deletions.
@@ -114,7 +114,8 @@ class HiveAcidDataSourceV2Reader
  override def pushFilters (filters: Array[Filter]): Array[Filter] = {
    this.pushedFilterArray = HiveAcidSearchArgument.
      getSupportedFilters(hiveAcidMetadata.tableSchema, filters.toSeq).toArray
-   filters.filterNot(filter => this.pushedFilterArray.contains(filter))
+   // ORC does not do row-level filtering, so the filters have to be applied again.
+   filters
  }

  override def pushedFilters(): Array[Filter] = this.pushedFilterArray
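Context for this hunk: the search argument built from the pushed-down filters only prunes ORC stripes and row groups; it does not drop individual rows. Declaring the pushed filters fully handled (the removed filterNot line) therefore lets non-matching rows from surviving stripes through, which is presumably the range-filter issue named in the commit title. Returning the whole array tells Spark to re-evaluate every filter on the rows the reader produces. A minimal, self-contained sketch of that contract follows; the Filter case classes and supportsOrcSarg are hypothetical stand-ins, not types from this repository.

// Illustrative sketch of the residual-filter contract (hypothetical types, not the project's code).
object ResidualFilterSketch {
  sealed trait Filter
  final case class GreaterThan(column: String, value: Int) extends Filter
  final case class EqualTo(column: String, value: Int) extends Filter

  // Assume only some filters can be translated into an ORC search argument.
  def supportsOrcSarg(f: Filter): Boolean = f match {
    case _: GreaterThan => true
    case _: EqualTo     => true
  }

  // Returns (filters pushed into the ORC reader, filters Spark must still apply).
  // The pushed filters only prune stripes/row groups, so every filter stays residual.
  def pushFilters(filters: Array[Filter]): (Array[Filter], Array[Filter]) = {
    val pushed = filters.filter(supportsOrcSarg)
    (pushed, filters)
  }

  def main(args: Array[String]): Unit = {
    val (pushed, residual) = pushFilters(Array(GreaterThan("id", 10), EqualTo("part", 3)))
    println(s"pushed:   ${pushed.mkString(", ")}")   // coarse pruning hints for ORC
    println(s"residual: ${residual.mkString(", ")}") // re-evaluated row by row by Spark
  }
}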
@@ -69,7 +69,7 @@ private[v2] class HiveAcidInputPartitionReaderV2(split: HiveAcidPartition,
    //TODO: Need to generalize it for supporting other kind of file format.
    orcColumnarBatchReader.initialize(fileSplit, taskAttemptContext)
    orcColumnarBatchReader.initBatch(readerLocal.getSchema, requestedColIds,
-     requiredFields, partitionSchema, partitionValues, isFullAcidTable && !fileSplit.isOriginal)
+     requiredFields, partitionSchema, partitionValues, isFullAcidTable, fileSplit.isOriginal)
  }
  initReader()

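The call site now hands isFullAcidTable and fileSplit.isOriginal to the batch reader separately instead of pre-combining them, because the reader needs the raw isOriginal flag on its own (for search-argument column naming) in addition to the derived isAcidScan. A rough sketch of what the single combined flag hides; SplitFlags is a made-up helper, not a class in this repository.

// Hypothetical helper showing the information lost by the old combined flag.
final case class SplitFlags(isFullAcidTable: Boolean, isOriginal: Boolean) {
  // The only value the old initBatch signature received.
  def isAcidScan: Boolean = isFullAcidTable && !isOriginal
}

object SplitFlagsDemo {
  def main(args: Array[String]): Unit = {
    // An "original" file inside a full ACID table vs. a plain non-ACID split:
    val originalInAcidTable = SplitFlags(isFullAcidTable = true, isOriginal = true)
    val nonAcidSplit        = SplitFlags(isFullAcidTable = false, isOriginal = false)

    // Both collapse to isAcidScan == false, but code that also needs isOriginal
    // (such as locating the data columns for search arguments) cannot tell them apart.
    println(s"${originalInAcidTable.isAcidScan} ${nonAcidSplit.isAcidScan}") // false false
  }
}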
@@ -152,11 +152,13 @@ public void close() throws IOException {
  // The columns that are pushed as search arguments to ORC file reader.
  private String[] getSargColumnNames(String[] originalColumnNames,
                                      List<OrcProto.Type> types,
-                                     boolean[] includedColumns) {
+                                     boolean[] includedColumns,
+                                     boolean isOriginal) {
    // Skip ACID related columns if present.
-   String[] columnNames = new String[types.size() - rootColIdx];
+   int dataColIdx = isOriginal ? 0 : rootColIdx + 1;
+   String[] columnNames = new String[types.size() - dataColIdx];
    int i = 0;
-   Iterator iterator = ((OrcProto.Type)types.get(rootColIdx)).getSubtypesList().iterator();
+   Iterator iterator = ((OrcProto.Type)types.get(dataColIdx)).getSubtypesList().iterator();

    while(true) {
      int columnId;
@@ -165,14 +167,15 @@ private String[] getSargColumnNames(String[] originalColumnNames,
          return columnNames;
        }
        columnId = (Integer)iterator.next();
-     } while(includedColumns != null && !includedColumns[columnId - rootColIdx]);
-     columnNames[columnId - rootColIdx] = originalColumnNames[i++];
+     } while(includedColumns != null && !includedColumns[columnId - dataColIdx]);
+     columnNames[columnId - dataColIdx] = originalColumnNames[i++];
    }
  }

  private void setSearchArgument(Reader.Options options,
                                 List<OrcProto.Type> types,
-                                Configuration conf) {
+                                Configuration conf,
+                                boolean isOriginal) {
    String neededColumnNames = conf.get("hive.io.file.readcolumn.names");
    if (neededColumnNames == null) {
      options.searchArgument((SearchArgument)null, (String[])null);
@@ -183,17 +186,18 @@ private void setSearchArgument(Reader.Options options,
        options.searchArgument((SearchArgument)null, (String[])null);
      } else {
        String[] colNames = getSargColumnNames(neededColumnNames.split(","),
-         types, options.getInclude());
+         types, options.getInclude(), isOriginal);
        options.searchArgument(sarg, colNames);
      }
    }
  }

  private void setSearchArgumentForOption(Configuration conf,
                                          TypeDescription readerSchema,
-                                         Reader.Options readerOptions) {
+                                         Reader.Options readerOptions,
+                                         boolean isOriginal) {
    final List<OrcProto.Type> schemaTypes = OrcUtils.getOrcTypes(readerSchema);
-   setSearchArgument(readerOptions, schemaTypes, conf);
+   setSearchArgument(readerOptions, schemaTypes, conf, isOriginal);
  }

  /**
@@ -274,8 +278,10 @@ public void initBatch(
      StructField[] requiredFields,
      StructType partitionSchema,
      InternalRow partitionValues,
-     boolean isAcidScan) throws IOException {
-
+     boolean isFullAcidTable,
+     boolean isOriginal
+     ) throws IOException {
+   boolean isAcidScan = isFullAcidTable && !isOriginal;
    if (!isAcidScan) {
      //rootCol = org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getRootColumn(true);
      rootColIdx = 0;
@@ -292,7 +298,7 @@ public void initBatch(
        .filesystem(fileSplit.getPath().getFileSystem(conf)));
    Reader.Options options = /*createOptionsForReader(conf, orcSchema);*/
        OrcInputFormat.buildOptions(conf, readerInner, fileSplit.getStart(), fileSplit.getLength());
-   setSearchArgumentForOption(conf, orcSchema, options);
+   setSearchArgumentForOption(conf, orcSchema, options, isOriginal);
    baseRecordReader = readerInner.rows(options);

    // This schema will have both required fields and the field to be used by ACID reader.
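The dataColIdx change above is the substance of the fix in this file: getSargColumnNames now picks the ORC type node that actually holds the data columns. For an original file (one laid out without the ACID wrapper schema) that is the root struct at index 0; for a full ACID file the data columns sit one past rootColIdx, which by the ACID file layout should be the wrapper's row struct. Flat column IDs are then mapped back to data-column positions relative to that node. Below is a self-contained model of the mapping; OrcType is a made-up stand-in for OrcProto.Type, and rootColIdx is passed explicitly rather than read from a field.

// Illustrative model of the dataColIdx mapping (not the project's Java).
object SargColumnSketch {
  // One node of the flattened ORC type tree: its flat id and its children's flat ids.
  final case class OrcType(id: Int, subtypes: List[Int])

  def sargColumnNames(originalColumnNames: Array[String],
                      types: Vector[OrcType],
                      included: Option[Array[Boolean]],
                      rootColIdx: Int,
                      isOriginal: Boolean): Array[String] = {
    // Original files keep the data columns directly under the root struct (index 0);
    // full ACID files keep them one past the ACID wrapper, at rootColIdx + 1.
    val dataColIdx = if (isOriginal) 0 else rootColIdx + 1
    val names = new Array[String](types.size - dataColIdx)
    var i = 0
    for (columnId <- types(dataColIdx).subtypes
         if included.forall(_(columnId - dataColIdx))) {
      names(columnId - dataColIdx) = originalColumnNames(i)
      i += 1
    }
    names
  }

  def main(args: Array[String]): Unit = {
    // Toy "original" file: root struct 0 with two data columns at flat ids 1 and 2.
    val types = Vector(OrcType(0, List(1, 2)), OrcType(1, Nil), OrcType(2, Nil))
    val names = sargColumnNames(Array("id", "amount"), types, None, rootColIdx = 0, isOriginal = true)
    println(names.mkString(", ")) // prints "null, id, amount"; slot 0 maps to the struct node itself
  }
}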
