diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java index 56f4205d0b1b7..bb79d5a139eb2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java @@ -128,6 +128,10 @@ Path getCheckpointsDirectory() { return checkpointsDirectory; } + public Path getSharedStateDirectory() { + return sharedStateDirectory; + } + // ------------------------------------------------------------------------ // CheckpointStorage implementation // ------------------------------------------------------------------------ diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java index 5ae74ef7034cb..73e4344333271 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java @@ -28,6 +28,7 @@ import org.apache.flink.configuration.description.TextElement; import static org.apache.flink.state.forst.ForStStateBackend.PriorityQueueStateType.ForStDB; +import static org.apache.flink.state.forst.ForStStateBackend.REMOTE_SHORTCUT_CHECKPOINT; /** Configuration options for the ForStStateBackend. */ @Experimental @@ -55,8 +56,9 @@ public class ForStOptions { .noDefaultValue() .withDescription( String.format( - "The remote directory where ForSt puts its SST files, fallback to %s if not configured.", - LOCAL_DIRECTORIES.key())); + "The remote directory where ForSt puts its SST files, fallback to %s if not configured." + + " Recognized shortcut name is '%s', which means that forst shares the directory with checkpoint.", + LOCAL_DIRECTORIES.key(), REMOTE_SHORTCUT_CHECKPOINT)); public static final ConfigOption CACHE_DIRECTORY = ConfigOptions.key("state.backend.forst.cache.dir") diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java index 47faf014ec49f..49d2f288141fc 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java @@ -39,6 +39,7 @@ import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.StreamCompressionDecorator; +import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess; import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig; import org.apache.flink.state.forst.ForStMemoryControllerUtils.ForStMemoryFactory; import org.apache.flink.state.forst.sync.ForStPriorityQueueConfig; @@ -91,6 +92,8 @@ public class ForStStateBackend extends AbstractManagedMemoryStateBackend implements ConfigurableStateBackend { + public static final String REMOTE_SHORTCUT_CHECKPOINT = "checkpoint-dir"; + private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(ForStStateBackend.class); @@ -173,6 +176,8 @@ public class ForStStateBackend extends AbstractManagedMemoryStateBackend */ private final TernaryBoolean rescalingUseDeleteFilesInRange; + private boolean remoteShareWithCheckpoint; + // ------------------------------------------------------------------------ /** Creates a new {@code ForStStateBackend} for storing state. */ @@ -184,6 +189,7 @@ public ForStStateBackend() { this.overlapFractionThreshold = UNDEFINED_OVERLAP_FRACTION_THRESHOLD; this.useIngestDbRestoreMode = TernaryBoolean.UNDEFINED; this.rescalingUseDeleteFilesInRange = TernaryBoolean.UNDEFINED; + this.remoteShareWithCheckpoint = false; } /** @@ -200,11 +206,17 @@ private ForStStateBackend( original.memoryConfiguration, config); this.memoryConfiguration.validate(); + this.remoteShareWithCheckpoint = false; if (original.remoteForStDirectory != null) { this.remoteForStDirectory = original.remoteForStDirectory; } else { String remoteDirStr = config.get(ForStOptions.REMOTE_DIRECTORY); - this.remoteForStDirectory = remoteDirStr == null ? null : new Path(remoteDirStr); + if (REMOTE_SHORTCUT_CHECKPOINT.equals(remoteDirStr)) { + this.remoteForStDirectory = null; + this.remoteShareWithCheckpoint = true; + } else { + this.remoteForStDirectory = remoteDirStr == null ? null : new Path(remoteDirStr); + } } this.priorityQueueConfig = @@ -383,10 +395,22 @@ public ForStKeyedStateBackend createAsyncKeyedStateBackend( new Path( new File(new File(getNextStoragePath(), jobId.toHexString()), opChildPath) .getAbsolutePath()); - Path remoteBasePath = - remoteForStDirectory != null - ? new Path(new Path(remoteForStDirectory, jobId.toHexString()), opChildPath) - : null; + Path remoteBasePath = null; + if (remoteForStDirectory != null) { + remoteBasePath = + new Path(new Path(remoteForStDirectory, jobId.toHexString()), opChildPath); + } else if (remoteShareWithCheckpoint) { + if (env.getCheckpointStorageAccess() instanceof FsCheckpointStorageAccess) { + Path sharedStateDirectory = + ((FsCheckpointStorageAccess) env.getCheckpointStorageAccess()) + .getSharedStateDirectory(); + remoteBasePath = new Path(sharedStateDirectory, opChildPath); + LOG.info("Set remote ForSt directory to checkpoint directory {}", remoteBasePath); + } else { + LOG.warn( + "Remote ForSt directory can't be set, because checkpoint directory isn't on file system."); + } + } final OpaqueMemoryResource sharedResources = ForStOperationUtils.allocateSharedCachesIfConfigured( diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendV2Test.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendV2Test.java index 86af6bec57007..7bd4ab539fd53 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendV2Test.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendV2Test.java @@ -18,12 +18,20 @@ package org.apache.flink.state.forst; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.ConfigurableStateBackend; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl; import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage; import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.runtime.state.v2.StateBackendTestV2Base; import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; @@ -31,17 +39,21 @@ import org.apache.flink.testutils.junit.utils.TempDirUtils; import org.apache.flink.util.function.SupplierWithException; +import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.List; import static org.apache.flink.state.forst.ForStConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING; import static org.apache.flink.state.forst.ForStConfigurableOptions.USE_INGEST_DB_RESTORE_MODE; import static org.apache.flink.state.forst.ForStOptions.LOCAL_DIRECTORIES; import static org.apache.flink.state.forst.ForStOptions.REMOTE_DIRECTORY; +import static org.apache.flink.state.forst.ForStStateBackend.REMOTE_SHORTCUT_CHECKPOINT; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; /** Tests for the async keyed state backend part of {@link ForStStateBackend}. */ @ExtendWith(ParameterizedTestExtension.class) @@ -110,4 +122,59 @@ protected ConfigurableStateBackend getStateBackend() throws Exception { config.set(USE_DELETE_FILES_IN_RANGE_DURING_RESCALING, useDeleteFileInRange); return backend.configure(config, Thread.currentThread().getContextClassLoader()); } + + @TestTemplate + void testRemoteDirShareCheckpointDirWithJobId() throws Exception { + testRemoteDirShareCheckpointDir(true); + } + + @TestTemplate + void testRemoteDirShareCheckpointDirWOJob() throws Exception { + testRemoteDirShareCheckpointDir(false); + } + + void testRemoteDirShareCheckpointDir(boolean createJob) throws Exception { + JobID jobID = new JobID(); + String checkpointPath = TempDirUtils.newFolder(tempFolder).toURI().toString(); + FileSystemCheckpointStorage checkpointStorage = + new FileSystemCheckpointStorage(new Path(checkpointPath), 0, -1); + + Configuration config = new Configuration(); + config.set(LOCAL_DIRECTORIES, tempFolderForForStLocal.toString()); + config.set(REMOTE_DIRECTORY, REMOTE_SHORTCUT_CHECKPOINT); + config.set(CheckpointingOptions.CREATE_CHECKPOINT_SUB_DIR, createJob); + + checkpointStorage = + checkpointStorage.configure(config, Thread.currentThread().getContextClassLoader()); + MockEnvironment mockEnvironment = + MockEnvironment.builder().setTaskStateManager(getTestTaskStateManager()).build(); + mockEnvironment.setCheckpointStorageAccess( + checkpointStorage.createCheckpointStorage(jobID)); + + ForStStateBackend backend = new ForStStateBackend(); + backend = backend.configure(config, Thread.currentThread().getContextClassLoader()); + KeyGroupRange keyGroupRange = KeyGroupRange.of(0, 127); + ForStKeyedStateBackend keyedBackend = + backend.createAsyncKeyedStateBackend( + new KeyedStateBackendParametersImpl<>( + mockEnvironment, + jobID, + "test_op", + IntSerializer.INSTANCE, + keyGroupRange.getNumberOfKeyGroups(), + keyGroupRange, + env.getTaskKvStateRegistry(), + TtlTimeProvider.DEFAULT, + getMetricGroup(), + getCustomInitializationMetrics(), + Collections.emptyList(), + new CloseableRegistry(), + 1.0d)); + + assertThat(keyedBackend.getRemoteBasePath().getParent()) + .isEqualTo( + new Path( + checkpointStorage.getCheckpointPath(), + createJob ? jobID + "/shared" : "/shared")); + } }