Skip to content

Commit

Permalink
Allow disabling client segment acknowledgment
Browse files Browse the repository at this point in the history
This allows storage-wide expiration policies to be implemented
that don't require an explicit API call to remove an expired segment.
  • Loading branch information
wendigo committed Nov 13, 2024
1 parent da9d3ea commit 2125ef9
Show file tree
Hide file tree
Showing 11 changed files with 73 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public static Segment inlined(byte[] data, DataAttributes attributes)
return new InlineSegment(data, attributes);
}

public static Segment spooled(URI retrieveUri, URI ackUri, DataAttributes attributes, Map<String, List<String>> headers)
public static Segment spooled(URI retrieveUri, Optional<URI> ackUri, DataAttributes attributes, Map<String, List<String>> headers)
{
return new SpooledSegment(retrieveUri, ackUri, attributes, headers);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -55,7 +56,7 @@ public InputStream load(SpooledSegment segment)
return loadFromURI(segment.getDataUri(), segment.getAckUri(), segment.getHeaders());
}

public InputStream loadFromURI(URI segmentUri, URI ackUri, Map<String, List<String>> headers)
public InputStream loadFromURI(URI segmentUri, Optional<URI> ackUri, Map<String, List<String>> headers)
throws IOException
{
Headers requestHeaders = toHeaders(headers);
Expand Down Expand Up @@ -99,7 +100,7 @@ public void onResponse(Call call, Response response)
});
}

