[HUDI-1292] [RFC-15] Use metadata table (if present) to get all partition paths (apache#2351)
umehrot2 authored Dec 20, 2020
1 parent 3cab54e commit c6c7e18
Showing 26 changed files with 184 additions and 45 deletions.
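The change threads metadata-table-aware listing through every call site of FSUtils.getAllPartitionPaths. As a rough orientation sketch (not part of the diff; hoodieTable and config are assumed to be in scope, and the getter names are taken from the hunks below), a caller now passes two extra flags sourced from the write config:

    HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
    List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(
        metaClient.getFs(),
        metaClient.getBasePath(),
        config.useFileListingMetadata(),       // read partition listings from the metadata table if available
        config.getFileListingMetadataVerify(), // optionally verify those listings against the file system
        config.shouldAssumeDatePartitioning());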
@@ -64,7 +64,7 @@ List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitio
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
try {
List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
-     config.shouldAssumeDatePartitioning());
+     config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
return super.loadInvolvedFiles(allPartitionPaths, jsc, hoodieTable);
} catch (IOException e) {
throw new HoodieIOException("Failed to load all partitions", e);
@@ -100,7 +100,8 @@ protected JavaPairRDD<HoodieKey, HoodieRecordLocation> fetchAllRecordLocations(J
protected List<Pair<String, HoodieBaseFile>> getAllBaseFilesInTable(final JavaSparkContext jsc, final HoodieTable hoodieTable) {
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
try {
- List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), config.shouldAssumeDatePartitioning());
+ List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
+     config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
// Obtain the latest data files from all the partitions.
return getLatestBaseFilesForAllPartitions(allPartitionPaths, jsc, hoodieTable);
} catch (IOException e) {
@@ -312,15 +312,16 @@ private void bootstrapFromFilesystem(JavaSparkContext jsc, HoodieTableMetaClient

// List all partitions in the basePath of the containing dataset
FileSystem fs = datasetMetaClient.getFs();
- List<String> partitions = FSUtils.getAllPartitionPaths(fs, datasetWriteConfig.getBasePath(), datasetWriteConfig.shouldAssumeDatePartitioning());
+ FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new FileSystemBackedTableMetadata(hadoopConf, datasetWriteConfig.getBasePath(),
+     datasetWriteConfig.shouldAssumeDatePartitioning());
+ List<String> partitions = fileSystemBackedTableMetadata.getAllPartitionPaths();
LOG.info("Initializing metadata table by using file listings in " + partitions.size() + " partitions");

// List all partitions in parallel and collect the files in them
int parallelism = Math.min(partitions.size(), jsc.defaultParallelism()) + 1; // +1 to prevent 0 parallelism
JavaPairRDD<String, FileStatus[]> partitionFileListRDD = jsc.parallelize(partitions, parallelism)
.mapToPair(partition -> {
- FileSystem fsys = datasetMetaClient.getFs();
- FileStatus[] statuses = FSUtils.getAllDataFilesInPartition(fsys, new Path(datasetWriteConfig.getBasePath(), partition));
+ FileStatus[] statuses = fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(datasetWriteConfig.getBasePath(), partition));
return new Tuple2<>(partition, statuses);
});
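For context, a condensed sketch of the bootstrap pattern above (assuming, as in this hunk, that hadoopConf is the writer's SerializableConfiguration): because FileSystemBackedTableMetadata holds only a serializable configuration and the base path, a single instance can be captured by the Spark closure and used on executors to list each partition.

    FileSystemBackedTableMetadata fsMetadata = new FileSystemBackedTableMetadata(
        hadoopConf, datasetWriteConfig.getBasePath(), datasetWriteConfig.shouldAssumeDatePartitioning());
    List<String> partitions = fsMetadata.getAllPartitionPaths();

    int parallelism = Math.min(partitions.size(), jsc.defaultParallelism()) + 1; // +1 avoids a parallelism of 0
    JavaPairRDD<String, FileStatus[]> partitionFiles = jsc.parallelize(partitions, parallelism)
        .mapToPair(partition -> new Tuple2<>(partition,
            fsMetadata.getAllFilesInPartition(new Path(datasetWriteConfig.getBasePath(), partition))));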

@@ -180,7 +180,7 @@ public HoodieCompactionPlan generateCompactionPlan(JavaSparkContext jsc, HoodieT
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
LOG.info("Compacting " + metaClient.getBasePath() + " with commit " + compactionCommitTime);
List<String> partitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
-     config.shouldAssumeDatePartitioning());
+     config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());

// filter the partition paths if needed to reduce list status
partitionPaths = config.getCompactionStrategy().filterPartitionPaths(config, partitionPaths);
@@ -91,7 +91,7 @@ protected List<HoodieRollbackStat> executeRollback() {
@Override
protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant instantToRollback) {
List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(),
-     config.shouldAssumeDatePartitioning());
+     config);
return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(jsc, instantToRollback, rollbackRequests);
}
}
@@ -88,12 +88,14 @@ static HoodieRollbackStat mergeRollbackStat(HoodieRollbackStat stat1, HoodieRoll
* Generate all rollback requests that needs rolling back this action without actually performing rollback for COW table type.
* @param fs instance of {@link FileSystem} to use.
* @param basePath base path of interest.
- * @param shouldAssumeDatePartitioning {@code true} if date partitioning should be assumed. {@code false} otherwise.
+ * @param config instance of {@link HoodieWriteConfig} to use.
* @return {@link List} of {@link ListingBasedRollbackRequest}s thus collected.
*/
- public static List<ListingBasedRollbackRequest> generateRollbackRequestsByListingCOW(FileSystem fs, String basePath, boolean shouldAssumeDatePartitioning) {
+ public static List<ListingBasedRollbackRequest> generateRollbackRequestsByListingCOW(FileSystem fs, String basePath,
+     HoodieWriteConfig config) {
try {
- return FSUtils.getAllPartitionPaths(fs, basePath, shouldAssumeDatePartitioning).stream()
+ return FSUtils.getAllPartitionPaths(fs, basePath, config.useFileListingMetadata(), config.getFileListingMetadataVerify(),
+     config.shouldAssumeDatePartitioning()).stream()
.map(ListingBasedRollbackRequest::createRollbackRequestWithDeleteDataAndLogFilesAction)
.collect(Collectors.toList());
} catch (IOException e) {
@@ -113,7 +115,7 @@ public static List<ListingBasedRollbackRequest> generateRollbackRequestsUsingFil
String commit = instantToRollback.getTimestamp();
HoodieWriteConfig config = table.getConfig();
List<String> partitions = FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(),
-     config.shouldAssumeDatePartitioning());
+     config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1);
jsc.setJobGroup(RollbackUtils.class.getSimpleName(), "Generate all rollback requests");
return jsc.parallelize(partitions, Math.min(partitions.size(), sparkPartitions)).flatMap(partitionPath -> {
@@ -89,7 +89,7 @@ public HoodieSavepointMetadata execute() {

jsc.setJobGroup(this.getClass().getSimpleName(), "Collecting latest files for savepoint " + instantTime);
Map<String, List<String>> latestFilesMap = jsc.parallelize(FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(),
-     table.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning()))
+     table.getMetaClient().getBasePath(), config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning()))
.mapToPair(partitionPath -> {
// Scan all partitions files with this commit time
LOG.info("Collecting latest files in partition path " + partitionPath);
@@ -89,7 +89,7 @@ private static void recreateMarkerFiles(final String commitInstantTime, HoodieTa
List<ListingBasedRollbackRequest> rollbackRequests;
if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) {
rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(),
-     table.getConfig().shouldAssumeDatePartitioning());
+     table.getConfig());
} else {
rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(commitInstantOpt.get(), table, jsc);
}
@@ -99,8 +99,9 @@ public void testSavepointAndRollback() throws Exception {
statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
// Verify there are no errors
assertNoWriteErrors(statuses);
- List<String> partitionPaths =
-     FSUtils.getAllPartitionPaths(fs, cfg.getBasePath(), getConfig().shouldAssumeDatePartitioning());
+ HoodieWriteConfig config = getConfig();
+ List<String> partitionPaths = FSUtils.getAllPartitionPaths(fs, cfg.getBasePath(), config.useFileListingMetadata(),
+     config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.create(metaClient, getConfig(), hadoopConf);
final BaseFileOnlyView view1 = table.getBaseFileOnlyView();
@@ -718,7 +718,7 @@ private void validateMetadata(HoodieWriteClient client) throws IOException {
// Cannot use FSUtils.getAllFoldersWithPartitionMetaFile for this as that function filters all directory
// in the .hoodie folder.
List<String> metadataTablePartitions = FSUtils.getAllPartitionPaths(fs, HoodieTableMetadata.getMetadataTableBasePath(basePath),
-     false);
+     false, false,false);
assertEquals(MetadataPartitionType.values().length, metadataTablePartitions.size());

// Metadata table should automatically compact and clean
10 changes: 7 additions & 3 deletions hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -28,6 +28,8 @@
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.InvalidHoodiePathException;
+ import org.apache.hudi.metadata.HoodieTableMetadata;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -248,12 +250,14 @@ public static void processFiles(FileSystem fs, String basePathStr, Function<File
}
}

- public static List<String> getAllPartitionPaths(FileSystem fs, String basePathStr, boolean assumeDatePartitioning)
-     throws IOException {
+ public static List<String> getAllPartitionPaths(FileSystem fs, String basePathStr, boolean useFileListingFromMetadata, boolean verifyListings,
+     boolean assumeDatePartitioning) throws IOException {
if (assumeDatePartitioning) {
return getAllPartitionFoldersThreeLevelsDown(fs, basePathStr);
} else {
- return getAllFoldersWithPartitionMetaFile(fs, basePathStr);
+ HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(fs.getConf(), basePathStr, "/tmp/", useFileListingFromMetadata,
+     verifyListings, false, false);
+ return tableMetadata.getAllPartitionPaths();
}
}
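Callers that do not have a HoodieWriteConfig at hand (read paths and tooling) can fall back to the HoodieMetadataConfig defaults, which is what DFSHoodieDatasetInputReader does later in this commit. A minimal sketch of that pattern, assuming metaClient is already in scope:

    List<String> partitionPaths = FSUtils.getAllPartitionPaths(
        metaClient.getFs(), metaClient.getBasePath(),
        HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS, // whether readers consult the metadata table by default
        HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE,           // whether to verify listings against the file system
        false /* assumeDatePartitioning */);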

@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.metadata;

import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.Option;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.util.List;

public class FileSystemBackedTableMetadata implements HoodieTableMetadata {

private final SerializableConfiguration hadoopConf;
private final String datasetBasePath;
private final boolean assumeDatePartitioning;

public FileSystemBackedTableMetadata(SerializableConfiguration conf, String datasetBasePath, boolean assumeDatePartitioning) {
this.hadoopConf = conf;
this.datasetBasePath = datasetBasePath;
this.assumeDatePartitioning = assumeDatePartitioning;
}

@Override
public FileStatus[] getAllFilesInPartition(Path partitionPath) throws IOException {
FileSystem fs = partitionPath.getFileSystem(hadoopConf.get());
return FSUtils.getAllDataFilesInPartition(fs, partitionPath);
}

@Override
public List<String> getAllPartitionPaths() throws IOException {
FileSystem fs = new Path(datasetBasePath).getFileSystem(hadoopConf.get());
if (assumeDatePartitioning) {
return FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, datasetBasePath);
} else {
return FSUtils.getAllFoldersWithPartitionMetaFile(fs, datasetBasePath);
}
}

@Override
public Option<String> getSyncedInstantTime() {
throw new UnsupportedOperationException();
}

@Override
public boolean isInSync() {
throw new UnsupportedOperationException();
}
}
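Illustrative usage of the new fallback implementation (the table path is hypothetical and exception handling is elided): it lists partitions and data files straight from the file system, and deliberately does not answer sync-related queries, which throw UnsupportedOperationException.

    SerializableConfiguration conf = new SerializableConfiguration(new Configuration());
    HoodieTableMetadata metadata = new FileSystemBackedTableMetadata(conf, "/data/hudi/trips", false);
    for (String partition : metadata.getAllPartitionPaths()) {
      FileStatus[] dataFiles = metadata.getAllFilesInPartition(new Path("/data/hudi/trips", partition));
      // ... inspect the data files in this partition
    }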
@@ -30,7 +30,6 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
- import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
@@ -108,7 +107,7 @@ public HoodieBackedTableMetadata(Configuration conf, String datasetBasePath, Str
try {
this.metaClient = new HoodieTableMetaClient(hadoopConf.get(), metadataBasePath);
} catch (TableNotFoundException e) {
- LOG.error("Metadata table was not found at path " + metadataBasePath);
+ LOG.warn("Metadata table was not found at path " + metadataBasePath + ", hence not using it.");
this.enabled = false;
} catch (Exception e) {
LOG.error("Failed to initialize metadata table at path " + metadataBasePath, e);
@@ -145,8 +144,7 @@ public List<String> getAllPartitionPaths()
}
}

- FileSystem fs = FSUtils.getFs(datasetBasePath, hadoopConf.get());
- return FSUtils.getAllPartitionPaths(fs, datasetBasePath, assumeDatePartitioning);
+ return new FileSystemBackedTableMetadata(hadoopConf, datasetBasePath, assumeDatePartitioning).getAllPartitionPaths();
}

/**
@@ -199,7 +197,8 @@ protected List<String> fetchAllPartitionPaths() throws IOException {
if (validateLookups) {
// Validate the Metadata Table data by listing the partitions from the file system
timer.startTimer();
- List<String> actualPartitions = FSUtils.getAllPartitionPaths(metaClient.getFs(), datasetBasePath, false);
+ FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new FileSystemBackedTableMetadata(hadoopConf, datasetBasePath, assumeDatePartitioning);
+ List<String> actualPartitions = fileSystemBackedTableMetadata.getAllPartitionPaths();
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_PARTITIONS_STR, timer.endTimer()));

Collections.sort(actualPartitions);
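The validation branch above guards the rollout: when verification is enabled, the partitions read from the metadata table are compared against a fresh file-system listing. A simplified sketch of that comparison (partitionsFromMetadataTable is a stand-in name for the list already fetched from the metadata table, and how a mismatch is reported is up to the caller):

    FileSystemBackedTableMetadata fsBacked =
        new FileSystemBackedTableMetadata(hadoopConf, datasetBasePath, assumeDatePartitioning);
    List<String> actualPartitions = fsBacked.getAllPartitionPaths(); // ground truth from the file system
    Collections.sort(actualPartitions);
    Collections.sort(partitionsFromMetadataTable);
    if (!actualPartitions.equals(partitionsFromMetadataTable)) {
      LOG.error("Partition listing from metadata table does not match file system listing");
    }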
@@ -38,6 +38,7 @@
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
+ import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieRecord;
@@ -79,7 +80,8 @@ public DFSHoodieDatasetInputReader(JavaSparkContext jsc, String basePath, String

protected List<String> getPartitions(Option<Integer> partitionsLimit) throws IOException {
List<String> partitionPaths = FSUtils
-     .getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), false);
+     .getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+         HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE,false);
// Sort partition so we can pick last N partitions by default
Collections.sort(partitionPaths);
if (!partitionPaths.isEmpty()) {
@@ -28,7 +28,7 @@ import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.client.{HoodieWriteClient, HoodieWriteResult}
- import org.apache.hudi.common.config.TypedProperties
+ import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, WriteOperationType}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
@@ -341,6 +341,8 @@ private[hudi] object HoodieSparkSqlWriter {
ListBuffer(parameters(HIVE_PARTITION_FIELDS_OPT_KEY).split(",").map(_.trim).filter(!_.isEmpty).toList: _*)
hiveSyncConfig.partitionValueExtractorClass = parameters(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY)
hiveSyncConfig.useJdbc = parameters(HIVE_USE_JDBC_OPT_KEY).toBoolean
+ hiveSyncConfig.useFileListingFromMetadata = parameters(HoodieMetadataConfig.METADATA_ENABLE_PROP).toBoolean
+ hiveSyncConfig.verifyMetadataFileListing = parameters(HoodieMetadataConfig.METADATA_VALIDATE_PROP).toBoolean
hiveSyncConfig
}
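With the defaults wired in HoodieWriterUtils below and the Hive sync flags above, a datasource writer can toggle the feature per write. A hedged Java sketch (df, the table name, and the path are illustrative; the option keys are the HoodieMetadataConfig constants referenced in this hunk):

    df.write().format("hudi")
        .option("hoodie.table.name", "trips")
        .option(HoodieMetadataConfig.METADATA_ENABLE_PROP, "true")    // build and use the metadata table for file listings
        .option(HoodieMetadataConfig.METADATA_VALIDATE_PROP, "false") // skip cross-checking listings against the file system
        .mode(SaveMode.Append)
        .save("/data/hudi/trips");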

@@ -18,7 +18,7 @@
package org.apache.hudi

import org.apache.hudi.DataSourceWriteOptions._
- import org.apache.hudi.common.config.TypedProperties
+ import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}

import scala.collection.JavaConversions.mapAsJavaMap
import scala.collection.JavaConverters.mapAsScalaMapConverter
@@ -46,6 +46,8 @@ object HoodieWriterUtils {
RECORDKEY_FIELD_OPT_KEY -> DEFAULT_RECORDKEY_FIELD_OPT_VAL,
PARTITIONPATH_FIELD_OPT_KEY -> DEFAULT_PARTITIONPATH_FIELD_OPT_VAL,
KEYGENERATOR_CLASS_OPT_KEY -> DEFAULT_KEYGENERATOR_CLASS_OPT_VAL,
+ HoodieMetadataConfig.METADATA_ENABLE_PROP -> HoodieMetadataConfig.DEFAULT_METADATA_ENABLE.toString,
+ HoodieMetadataConfig.METADATA_VALIDATE_PROP -> HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE.toString,
COMMIT_METADATA_KEYPREFIX_OPT_KEY -> DEFAULT_COMMIT_METADATA_KEYPREFIX_OPT_VAL,
INSERT_DROP_DUPS_OPT_KEY -> DEFAULT_INSERT_DROP_DUPS_OPT_VAL,
STREAMING_RETRY_CNT_OPT_KEY -> DEFAULT_STREAMING_RETRY_CNT_OPT_VAL,