From f7f8c33f804b1bb2cfecf3ac18edc9eb584c679e Mon Sep 17 00:00:00 2001
From: Prashant Wason
Date: Mon, 21 Dec 2020 10:42:32 -0800
Subject: [PATCH] [HUDI-1469] Faster initialization of metadata table using
 parallelized listing. (#2343)

* [HUDI-1469] Faster initialization of metadata table using parallelized listing which finds partitions and files in a single scan.

* MINOR fixes

Co-authored-by: Vinoth Chandar
---
 .../apache/hudi/config/HoodieWriteConfig.java | 12 +++
 .../HoodieBackedTableMetadataWriter.java      | 94 ++++++++++++-------
 2 files changed, 71 insertions(+), 35 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 484880cc9f51b..bf8295330fe7f 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -73,6 +73,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   public static final String BULKINSERT_USER_DEFINED_PARTITIONER_CLASS = "hoodie.bulkinsert.user.defined.partitioner.class";
   public static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism";
   public static final String DELETE_PARALLELISM = "hoodie.delete.shuffle.parallelism";
+  public static final String FILE_LISTING_PARALLELISM = "hoodie.file.listing.parallelism";
   public static final String DEFAULT_ROLLBACK_PARALLELISM = "100";
   public static final String ROLLBACK_PARALLELISM = "hoodie.rollback.parallelism";
   public static final String WRITE_BUFFER_LIMIT_BYTES = "hoodie.write.buffer.limit.bytes";
@@ -213,6 +214,10 @@ public int getDeleteShuffleParallelism() {
     return Math.max(Integer.parseInt(props.getProperty(DELETE_PARALLELISM)), 1);
   }
 
+  public int getFileListingParallelism() {
+    return Math.max(Integer.parseInt(props.getProperty(FILE_LISTING_PARALLELISM)), 1);
+  }
+
   public int getRollbackParallelism() {
     return Integer.parseInt(props.getProperty(ROLLBACK_PARALLELISM));
   }
@@ -870,6 +875,11 @@ public Builder withDeleteParallelism(int parallelism) {
       return this;
     }
 
+    public Builder withFileListingParallelism(int parallelism) {
+      props.setProperty(FILE_LISTING_PARALLELISM, String.valueOf(parallelism));
+      return this;
+    }
+
     public Builder withParallelism(int insertShuffleParallelism, int upsertShuffleParallelism) {
       props.setProperty(INSERT_PARALLELISM, String.valueOf(insertShuffleParallelism));
       props.setProperty(UPSERT_PARALLELISM, String.valueOf(upsertShuffleParallelism));
@@ -1024,6 +1034,8 @@ protected void setDefaults() {
           DEFAULT_PARALLELISM);
       setDefaultOnCondition(props, !props.containsKey(UPSERT_PARALLELISM), UPSERT_PARALLELISM, DEFAULT_PARALLELISM);
       setDefaultOnCondition(props, !props.containsKey(DELETE_PARALLELISM), DELETE_PARALLELISM, DEFAULT_PARALLELISM);
+      setDefaultOnCondition(props, !props.containsKey(FILE_LISTING_PARALLELISM), FILE_LISTING_PARALLELISM,
+          DEFAULT_PARALLELISM);
       setDefaultOnCondition(props, !props.containsKey(ROLLBACK_PARALLELISM), ROLLBACK_PARALLELISM,
           DEFAULT_ROLLBACK_PARALLELISM);
       setDefaultOnCondition(props, !props.containsKey(ROLLBACK_USING_MARKERS), ROLLBACK_USING_MARKERS,
diff --git a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index b560428809a30..f89a198d9abf2 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -52,6 +52,7 @@
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieMetricsConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -67,7 +68,6 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
@@ -82,8 +82,6 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import scala.Tuple2;
-
 import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
 import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME;
 import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
@@ -196,6 +194,7 @@ private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfi
         .withParallelism(parallelism, parallelism)
         .withDeleteParallelism(parallelism)
         .withRollbackParallelism(parallelism)
+        .withFileListingParallelism(writeConfig.getFileListingParallelism())
         .withFinalizeWriteParallelism(parallelism);
 
     if (writeConfig.isMetricsOn()) {
@@ -311,43 +310,17 @@ private void bootstrapFromFilesystem(JavaSparkContext jsc, HoodieTableMetaClient
     initTableMetadata();
 
     // List all partitions in the basePath of the containing dataset
-    FileSystem fs = datasetMetaClient.getFs();
-    FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new FileSystemBackedTableMetadata(hadoopConf, datasetWriteConfig.getBasePath(),
-        datasetWriteConfig.shouldAssumeDatePartitioning());
-    List<String> partitions = fileSystemBackedTableMetadata.getAllPartitionPaths();
-    LOG.info("Initializing metadata table by using file listings in " + partitions.size() + " partitions");
-
-    // List all partitions in parallel and collect the files in them
-    int parallelism = Math.min(partitions.size(), jsc.defaultParallelism()) + 1; // +1 to prevent 0 parallelism
-    JavaPairRDD<String, FileStatus[]> partitionFileListRDD = jsc.parallelize(partitions, parallelism)
-        .mapToPair(partition -> {
-          FileStatus[] statuses = fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(datasetWriteConfig.getBasePath(), partition));
-          return new Tuple2<>(partition, statuses);
-        });
-
-    // Collect the list of partitions and file lists
-    List<Tuple2<String, FileStatus[]>> partitionFileList = partitionFileListRDD.collect();
+    LOG.info("Initializing metadata table by using file listings in " + datasetWriteConfig.getBasePath());
+    Map<String, List<FileStatus>> partitionToFileStatus = getPartitionsToFilesMapping(jsc, datasetMetaClient);
 
     // Create a HoodieCommitMetadata with writeStats for all discovered files
     int[] stats = {0};
     HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
 
-    partitionFileList.forEach(t -> {
-      final String partition = t._1;
-      try {
-        if (!fs.exists(new Path(datasetWriteConfig.getBasePath(), partition + Path.SEPARATOR + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))) {
-          return;
-        }
-      } catch (IOException e) {
-        throw new HoodieMetadataException("Failed to check partition " + partition, e);
-      }
-
+    partitionToFileStatus.forEach((partition, statuses) -> {
       // Filter the statuses to only include files which were created before or on createInstantTime
-      Arrays.stream(t._2).filter(status -> {
+      statuses.stream().filter(status -> {
         String filename = status.getPath().getName();
-        if (filename.equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
-          return false;
-        }
         if (HoodieTimeline.compareTimestamps(FSUtils.getCommitTime(filename), HoodieTimeline.GREATER_THAN,
             createInstantTime)) {
           return false;
@@ -370,10 +343,59 @@ private void bootstrapFromFilesystem(JavaSparkContext jsc, HoodieTableMetaClient
       }
     });
 
-    LOG.info("Committing " + partitionFileList.size() + " partitions and " + stats[0] + " files to metadata");
+    LOG.info("Committing " + partitionToFileStatus.size() + " partitions and " + stats[0] + " files to metadata");
     update(commitMetadata, createInstantTime);
   }
 
+  /**
+   * Function to find hoodie partitions and list files in them in parallel.
+   *
+   * @param jsc
+   * @param datasetMetaClient
+   * @return Map of partition names to a list of FileStatus for all the files in the partition
+   */
+  private Map<String, List<FileStatus>> getPartitionsToFilesMapping(JavaSparkContext jsc, HoodieTableMetaClient datasetMetaClient) {
+
+    List<Path> pathsToList = new LinkedList<>();
+    pathsToList.add(new Path(datasetWriteConfig.getBasePath()));
+
+    Map<String, List<FileStatus>> partitionToFileStatus = new HashMap<>();
+    final int fileListingParallelism = metadataWriteConfig.getFileListingParallelism();
+    SerializableConfiguration conf = new SerializableConfiguration(datasetMetaClient.getHadoopConf());
+
+    while (!pathsToList.isEmpty()) {
+      int listingParallelism = Math.min(fileListingParallelism, pathsToList.size());
+      // List all directories in parallel
+      List<Pair<Path, FileStatus[]>> dirToFileListing = jsc.parallelize(pathsToList, listingParallelism)
+          .map(path -> {
+            FileSystem fs = path.getFileSystem(conf.get());
+            return Pair.of(path, fs.listStatus(path));
+          }).collect();
+      pathsToList.clear();
+
+      // If the listing reveals a directory, add it to queue. If the listing reveals a hoodie partition, add it to
+      // the results.
+      dirToFileListing.forEach(p -> {
+        List<FileStatus> filesInDir = Arrays.stream(p.getRight()).parallel()
+            .filter(fs -> !fs.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))
+            .collect(Collectors.toList());
+
+        if (p.getRight().length > filesInDir.size()) {
+          // Is a partition. Add all data files to result.
+          partitionToFileStatus.put(p.getLeft().getName(), filesInDir);
+        } else {
+          // Add sub-dirs to the queue
+          pathsToList.addAll(Arrays.stream(p.getRight())
+              .filter(fs -> fs.isDirectory() && !fs.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
+              .map(fs -> fs.getPath())
+              .collect(Collectors.toList()));
+        }
+      });
+    }
+
+    return partitionToFileStatus;
+  }
+
   /**
    * Sync the Metadata Table from the instants created on the dataset.
    *
@@ -454,7 +476,9 @@ public void update(HoodieCommitMetadata commitMetadata, String instantTime) {
     writeStats.forEach(hoodieWriteStat -> {
       String pathWithPartition = hoodieWriteStat.getPath();
       if (pathWithPartition == null) {
-        throw new HoodieMetadataException("Unable to find path in write stat to update metadata table " + hoodieWriteStat);
+        // Empty partition
+        LOG.warn("Unable to find path in write stat to update metadata table " + hoodieWriteStat);
+        return;
       }
 
       int offset = partition.equals(NON_PARTITIONED_NAME) ? 0 : partition.length() + 1;
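
A minimal usage sketch (illustrative only, not part of the patch): once this change is applied, a writer can raise the file listing parallelism through the new builder method added above. The base path, table name, and the value 200 below are placeholder assumptions; when the option is left unset, setDefaults() falls back to DEFAULT_PARALLELISM.

import org.apache.hudi.config.HoodieWriteConfig;

public class FileListingParallelismExample {
  public static void main(String[] args) {
    // Placeholder base path and table name; only withFileListingParallelism() is specific to this patch.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/sample_table")
        .forTable("sample_table")
        .withFileListingParallelism(200) // sets hoodie.file.listing.parallelism
        .build();

    // The metadata writer reads this back via getFileListingParallelism() when it
    // bootstraps the metadata table by scanning the file system.
    System.out.println(writeConfig.getFileListingParallelism()); // prints 200
  }
}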