private InputStream delegatingInputStream(Response response, InputStream delegate, URI ackUri, Headers headers)
private InputStream delegatingInputStream(Response response, InputStream delegate, Optional<URI> ackUri, Headers headers)
{
return new FilterInputStream(delegate)
{
Expand All @@ -108,7 +109,7 @@ public void close()
throws IOException
{
try (Response ignored = response; InputStream ignored2 = delegate) {
acknowledge(ackUri, headers);
ackUri.ifPresent(uri -> acknowledge(uri, headers));
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.google.common.base.MoreObjects.firstNonNull;
import static java.lang.String.format;
Expand All @@ -30,20 +31,20 @@ public final class SpooledSegment
extends Segment
{
private final URI dataUri;
private final Optional<URI> ackUri;
private final Map<String, List<String>> headers;
private final URI ackUri;

@JsonCreator
public SpooledSegment(
@JsonProperty("uri") URI dataUri,
@JsonProperty("ackUri") URI ackUri,
@JsonProperty("ackUri") Optional<URI> ackUri,
@JsonProperty("metadata") Map<String, Object> metadata,
@JsonProperty("headers") Map<String, List<String>> headers)
{
this(dataUri, ackUri, new DataAttributes(metadata), headers);
}

SpooledSegment(URI dataUri, URI ackUri, DataAttributes metadata, Map<String, List<String>> headers)
SpooledSegment(URI dataUri, Optional<URI> ackUri, DataAttributes metadata, Map<String, List<String>> headers)
{
super(metadata);
this.dataUri = requireNonNull(dataUri, "dataUri is null");
Expand All @@ -58,7 +59,7 @@ public URI getDataUri()
}

@JsonProperty("ackUri")
public URI getAckUri()
public Optional<URI> getAckUri()
{
return ackUri;
}
Expand All @@ -73,6 +74,6 @@ public Map<String, List<String>> getHeaders()
@Override
public String toString()
{
return format("SpooledSegment{offset=%d, rows=%d, size=%d, headers=%s}", getOffset(), getRowsCount(), getSegmentSize(), headers.keySet());
return format("SpooledSegment{offset=%d, rows=%d, size=%d, headers=%s, ack=%b}", getOffset(), getRowsCount(), getSegmentSize(), headers.keySet(), ackUri.isPresent());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ static class OutputSpoolingOperator
implements Operator
{
private final OutputSpoolingController controller;
private final boolean explicitAck;

enum State
{
Expand Down Expand Up @@ -165,6 +166,7 @@ public OutputSpoolingOperator(OperatorContext operatorContext, QueryDataEncoder
spoolingConfig.getMaximumSegmentSize().toBytes(),
spoolingConfig.getInitialSegmentSize().toBytes(),
spoolingConfig.getMaximumSegmentSize().toBytes());
this.explicitAck = spoolingConfig.isExplicitAck();
this.userMemoryContext = operatorContext.newLocalUserMemoryContext(OutputSpoolingOperator.class.getSimpleName());
this.queryDataEncoder = requireNonNull(queryDataEncoder, "queryDataEncoder is null");
this.spoolingManager = requireNonNull(spoolingManager, "spoolingManager is null");
Expand Down Expand Up @@ -278,7 +280,7 @@ private Page spool(List<Page> pages, boolean finished)
controller.recordEncoded(attributes.get(SEGMENT_SIZE, Integer.class));

// This page is small (hundreds of bytes) so there is no point in tracking its memory usage
return emptySingleRowPage(SpooledBlock.forLocation(spoolingManager.location(segmentHandle), attributes).serialize());
return emptySingleRowPage(SpooledBlock.forLocation(spoolingManager.location(segmentHandle), attributes, explicitAck).serialize());
}
catch (IOException e) {
throw new UncheckedIOException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,14 @@ public class CoordinatorSegmentResource
private final SpoolingManager spoolingManager;
private final SegmentRetrievalMode retrievalMode;
private final InternalNodeManager nodeManager;
private final boolean explicitAck;

@Inject
public CoordinatorSegmentResource(SpoolingManager spoolingManager, SpoolingConfig config, InternalNodeManager nodeManager)
{
this.spoolingManager = requireNonNull(spoolingManager, "spoolingManager is null");
this.retrievalMode = requireNonNull(config, "config is null").getRetrievalMode();
this.explicitAck = config.isExplicitAck();
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
}

Expand Down Expand Up @@ -97,6 +99,12 @@ public Response download(@Context UriInfo uriInfo, @PathParam("identifier") Stri
public Response acknowledge(@PathParam("identifier") String identifier, @Context HttpHeaders headers)
throws IOException
{
if (!explicitAck) {
return Response.status(Response.Status.NOT_ACCEPTABLE)
.entity("Explicit segment acknowledgment is disabled")
.build();
}

try {
spoolingManager.acknowledge(handle(identifier, headers));
return Response.ok().build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@
import static io.airlift.json.JsonCodec.listJsonCodec;
import static io.airlift.json.JsonCodec.mapJsonCodec;
import static io.airlift.slice.Slices.utf8Slice;
import static io.trino.spi.type.BooleanType.BOOLEAN;
import static io.trino.spi.type.VarcharType.VARCHAR;

public record SpooledBlock(Slice identifier, Optional<URI> directUri, Map<String, List<String>> headers, DataAttributes attributes)
public record SpooledBlock(Slice identifier, Optional<URI> directUri, Map<String, List<String>> headers, DataAttributes attributes, boolean explicitAck)
{
private static final JsonCodec<Map<String, List<String>>> HEADERS_CODEC = mapJsonCodec(String.class, listJsonCodec(String.class));
private static final JsonCodec<DataAttributes> ATTRIBUTES_CODEC = JsonCodec.jsonCodec(DataAttributes.class);
Expand All @@ -47,7 +48,8 @@ public record SpooledBlock(Slice identifier, Optional<URI> directUri, Map<String
new RowType.Field(Optional.of("identifier"), VARCHAR),
new RowType.Field(Optional.of("directLocation"), VARCHAR),
new RowType.Field(Optional.of("headers"), VARCHAR),
new RowType.Field(Optional.of("metadata"), VARCHAR)));
new RowType.Field(Optional.of("metadata"), VARCHAR),
new RowType.Field(Optional.of("explicitAck"), BOOLEAN)));

public static final String SPOOLING_METADATA_COLUMN_NAME = "$spooling:metadata$";
public static final Symbol SPOOLING_METADATA_SYMBOL = new Symbol(SPOOLING_METADATA_TYPE, SPOOLING_METADATA_COLUMN_NAME);
Expand All @@ -63,29 +65,33 @@ public static SpooledBlock deserialize(Page page)
VARCHAR.getSlice(row.getRawFieldBlock(0), 0),
Optional.empty(), // Not a direct location
HEADERS_CODEC.fromJson(VARCHAR.getSlice(row.getRawFieldBlock(2), 0).toStringUtf8()),
ATTRIBUTES_CODEC.fromJson(VARCHAR.getSlice(row.getRawFieldBlock(3), 0).toStringUtf8()));
ATTRIBUTES_CODEC.fromJson(VARCHAR.getSlice(row.getRawFieldBlock(3), 0).toStringUtf8()),
BOOLEAN.getBoolean(row.getRawFieldBlock(4), 0));
}

return new SpooledBlock(
VARCHAR.getSlice(row.getRawFieldBlock(0), 0),
Optional.of(URI.create(VARCHAR.getSlice(row.getRawFieldBlock(1), 0).toStringUtf8())),
HEADERS_CODEC.fromJson(VARCHAR.getSlice(row.getRawFieldBlock(2), 0).toStringUtf8()),
ATTRIBUTES_CODEC.fromJson(VARCHAR.getSlice(row.getRawFieldBlock(3), 0).toStringUtf8()));
ATTRIBUTES_CODEC.fromJson(VARCHAR.getSlice(row.getRawFieldBlock(3), 0).toStringUtf8()),
BOOLEAN.getBoolean(row.getRawFieldBlock(4), 0));
}

public static SpooledBlock forLocation(SpooledLocation location, DataAttributes attributes)
public static SpooledBlock forLocation(SpooledLocation location, DataAttributes attributes, boolean explicitAck)
{
return switch (location) {
case DirectLocation directLocation -> new SpooledBlock(
directLocation.identifier(),
Optional.of(directLocation.directUri()),
directLocation.headers(),
attributes);
attributes,
explicitAck);
case CoordinatorLocation coordinatorLocation -> new SpooledBlock(
coordinatorLocation.identifier(),
Optional.empty(),
coordinatorLocation.headers(),
attributes);
attributes,
explicitAck);
};
}

Expand All @@ -109,6 +115,7 @@ void serialize(RowBlockBuilder rowBlockBuilder)
}
VARCHAR.writeSlice(rowEntryBuilder.get(2), utf8Slice(HEADERS_CODEC.toJson(headers)));
VARCHAR.writeSlice(rowEntryBuilder.get(3), utf8Slice(ATTRIBUTES_CODEC.toJson(attributes)));
BOOLEAN.writeBoolean(rowEntryBuilder.get(4), explicitAck);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

import static io.trino.client.spooling.DataAttribute.ROWS_COUNT;
Expand Down Expand Up @@ -76,8 +77,9 @@ public QueryData produce(ExternalUriInfo uriInfo, Session session, QueryResultRo
.set(ROW_OFFSET, currentOffset)
.build();
builder.withSegment(spooled(
metadata.directUri().orElseGet(() -> buildSegmentDownloadURI(uriBuilder, metadata.identifier())),
buildSegmentAckURI(uriBuilder, metadata.identifier()),
metadata.directUri()
.orElseGet(() -> buildSegmentDownloadURI(uriBuilder, metadata.identifier())),
metadata.explicitAck() ? Optional.of(buildSegmentAckURI(uriBuilder, metadata.identifier())) : Optional.empty(),
attributes,
metadata.headers()));
currentOffset += attributes.get(ROWS_COUNT, Long.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class SpoolingConfig
private Optional<Duration> storageRedirectTtl = Optional.empty();

private boolean allowInlining = true;
private boolean explicitAck = true;
private long maximumInlinedRows = 1000;
private DataSize maximumInlinedSize = DataSize.of(128, KILOBYTE);
private DataSize initialSegmentSize = DataSize.of(8, MEGABYTE);
Expand Down Expand Up @@ -123,6 +124,19 @@ public SpoolingConfig setAllowInlining(boolean allowInlining)
return this;
}

/**
 * Returns whether explicit client acknowledgment of spooled segments is enabled.
 * The backing field is initialized to {@code true}.
 *
 * @return the current value of the {@code explicitAck} setting
 */
public boolean isExplicitAck()
{
return explicitAck;
}

/**
 * Sets whether clients may explicitly acknowledge spooled segments; bound to the
 * {@code protocol.spooling.explicit-ack.enabled} configuration property.
 *
 * @param explicitAck value to store in the {@code explicitAck} field
 * @return this config instance, so that setter calls can be chained
 */
@ConfigDescription("Allow client to acknowledge segment retrieval and its eager removal")
@Config("protocol.spooling.explicit-ack.enabled")
public SpoolingConfig setExplicitAck(boolean explicitAck)
{
this.explicitAck = explicitAck;
return this;
}

public long getMaximumInlinedRows()
{
return maximumInlinedRows;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.Set;

Expand Down Expand Up @@ -138,11 +139,11 @@ public void testSpooledQueryDataSerialization()
inlined("super".getBytes(UTF_8), dataAttributes(0, 100, 5)),
spooled(
URI.create("http://localhost:8080/v1/download/20160128_214710_00012_rk68b/segments/1"),
URI.create("http://localhost:8080/v1/ack/20160128_214710_00012_rk68b/segments/1"),
Optional.of(URI.create("http://localhost:8080/v1/ack/20160128_214710_00012_rk68b/segments/1")),
dataAttributes(100, 100, 1024), Map.of("x-amz-server-side-encryption", List.of("AES256"))),
spooled(
URI.create("http://localhost:8080/v1/download/20160128_214710_00012_rk68b/segments/2"),
URI.create("http://localhost:8080/v1/ack/20160128_214710_00012_rk68b/segments/2"),
Optional.empty(),
dataAttributes(200, 100, 1024), Map.of("x-amz-server-side-encryption", List.of("AES256")))))
.withAttributes(DataAttributes.builder()
.set(SCHEMA, "serializedSchema")
Expand Down Expand Up @@ -179,7 +180,6 @@ public void testSpooledQueryDataSerialization()
{
"type": "spooled",
"uri": "http://localhost:8080/v1/download/20160128_214710_00012_rk68b/segments/2",
"ackUri": "http://localhost:8080/v1/ack/20160128_214710_00012_rk68b/segments/2",
"metadata": {
"rowOffset": 200,
"rowsCount": 100,
Expand All @@ -199,9 +199,15 @@ public void testEncodedQueryDataToString()

EncodedQueryData spooledQueryData = new EncodedQueryData("json+zstd", ImmutableMap.of("decryption_key", "secret"), ImmutableList.of(spooled(
URI.create("http://coordinator:8080/v1/segments/uuid"),
Optional.of(URI.create("http://coordinator:8080/v1/segments/uuid")),
dataAttributes(10, 2, 1256), headers())));
assertThat(spooledQueryData.toString()).isEqualTo("EncodedQueryData{encoding=json+zstd, segments=[SpooledSegment{offset=10, rows=2, size=1256, headers=[x-amz-server-side-encryption], ack=true}], metadata=[decryption_key]}");

EncodedQueryData spooledQueryDataWithoutAck = new EncodedQueryData("json+zstd", ImmutableMap.of("decryption_key", "secret"), ImmutableList.of(spooled(
URI.create("http://coordinator:8080/v1/segments/uuid"),
Optional.empty(),
dataAttributes(10, 2, 1256), headers())));
assertThat(spooledQueryData.toString()).isEqualTo("EncodedQueryData{encoding=json+zstd, segments=[SpooledSegment{offset=10, rows=2, size=1256, headers=[x-amz-server-side-encryption]}], metadata=[decryption_key]}");
assertThat(spooledQueryDataWithoutAck.toString()).isEqualTo("EncodedQueryData{encoding=json+zstd, segments=[SpooledSegment{offset=10, rows=2, size=1256, headers=[x-amz-server-side-encryption], ack=false}], metadata=[decryption_key]}");
}

private void testRoundTrip(QueryData queryData, String expectedDataRepresentation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,31 +55,31 @@ public void verifySerialization(Slice identifier, Optional<URI> directUri, Map<S

public void verifySerializationRoundTrip(Slice identifier, Optional<URI> directUri, Map<String, List<String>> headers)
{
SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(10, 1200));
SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(10, 1200), true);
Page page = new Page(metadata.serialize());
SpooledBlock retrieved = SpooledBlock.deserialize(page);
assertThat(metadata).isEqualTo(retrieved);
}

private void verifySerializationRoundTripWithNonEmptyPage(Slice identifier, Optional<URI> directUri, Map<String, List<String>> headers)
{
SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(10, 1100));
SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(10, 1100), false);
Page page = new Page(blockWithPositions(1, true), metadata.serialize());
SpooledBlock retrieved = SpooledBlock.deserialize(page);
assertThat(metadata).isEqualTo(retrieved);
}

private void verifyThrowsErrorOnNonNullPositions(Slice identifier, Optional<URI> directUri, Map<String, List<String>> headers)
{
SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(20, 1200));
SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(20, 1200), true);

