Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve performance of segment metadata cache on Overlord #17785

Merged
merged 16 commits into from
Mar 26, 2025
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public TypeReference<Integer> getReturnTypeReference()
public Integer perform(Task task, TaskActionToolbox toolbox)
{
return toolbox.getIndexerMetadataStorageCoordinator()
.markSegmentsAsUnusedWithinInterval(dataSource, interval);
.markSegmentsWithinIntervalAsUnused(dataSource, interval, null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.indexing.common.task.Task;

import java.util.Set;
Expand All @@ -42,6 +43,8 @@ public RetrieveUpgradedFromSegmentIdsAction(
{
this.dataSource = dataSource;
this.segmentIds = segmentIds;

IdUtils.getValidSegmentIds(dataSource, segmentIds);
}

@JsonProperty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.indexing.common.task.Task;

import java.util.Set;
Expand All @@ -47,6 +48,8 @@ public RetrieveUpgradedToSegmentIdsAction(
{
this.dataSource = dataSource;
this.segmentIds = segmentIds;

IdUtils.getValidSegmentIds(dataSource, segmentIds);
}

@JsonProperty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@
import com.sun.jersey.spi.container.ResourceFilters;
import org.apache.druid.audit.AuditEntry;
import org.apache.druid.audit.AuditManager;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
Expand All @@ -49,7 +50,6 @@
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
Expand All @@ -66,19 +66,22 @@ public class OverlordDataSourcesResource
private static final Logger log = new Logger(OverlordDataSourcesResource.class);

private final SegmentsMetadataManager segmentsMetadataManager;
private final IndexerMetadataStorageCoordinator metadataStorageCoordinator;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just realized that the SegmentsMetadataManager is the Coordinator version of a metadata cache. Do you think we're moving towards getting rid of it completely on the Overlord side of things, in favor of using only IndexerMetadataStorageCoordinator?

It looks like after this patch, the SegmentsMetadataManager is still used in two places on the OL:

  • this class, for markAsUsedNonOvershadowedSegmentsInInterval, markAsUsedNonOvershadowedSegments, and markSegmentAsUsed.
  • in OverlordCompactionScheduler, for getSnapshotOfDataSourcesWithAllUsedSegments. To me it seems like this could already be replaced by IndexerSQLMetadataStorageCoordinator#retrieveAllUsedSegments. (Although the SegmentsMetadataManager version would perform better, since it caches the snapshot, and the IndexerSQLMetadataStorageCoordinator version needs to build a timeline, I am not sure how much this matters.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, @gianm, I plan to get rid of SqlSegmentsMetadataManager from the Overlord completely.
Coordinator will continue to use it for the time being, but eventually we might just get rid of it altogether.

  • The usage of SqlSegmentsMetadataManager by OverlordCompactionScheduler does matter for performance because of the timeline and caching reasons as you guessed, but I should be able to resolve this soon.

private final TaskMaster taskMaster;
private final AuditManager auditManager;

@Inject
public OverlordDataSourcesResource(
TaskMaster taskMaster,
SegmentsMetadataManager segmentsMetadataManager,
IndexerMetadataStorageCoordinator metadataStorageCoordinator,
AuditManager auditManager
)
{
this.taskMaster = taskMaster;
this.auditManager = auditManager;
this.segmentsMetadataManager = segmentsMetadataManager;
this.metadataStorageCoordinator = metadataStorageCoordinator;
}

private interface SegmentUpdateOperation
Expand Down Expand Up @@ -109,8 +112,8 @@ public Response markAllSegmentsAsUnused(
@Context HttpServletRequest req
)
{
SegmentUpdateOperation operation = () -> segmentsMetadataManager
.markAsUnusedAllSegmentsInDataSource(dataSourceName);
SegmentUpdateOperation operation =
() -> metadataStorageCoordinator.markAllSegmentsAsUnused(dataSourceName);
final Response response = performSegmentUpdate(dataSourceName, operation);

final int responseCode = response.getStatus();
Expand Down Expand Up @@ -147,18 +150,10 @@ public Response markNonOvershadowedSegmentsAsUsed(
return 0;
}

// Validate segmentIds
final List<String> invalidSegmentIds = new ArrayList<>();
for (String segmentId : segmentIds) {
if (SegmentId.iteratePossibleParsingsWithDataSource(dataSourceName, segmentId).isEmpty()) {
invalidSegmentIds.add(segmentId);
}
}
if (!invalidSegmentIds.isEmpty()) {
throw InvalidInput.exception("Could not parse invalid segment IDs[%s]", invalidSegmentIds);
}

return segmentsMetadataManager.markAsUsedNonOvershadowedSegments(dataSourceName, segmentIds);
return segmentsMetadataManager.markAsUsedNonOvershadowedSegments(
dataSourceName,
IdUtils.getValidSegmentIds(dataSourceName, segmentIds)
);
}
};

Expand Down Expand Up @@ -188,7 +183,8 @@ public Response markSegmentsAsUnused(
final List<String> versions = payload.getVersions();
final int numUpdatedSegments;
if (interval != null) {
numUpdatedSegments = segmentsMetadataManager.markAsUnusedSegmentsInInterval(dataSourceName, interval, versions);
numUpdatedSegments = metadataStorageCoordinator
.markSegmentsWithinIntervalAsUnused(dataSourceName, interval, versions);
} else {
final Set<SegmentId> segmentIds = payload.getSegmentIds()
.stream()
Expand All @@ -197,7 +193,8 @@ public Response markSegmentsAsUnused(
.collect(Collectors.toSet());

// Filter out segmentIds that do not belong to this datasource
numUpdatedSegments = segmentsMetadataManager.markSegmentsAsUnused(
numUpdatedSegments = metadataStorageCoordinator.markSegmentsAsUnused(
dataSourceName,
segmentIds.stream()
.filter(segmentId -> segmentId.getDataSource().equals(dataSourceName))
.collect(Collectors.toSet())
Expand Down Expand Up @@ -240,7 +237,7 @@ public Response markSegmentAsUnused(
}

SegmentUpdateOperation operation =
() -> segmentsMetadataManager.markSegmentAsUnused(segmentId) ? 1 : 0;
() -> metadataStorageCoordinator.markSegmentAsUnused(segmentId) ? 1 : 0;
return performSegmentUpdate(dataSourceName, operation);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1168,7 +1168,7 @@ public void testSegmentIdMustNotBeReused()
coordinator.deletePendingSegmentsForTaskAllocatorId(task1.getDataSource(), task1.getTaskAllocatorId());

// Drop all segments
coordinator.markSegmentsAsUnusedWithinInterval(task0.getDataSource(), Intervals.ETERNITY);
coordinator.markSegmentsWithinIntervalAsUnused(task0.getDataSource(), Intervals.ETERNITY, null);

// Allocate another id and ensure that it doesn't exist in the druid_segments table
final SegmentIdWithShardSpec theId =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.assertj.core.api.Assertions;
import org.easymock.Capture;
import org.easymock.EasyMock;
Expand All @@ -62,7 +63,7 @@
@RunWith(Parameterized.class)
public class KillUnusedSegmentsTaskTest extends IngestionTestBase
{
private static final String DATA_SOURCE = "dataSource";
private static final String DATA_SOURCE = "wiki";

private TestTaskRunner taskRunner;

Expand Down Expand Up @@ -147,14 +148,15 @@ public void testKill() throws Exception
@Test
public void testKillSegmentsDeleteUnreferencedSiblings() throws Exception
{
final SegmentId nonExistentParent = segment3.getId();
final Map<String, String> upgradeSegmentMapping = ImmutableMap.of(
segment1.getId().toString(),
"nonExistentParent",
nonExistentParent.toString(),
segment2.getId().toString(),
"nonExistentParent"
nonExistentParent.toString()
);
insertUsedSegments(ImmutableSet.of(segment1, segment2), upgradeSegmentMapping);
getStorageCoordinator().markSegmentsAsUnusedWithinInterval(DATA_SOURCE, Intervals.ETERNITY);
getStorageCoordinator().markSegmentsWithinIntervalAsUnused(DATA_SOURCE, Intervals.ETERNITY, null);


final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
Expand Down Expand Up @@ -185,14 +187,15 @@ public void testKillSegmentsDeleteUnreferencedSiblings() throws Exception
@Test
public void testKillSegmentsDoNotDeleteReferencedSibling() throws Exception
{
final SegmentId nonExistentParent = segment3.getId();
final Map<String, String> upgradeSegmentMapping = ImmutableMap.of(
segment1.getId().toString(),
"nonExistentParent",
nonExistentParent.toString(),
segment2.getId().toString(),
"nonExistentParent"
nonExistentParent.toString()
);
insertUsedSegments(ImmutableSet.of(segment1, segment2), upgradeSegmentMapping);
getStorageCoordinator().markSegmentsAsUnusedWithinInterval(DATA_SOURCE, Intervals.ETERNITY);
getStorageCoordinator().markSegmentsWithinIntervalAsUnused(DATA_SOURCE, Intervals.ETERNITY, null);


final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -817,7 +817,8 @@ public DataSegment apply(String input)
}
);

mdc.setUnusedSegments(expectedUnusedSegments);
mdc.commitSegments(Set.copyOf(expectedUnusedSegments), null);
expectedUnusedSegments.forEach(segment -> mdc.markSegmentAsUnused(segment.getId()));

// manually create local segments files
List<File> segmentFiles = new ArrayList<>();
Expand Down Expand Up @@ -849,7 +850,7 @@ public DataSegment apply(String input)
final TaskStatus status = runTask(killUnusedSegmentsTask);
Assert.assertEquals(taskLocation, status.getLocation());
Assert.assertEquals("merged statusCode", TaskState.SUCCESS, status.getStatusCode());
Assert.assertEquals("num segments published", 0, mdc.getPublished().size());
Assert.assertEquals("num segments published", 3, mdc.getPublished().size());
Assert.assertEquals("num segments nuked", 3, mdc.getNuked().size());
Assert.assertEquals("delete segment batch call count", 2, mdc.getDeleteSegmentsCount());
Assert.assertTrue(
Expand Down Expand Up @@ -914,7 +915,8 @@ public DataSegment apply(String input)
}
);

mdc.setUnusedSegments(expectedUnusedSegments);
mdc.commitSegments(Set.copyOf(expectedUnusedSegments), null);
expectedUnusedSegments.forEach(segment -> mdc.markSegmentAsUnused(segment.getId()));

// manually create local segments files
List<File> segmentFiles = new ArrayList<>();
Expand Down Expand Up @@ -947,7 +949,7 @@ public DataSegment apply(String input)
final TaskStatus status = runTask(killUnusedSegmentsTask);
Assert.assertEquals(taskLocation, status.getLocation());
Assert.assertEquals("merged statusCode", TaskState.SUCCESS, status.getStatusCode());
Assert.assertEquals("num segments published", 0, mdc.getPublished().size());
Assert.assertEquals("num segments published", 3, mdc.getPublished().size());
Assert.assertEquals("num segments nuked", maxSegmentsToKill, mdc.getNuked().size());
Assert.assertTrue(
"expected unused segments get killed",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@

package org.apache.druid.indexing.overlord.http;

import com.google.common.collect.ImmutableSet;
import org.apache.druid.audit.AuditManager;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.rpc.indexing.SegmentUpdateResponse;
Expand Down Expand Up @@ -63,17 +63,21 @@ public class OverlordDataSourcesResourceTest
public void setup()
{
AuditManager auditManager = EasyMock.createStrictMock(AuditManager.class);
segmentsMetadataManager = new TestSegmentsMetadataManager();
final TestIndexerMetadataStorageCoordinator metadataStorageCoordinator
= new TestIndexerMetadataStorageCoordinator();
segmentsMetadataManager = metadataStorageCoordinator.getSegmentsMetadataManager();

TaskMaster taskMaster = new TaskMaster(null, null);
dataSourcesResource = new OverlordDataSourcesResource(
taskMaster,
segmentsMetadataManager,
metadataStorageCoordinator,
auditManager
);
taskMaster.becomeFullLeader();

WIKI_SEGMENTS_10X1D.forEach(segmentsMetadataManager::addSegment);
metadataStorageCoordinator.commitSegments(Set.copyOf(WIKI_SEGMENTS_10X1D), null);
}

@Test
Expand Down Expand Up @@ -113,7 +117,7 @@ public void testMarkAllSegmentsAsUnused()
@Test
public void testMarkSegmentsAsUnused_bySegmentIds()
{
final Set<String> segmentIdsToUpdate = ImmutableSet.of(
final Set<String> segmentIdsToUpdate = Set.of(
WIKI_SEGMENTS_10X1D.get(0).getId().toString(),
WIKI_SEGMENTS_10X1D.get(8).getId().toString()
);
Expand Down Expand Up @@ -279,7 +283,7 @@ public void testMarkNonOvershadowedSegmentsAsUsed_bySegmentIds()
{
dataSourcesResource.markAllSegmentsAsUnused(TestDataSource.WIKI, createHttpServletRequest());

final Set<String> segmentIdsToUpdate = ImmutableSet.of(
final Set<String> segmentIdsToUpdate = Set.of(
WIKI_SEGMENTS_10X1D.get(0).getId().toString(),
WIKI_SEGMENTS_10X1D.get(1).getId().toString()
);
Expand Down
Loading
Loading