[RFC-15][HUDI-1325] Merge updates of unsynced instants to metadata table #6

Open. Wants to merge 5 commits into base: rfc-15.
Changes from all commits
@@ -22,9 +22,9 @@
import org.apache.hadoop.fs.Path;
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieMetadataConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieBackedTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadata;
@@ -131,18 +131,15 @@ public String init(@CliOption(key = {"readonly"}, unspecifiedDefaultValue = "fal
throw new RuntimeException("Metadata directory (" + metadataPath.toString() + ") does not exist.");
}

long t1 = System.currentTimeMillis();
if (readOnly) {
//HoodieMetadata.init(HoodieCLI.conf, HoodieCLI.basePath);
} else {
HoodieTimer timer = new HoodieTimer().startTimer();
if (!readOnly) {
HoodieWriteConfig writeConfig = getWriteConfig();
initJavaSparkContext();
HoodieTableMetadataWriter.create(HoodieCLI.conf, writeConfig, jsc);
}
long t2 = System.currentTimeMillis();

String action = readOnly ? "Opened" : "Initialized";
return String.format(action + " Metadata Table in %s (duration=%.2fsec)", metadataPath, (t2 - t1) / 1000.0);
return String.format(action + " Metadata Table in %s (duration=%.2fsec)", metadataPath, (timer.endTimer()) / 1000.0);
}

@CliCommand(value = "metadata stats", help = "Print stats about the metadata")
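The hunk above swaps the manual System.currentTimeMillis() bookkeeping for Hudi's HoodieTimer. A minimal sketch of that pattern, based only on the calls visible in the diff (startTimer() returns the timer so construction and start can be chained, endTimer() returns elapsed milliseconds); the sleep stands in for the real metadata-table initialization work:

import org.apache.hudi.common.util.HoodieTimer;

public class TimerSketch {
  public static void main(String[] args) throws InterruptedException {
    // startTimer() returns the timer itself, so it can be chained onto the constructor.
    HoodieTimer timer = new HoodieTimer().startTimer();

    Thread.sleep(250); // stand-in for initializing the metadata table

    // endTimer() returns the elapsed duration in milliseconds.
    long elapsedMs = timer.endTimer();
    System.out.println(String.format("Initialized Metadata Table (duration=%.2fsec)", elapsedMs / 1000.0));
  }
}

Using the timer also removes the need for the separate t1/t2 locals and keeps the measurement correct in both the read-only and initialization branches.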
@@ -128,8 +128,6 @@ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Opti
finalizeWrite(table, instantTime, stats);

try {
table.metadataWriter(jsc).update(metadata, instantTime);

activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType, instantTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
postCommit(table, metadata, instantTime, extraMetadata);
@@ -122,9 +122,7 @@ public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig writeConfig, bo
super(jsc, index, writeConfig, timelineService);
this.metrics = new HoodieMetrics(config, config.getTableName());
this.rollbackPending = rollbackPending;

// Initialize Metadata Table
HoodieTableMetadataWriter.create(hadoopConf, writeConfig, jsc);
syncTableMetadata();
}

/**
@@ -179,7 +177,6 @@ protected void rollBackInflightBootstrap() {
table.rollbackBootstrap(jsc, HoodieActiveTimeline.createNewInstantTime());
LOG.info("Finished rolling back pending bootstrap");
}

}

/**
@@ -384,7 +381,6 @@ private JavaRDD<WriteStatus> postWrite(HoodieWriteMetadata result, String instan
@Override
protected void postCommit(HoodieTable<?> table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) {
try {

// Delete the marker directory for the instant.
new MarkerFiles(table, instantTime).quietDeleteMarkerDir(jsc, config.getMarkersDeleteParallelism());

@@ -400,6 +396,8 @@ protected void postCommit(HoodieTable<?> table, HoodieCommitMetadata metadata, S
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, hadoopConf);
archiveLog.archiveIfRequired(jsc);
autoCleanOnCommit(instantTime);

syncTableMetadata();
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
}
@@ -587,6 +585,11 @@ public HoodieCleanMetadata clean() {
return clean(HoodieActiveTimeline.createNewInstantTime());
}

public void syncTableMetadata() {
// Open up the metadata table again, for syncing
HoodieTableMetadataWriter.create(hadoopConf, config, jsc);
}

/**
* Provides a new commit time for a write operation (insert/update/delete).
*/
@@ -701,8 +704,6 @@ protected void completeCompaction(HoodieCommitMetadata metadata, JavaRDD<WriteSt
finalizeWrite(table, compactionCommitTime, writeStats);
LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata);

table.metadataWriter(jsc).update(metadata, compactionCommitTime);

CompactHelpers.completeInflightCompaction(table, compactionCommitTime, metadata);

if (compactionTimer != null) {
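Taken together, the write-client hunks above drop the explicit table.metadataWriter(jsc).update(...) calls from commit and compaction completion and instead route everything through a new syncTableMetadata() method, invoked from the constructor and at the end of postCommit(). The idea, per the PR title, is that re-creating the HoodieTableMetadataWriter merges any data-timeline instants not yet reflected in the metadata table. A self-contained sketch of the resulting ordering; every class and method below is a stub, only the call sequence is taken from the diff:

// All methods here are stubs; only the call order mirrors the postCommit() hunk above.
public class PostCommitOrderSketch {

  static void deleteMarkerDir(String instantTime) { System.out.println("delete marker dir for " + instantTime); }
  static void archiveIfRequired()                 { System.out.println("archive instants if required"); }
  static void autoCleanOnCommit(String t)         { System.out.println("auto-clean on commit " + t); }

  // In the PR this re-creates the HoodieTableMetadataWriter, which is expected to
  // scan the data timeline and apply any instants not yet synced to the metadata table.
  static void syncTableMetadata()                 { System.out.println("sync metadata table"); }

  static void postCommit(String instantTime) {
    deleteMarkerDir(instantTime);     // unchanged
    archiveIfRequired();              // unchanged
    autoCleanOnCommit(instantTime);   // unchanged
    syncTableMetadata();              // new: single sync point after the commit completes
  }

  public static void main(String[] args) {
    postCommit("20201011120000");
  }
}

Deferring the update to one sync point also appears to cover instants that were written without an inline metadata-table update, which is the unsynced-instant merge the title refers to.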
@@ -23,6 +23,7 @@
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.bootstrap.BootstrapMode;
import org.apache.hudi.common.config.DefaultHoodieConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
@@ -72,6 +73,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
public static final String BULKINSERT_USER_DEFINED_PARTITIONER_CLASS = "hoodie.bulkinsert.user.defined.partitioner.class";
public static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism";
public static final String DELETE_PARALLELISM = "hoodie.delete.shuffle.parallelism";
public static final String FILE_LISTING_PARALLELISM = "hoodie.file.listing.parallelism";
public static final String DEFAULT_ROLLBACK_PARALLELISM = "100";
public static final String ROLLBACK_PARALLELISM = "hoodie.rollback.parallelism";
public static final String WRITE_BUFFER_LIMIT_BYTES = "hoodie.write.buffer.limit.bytes";
@@ -212,6 +214,10 @@ public int getDeleteShuffleParallelism() {
return Math.max(Integer.parseInt(props.getProperty(DELETE_PARALLELISM)), 1);
}

public int getFileListingParallelism() {
return Math.max(Integer.parseInt(props.getProperty(FILE_LISTING_PARALLELISM)), 1);
}

public int getRollbackParallelism() {
return Integer.parseInt(props.getProperty(ROLLBACK_PARALLELISM));
}
@@ -869,6 +875,11 @@ public Builder withDeleteParallelism(int parallelism) {
return this;
}

public Builder withFileListingParallelism(int parallelism) {
props.setProperty(FILE_LISTING_PARALLELISM, String.valueOf(parallelism));
return this;
}

public Builder withParallelism(int insertShuffleParallelism, int upsertShuffleParallelism) {
props.setProperty(INSERT_PARALLELISM, String.valueOf(insertShuffleParallelism));
props.setProperty(UPSERT_PARALLELISM, String.valueOf(upsertShuffleParallelism));
@@ -1023,6 +1034,8 @@ protected void setDefaults() {
DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(UPSERT_PARALLELISM), UPSERT_PARALLELISM, DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(DELETE_PARALLELISM), DELETE_PARALLELISM, DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(FILE_LISTING_PARALLELISM), FILE_LISTING_PARALLELISM,
DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(ROLLBACK_PARALLELISM), ROLLBACK_PARALLELISM,
DEFAULT_ROLLBACK_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(ROLLBACK_USING_MARKERS), ROLLBACK_USING_MARKERS,
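The config hunks introduce hoodie.file.listing.parallelism with a getter, a builder method and a default. A hedged usage sketch: withFileListingParallelism and getFileListingParallelism come from the diff, while withPath, forTable and the literal values are illustrative placeholders.

import org.apache.hudi.config.HoodieWriteConfig;

public class FileListingParallelismSketch {
  public static void main(String[] args) {
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi_trips")        // placeholder base path
        .forTable("trips")                  // placeholder table name
        .withFileListingParallelism(20)     // new option added by this PR
        .build();

    // Backed by hoodie.file.listing.parallelism; per the setDefaults() hunk it falls back
    // to DEFAULT_PARALLELISM when not set explicitly.
    System.out.println("file listing parallelism = " + writeConfig.getFileListingParallelism());
  }
}

Like the other parallelism getters, the new getter clamps the configured value to at least 1 via Math.max.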
@@ -64,7 +64,7 @@ List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitio
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
try {
List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
config.shouldAssumeDatePartitioning());
config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
return super.loadInvolvedFiles(allPartitionPaths, jsc, hoodieTable);
} catch (IOException e) {
throw new HoodieIOException("Failed to load all partitions", e);
@@ -100,7 +100,8 @@ protected JavaPairRDD<HoodieKey, HoodieRecordLocation> fetchAllRecordLocations(J
protected List<Pair<String, HoodieBaseFile>> getAllBaseFilesInTable(final JavaSparkContext jsc, final HoodieTable hoodieTable) {
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
try {
List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), config.shouldAssumeDatePartitioning());
List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
// Obtain the latest data files from all the partitions.
return getLatestBaseFilesForAllPartitions(allPartitionPaths, jsc, hoodieTable);
} catch (IOException e) {
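Both index hunks widen FSUtils.getAllPartitionPaths with two extra flags so the partition listing can be served from the metadata table and, optionally, verified against the file system. A hedged sketch of a direct call with the expanded signature; the base path is a placeholder, the import path for FSUtils is assumed, and the meaning of the two boolean flags is inferred from config.useFileListingMetadata() and config.getFileListingMetadataVerify() in the diff:

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;

public class ListPartitionsSketch {
  public static void main(String[] args) throws IOException {
    String basePath = "/tmp/hudi_trips";   // placeholder table base path
    FileSystem fs = new Path(basePath).getFileSystem(new Configuration());

    List<String> partitions = FSUtils.getAllPartitionPaths(
        fs,
        basePath,
        true,    // useFileListingMetadata: read the partition list from the metadata table
        false,   // verify: skip cross-checking the metadata listing against the file system
        false);  // assumeDatePartitioning: no date-based layout assumption

    partitions.forEach(System.out::println);
  }
}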