assertThatThrownBy(() -> SpooledBlock.deserialize(new Page(blockWithPositions(1, false), metadata.serialize())))
.hasMessage("Spooling metadata block must have all but last channels null");
}

private void verifyThrowsErrorOnMultiplePositions(Slice identifier, Optional<URI> directUri, Map<String, List<String>> headers)
{
SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(30, 1300));
SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(30, 1300), false);
RowBlockBuilder rowBlockBuilder = SPOOLING_METADATA_TYPE.createBlockBuilder(null, 2);
metadata.serialize(rowBlockBuilder);
metadata.serialize(rowBlockBuilder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ public void testDefaults()
.setMaximumSegmentSize(DataSize.of(16, MEGABYTE))
.setMaximumInlinedRows(1000)
.setMaximumInlinedSize(DataSize.of(128, KILOBYTE))
.setAllowInlining(true));
.setAllowInlining(true)
.setExplicitAck(true));
}

@Test
Expand All @@ -57,6 +58,7 @@ public void testExplicitPropertyMappings()
.put("protocol.spooling.retrieval-mode", "coordinator_storage_redirect")
.put("protocol.spooling.coordinator-storage-redirect-ttl", "60s")
.put("protocol.spooling.inlining.enabled", "false")
.put("protocol.spooling.explicit-ack.enabled", "false")
.put("protocol.spooling.initial-segment-size", "1kB")
.put("protocol.spooling.maximum-segment-size", "8kB")
.put("protocol.spooling.inlining.max-rows", "1024")
Expand All @@ -71,7 +73,8 @@ public void testExplicitPropertyMappings()
.setMaximumSegmentSize(DataSize.of(8, KILOBYTE))
.setMaximumInlinedRows(1024)
.setMaximumInlinedSize(DataSize.of(1, MEGABYTE))
.setAllowInlining(false);
.setAllowInlining(false)
.setExplicitAck(false);

assertFullMapping(properties, expected);
}
Expand Down

0 comments on commit 2125ef9

Please sign in to comment.