From 1bbfec08e48799998cc034f22281af7717bafee3 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 19 Dec 2024 16:31:51 +0100 Subject: [PATCH 1/8] [8.x] Handle empty index case in LuceneSyntheticSourceChangesSnapshot (#119089) Backport of #118996 to 8.x branch. In case of synthetic recovery source when the mapping is empty. A test that reproduces failure in #118955 consistently with a potential fix. `MapperService#updateMapping(...)` doesn't set the mapper field if a mapping has no fields, which is what is used in InternalEngine#newChangesSnapshot(...) . This happens when `newMappingMetadata` variable in `MapperService updateMapping(...)` is `null`. Causing an assertion to trip. This change adjusts that assertion to handle an empty index. Closes #118955 --- .../indices.create/20_synthetic_source.yml | 10 +++--- .../indices.recovery/20_synthetic_source.yml | 33 +++++++++++++++++++ .../elasticsearch/index/IndexSettings.java | 3 ++ .../LuceneSyntheticSourceChangesSnapshot.java | 5 +-- 4 files changed, 43 insertions(+), 8 deletions(-) create mode 100644 rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.recovery/20_synthetic_source.yml diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.create/20_synthetic_source.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.create/20_synthetic_source.yml index cd2471eda0a0a..37f4fcc957f72 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.create/20_synthetic_source.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.create/20_synthetic_source.yml @@ -2036,14 +2036,12 @@ create index with use_synthetic_source: - is_true: test.settings.index.recovery.use_synthetic_source - do: - bulk: + index: index: test + id: 1 refresh: true - body: - - '{ "create": { } }' - - '{ "field": "aaaa" }' - - '{ "create": { } }' - - '{ "field": "bbbb" }' + body: { foo: bar } + - match: { _version: 1 } - do: indices.disk_usage: diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.recovery/20_synthetic_source.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.recovery/20_synthetic_source.yml new file mode 100644 index 0000000000000..493b834fc5a90 --- /dev/null +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.recovery/20_synthetic_source.yml @@ -0,0 +1,33 @@ +--- +test recovery empty index with use_synthetic_source: + - requires: + cluster_features: ["mapper.synthetic_recovery_source"] + reason: requires synthetic recovery source + + - do: + indices.create: + index: test + body: + settings: + index: + number_of_replicas: 0 + recovery: + use_synthetic_source: true + mapping: + source: + mode: synthetic + + - do: + indices.get_settings: {} + - match: { test.settings.index.mapping.source.mode: synthetic} + - is_true: test.settings.index.recovery.use_synthetic_source + + - do: + indices.put_settings: + index: test + body: + index.number_of_replicas: 1 + + - do: + cluster.health: + wait_for_events: languid diff --git a/server/src/main/java/org/elasticsearch/index/IndexSettings.java b/server/src/main/java/org/elasticsearch/index/IndexSettings.java index 02617b7dc376d..82e3142f21a03 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/server/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -1036,6 +1036,9 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti recoverySourceEnabled = RecoverySettings.INDICES_RECOVERY_SOURCE_ENABLED_SETTING.get(nodeSettings); recoverySourceSyntheticEnabled = scopedSettings.get(RECOVERY_USE_SYNTHETIC_SOURCE_SETTING); if (recoverySourceSyntheticEnabled) { + if (DiscoveryNode.isStateless(settings)) { + throw new IllegalArgumentException("synthetic recovery source is only allowed in stateful"); + } // Verify that all nodes can handle this setting if (version.before(IndexVersions.USE_SYNTHETIC_SOURCE_FOR_RECOVERY_BACKPORT)) { throw new IllegalArgumentException( diff --git a/server/src/main/java/org/elasticsearch/index/engine/LuceneSyntheticSourceChangesSnapshot.java b/server/src/main/java/org/elasticsearch/index/engine/LuceneSyntheticSourceChangesSnapshot.java index f21a3c06ab015..08508103181ed 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/LuceneSyntheticSourceChangesSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/engine/LuceneSyntheticSourceChangesSnapshot.java @@ -77,12 +77,13 @@ public LuceneSyntheticSourceChangesSnapshot( IndexVersion indexVersionCreated ) throws IOException { super(engineSearcher, searchBatchSize, fromSeqNo, toSeqNo, requiredFullRange, accessStats, indexVersionCreated); - assert mappingLookup.isSourceSynthetic(); + // a MapperService#updateMapping(...) of empty index may not have been invoked and then mappingLookup is empty + assert engineSearcher.getDirectoryReader().maxDoc() == 0 || mappingLookup.isSourceSynthetic() + : "either an empty index or synthetic source must be enabled for proper functionality."; // ensure we can buffer at least one document this.maxMemorySizeInBytes = maxMemorySizeInBytes > 0 ? maxMemorySizeInBytes : 1; this.sourceLoader = mappingLookup.newSourceLoader(null, SourceFieldMetrics.NOOP); Set storedFields = sourceLoader.requiredStoredFields(); - assert mappingLookup.isSourceSynthetic() : "synthetic source must be enabled for proper functionality."; this.storedFieldLoader = StoredFieldLoader.create(false, storedFields); this.lastSeenSeqNo = fromSeqNo - 1; } From f281ba9287ac634c4813a01e3512e720ca65f4ad Mon Sep 17 00:00:00 2001 From: Salvatore Campagna <93581129+salvatore-campagna@users.noreply.github.com> Date: Thu, 19 Dec 2024 16:32:15 +0100 Subject: [PATCH 2/8] [8.x] Replace encoder with url encoder (#116699) (#119079) Document IDs are frequently used in HTTP requests, such as `GET /index/_doc/{id}`, where they must be URL-safe to avoid issues with invalid characters. This change ensures that IDs generated by `TimeBasedKOrderedUUIDGenerator` are properly Base64 URL-encoded, free of characters that could break URLs. We also test that no IDs include invalid characters like +, /, or = to guarantee they are fully compliant with URL-safe requirements. Moreover `TimeBasedKOrderedUUIDGenerator` and `TimeBasedUUIDGenerator` are refactored to allow injection of dependencies which enables us to increase test coverage by including tests for high-throughput scenarios, sequence id overflow and unreliable clocks usage. --- .../TimeBasedKOrderedUUIDGenerator.java | 18 +- .../common/TimeBasedUUIDGenerator.java | 34 ++- .../java/org/elasticsearch/common/UUIDs.java | 24 +- .../common/TimeBasedUUIDGeneratorTests.java | 270 ++++++++++++++++++ .../org/elasticsearch/common/UUIDTests.java | 53 ++-- 5 files changed, 356 insertions(+), 43 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/common/TimeBasedUUIDGeneratorTests.java diff --git a/server/src/main/java/org/elasticsearch/common/TimeBasedKOrderedUUIDGenerator.java b/server/src/main/java/org/elasticsearch/common/TimeBasedKOrderedUUIDGenerator.java index 9c97cb8fe7e85..1c8307dbddede 100644 --- a/server/src/main/java/org/elasticsearch/common/TimeBasedKOrderedUUIDGenerator.java +++ b/server/src/main/java/org/elasticsearch/common/TimeBasedKOrderedUUIDGenerator.java @@ -11,6 +11,7 @@ import java.nio.ByteBuffer; import java.util.Base64; +import java.util.function.Supplier; /** * Generates a base64-encoded, k-ordered UUID string optimized for compression and efficient indexing. @@ -28,18 +29,27 @@ * The result is a compact base64-encoded string, optimized for efficient compression of the _id field in an inverted index. */ public class TimeBasedKOrderedUUIDGenerator extends TimeBasedUUIDGenerator { - private static final Base64.Encoder BASE_64_NO_PADDING = Base64.getEncoder().withoutPadding(); + + private static final Base64.Encoder BASE_64_NO_PADDING_URL_ENCODER = Base64.getUrlEncoder().withoutPadding(); + + public TimeBasedKOrderedUUIDGenerator( + final Supplier timestampSupplier, + final Supplier sequenceIdSupplier, + final Supplier macAddressSupplier + ) { + super(timestampSupplier, sequenceIdSupplier, macAddressSupplier); + } @Override public String getBase64UUID() { - final int sequenceId = this.sequenceNumber.incrementAndGet() & 0x00FF_FFFF; + final int sequenceId = sequenceNumber.incrementAndGet() & 0x00FF_FFFF; // Calculate timestamp to ensure ordering and avoid backward movement in case of time shifts. // Uses AtomicLong to guarantee that timestamp increases even if the system clock moves backward. // If the sequenceId overflows (reaches 0 within the same millisecond), the timestamp is incremented // to ensure strict ordering. long timestamp = this.lastTimestamp.accumulateAndGet( - currentTimeMillis(), + timestampSupplier.get(), sequenceId == 0 ? (lastTimestamp, currentTimeMillis) -> Math.max(lastTimestamp, currentTimeMillis) + 1 : Math::max ); @@ -68,6 +78,6 @@ public String getBase64UUID() { assert buffer.position() == uuidBytes.length; - return BASE_64_NO_PADDING.encodeToString(uuidBytes); + return BASE_64_NO_PADDING_URL_ENCODER.encodeToString(uuidBytes); } } diff --git a/server/src/main/java/org/elasticsearch/common/TimeBasedUUIDGenerator.java b/server/src/main/java/org/elasticsearch/common/TimeBasedUUIDGenerator.java index 00d031a835ef1..36b5e0c3697ca 100644 --- a/server/src/main/java/org/elasticsearch/common/TimeBasedUUIDGenerator.java +++ b/server/src/main/java/org/elasticsearch/common/TimeBasedUUIDGenerator.java @@ -12,6 +12,7 @@ import java.util.Base64; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; /** * These are essentially flake ids but we use 6 (not 8) bytes for timestamp, and use 3 (not 2) bytes for sequence number. We also reorder @@ -20,36 +21,41 @@ * For more information about flake ids, check out * https://archive.fo/2015.07.08-082503/http://www.boundary.com/blog/2012/01/flake-a-decentralized-k-ordered-unique-id-generator-in-erlang/ */ - class TimeBasedUUIDGenerator implements UUIDGenerator { // We only use bottom 3 bytes for the sequence number. Paranoia: init with random int so that if JVM/OS/machine goes down, clock slips // backwards, and JVM comes back up, we are less likely to be on the same sequenceNumber at the same time: - protected final AtomicInteger sequenceNumber = new AtomicInteger(SecureRandomHolder.INSTANCE.nextInt()); + protected final AtomicInteger sequenceNumber; + protected final AtomicLong lastTimestamp; - // Used to ensure clock moves forward: - protected final AtomicLong lastTimestamp = new AtomicLong(0); + protected final Supplier timestampSupplier; private static final byte[] SECURE_MUNGED_ADDRESS = MacAddressProvider.getSecureMungedAddress(); static { assert SECURE_MUNGED_ADDRESS.length == 6; } - private static final Base64.Encoder BASE_64_NO_PADDING = Base64.getUrlEncoder().withoutPadding(); - - // protected for testing - protected long currentTimeMillis() { - return System.currentTimeMillis(); + static final int SIZE_IN_BYTES = 15; + private final byte[] macAddress; + + TimeBasedUUIDGenerator( + final Supplier timestampSupplier, + final Supplier sequenceIdSupplier, + final Supplier macAddressSupplier + ) { + this.timestampSupplier = timestampSupplier; + // NOTE: getting the mac address every time using the supplier is expensive, hence we cache it. + this.macAddress = macAddressSupplier.get(); + this.sequenceNumber = new AtomicInteger(sequenceIdSupplier.get()); + // Used to ensure clock moves forward: + this.lastTimestamp = new AtomicLong(0); } - // protected for testing protected byte[] macAddress() { - return SECURE_MUNGED_ADDRESS; + return macAddress; } - static final int SIZE_IN_BYTES = 15; - @Override public String getBase64UUID() { final int sequenceId = sequenceNumber.incrementAndGet() & 0xffffff; @@ -58,7 +64,7 @@ public String getBase64UUID() { // still vulnerable if we are shut down, clock goes backwards, and we restart... for this we // randomize the sequenceNumber on init to decrease chance of collision: long timestamp = this.lastTimestamp.accumulateAndGet( - currentTimeMillis(), + timestampSupplier.get(), // Always force the clock to increment whenever sequence number is 0, in case we have a long // time-slip backwards: sequenceId == 0 ? (lastTimestamp, currentTimeMillis) -> Math.max(lastTimestamp, currentTimeMillis) + 1 : Math::max diff --git a/server/src/main/java/org/elasticsearch/common/UUIDs.java b/server/src/main/java/org/elasticsearch/common/UUIDs.java index 0f73b8172c10f..ebcb375bc01bc 100644 --- a/server/src/main/java/org/elasticsearch/common/UUIDs.java +++ b/server/src/main/java/org/elasticsearch/common/UUIDs.java @@ -12,13 +12,29 @@ import org.elasticsearch.common.settings.SecureString; import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; +/** + * Utility class for generating various types of UUIDs. + */ public class UUIDs { + private static final AtomicInteger sequenceNumber = new AtomicInteger(SecureRandomHolder.INSTANCE.nextInt()); + public static final Supplier DEFAULT_TIMESTAMP_SUPPLIER = System::currentTimeMillis; + public static final Supplier DEFAULT_SEQUENCE_ID_SUPPLIER = sequenceNumber::incrementAndGet; + public static final Supplier DEFAULT_MAC_ADDRESS_SUPPLIER = MacAddressProvider::getSecureMungedAddress; + private static final UUIDGenerator RANDOM_UUID_GENERATOR = new RandomBasedUUIDGenerator(); + private static final UUIDGenerator TIME_BASED_K_ORDERED_GENERATOR = new TimeBasedKOrderedUUIDGenerator( + DEFAULT_TIMESTAMP_SUPPLIER, + DEFAULT_SEQUENCE_ID_SUPPLIER, + DEFAULT_MAC_ADDRESS_SUPPLIER + ); - private static final RandomBasedUUIDGenerator RANDOM_UUID_GENERATOR = new RandomBasedUUIDGenerator(); - - private static final UUIDGenerator TIME_BASED_K_ORDERED_GENERATOR = new TimeBasedKOrderedUUIDGenerator(); - private static final UUIDGenerator TIME_UUID_GENERATOR = new TimeBasedUUIDGenerator(); + private static final UUIDGenerator TIME_UUID_GENERATOR = new TimeBasedUUIDGenerator( + DEFAULT_TIMESTAMP_SUPPLIER, + DEFAULT_SEQUENCE_ID_SUPPLIER, + DEFAULT_MAC_ADDRESS_SUPPLIER + ); /** * The length of a UUID string generated by {@link #base64UUID}. diff --git a/server/src/test/java/org/elasticsearch/common/TimeBasedUUIDGeneratorTests.java b/server/src/test/java/org/elasticsearch/common/TimeBasedUUIDGeneratorTests.java new file mode 100644 index 0000000000000..964683a1972ba --- /dev/null +++ b/server/src/test/java/org/elasticsearch/common/TimeBasedUUIDGeneratorTests.java @@ -0,0 +1,270 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.common; + +import org.elasticsearch.test.ESTestCase; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Base64; +import java.util.HashSet; +import java.util.Set; +import java.util.function.Supplier; +import java.util.stream.IntStream; + +public class TimeBasedUUIDGeneratorTests extends ESTestCase { + + public void testTimeBasedUUIDGeneration() { + assertUUIDFormat(createGenerator(() -> Instant.now().toEpochMilli(), () -> 0, new TestRandomMacAddressSupplier()), 100_000); + } + + public void testTimeBasedUUIDUniqueness() { + assertUUIDUniqueness(createGenerator(() -> Instant.now().toEpochMilli(), () -> 0, new TestRandomMacAddressSupplier()), 100_000); + } + + public void testTimeBasedUUIDSequenceOverflow() { + // The assumption here is that our system will not generate more than 1000 UUIDs within the same millisecond. + // The sequence ID is set close to its max value (0x00FF_FFFF) to quickly trigger an overflow. + // However, since we are generating only 1000 UUIDs, the timestamp is expected to change at least once, + // ensuring uniqueness even if the sequence ID wraps around. + assertEquals( + 1000, + generateUUIDs( + createGenerator(() -> Instant.now().toEpochMilli(), () -> 0x00FF_FFFF - 10, new TestRandomMacAddressSupplier()), + 1000 + ).size() + ); + } + + public void testTimeBasedUUIDClockReset() { + // Simulate a clock that resets itself after reaching a threshold. + final Supplier unreliableClock = new TestClockResetTimestampSupplier( + Instant.now(), + 1, + 50, + ChronoUnit.MILLIS, + Instant.now().plus(100, ChronoUnit.MILLIS) + ); + final UUIDGenerator generator = createGenerator(unreliableClock, () -> 0, new TestRandomMacAddressSupplier()); + + final Set beforeReset = generateUUIDs(generator, 5_000); + final Set afterReset = generateUUIDs(generator, 5_000); + + // Ensure all UUIDs are unique, even after the clock resets. + assertEquals(5_000, beforeReset.size()); + assertEquals(5_000, afterReset.size()); + beforeReset.addAll(afterReset); + assertEquals(10_000, beforeReset.size()); + } + + public void testKOrderedUUIDGeneration() { + assertUUIDFormat(createKOrderedGenerator(() -> Instant.now().toEpochMilli(), () -> 0, new TestRandomMacAddressSupplier()), 100_000); + } + + public void testKOrderedUUIDUniqueness() { + assertUUIDUniqueness( + createKOrderedGenerator(() -> Instant.now().toEpochMilli(), () -> 0, new TestRandomMacAddressSupplier()), + 100_000 + ); + } + + public void testKOrderedUUIDSequenceOverflow() { + final UUIDGenerator generator = createKOrderedGenerator( + () -> Instant.now().toEpochMilli(), + () -> 0x00FF_FFFF - 10, + new TestRandomMacAddressSupplier() + ); + final Set uuids = generateUUIDs(generator, 1000); + + // The assumption here is that our system will not generate more than 1000 UUIDs within the same millisecond. + // The sequence ID is set close to its max value (0x00FF_FFFF) to quickly trigger an overflow. + // However, since we are generating only 1000 UUIDs, the timestamp is expected to change at least once, + // ensuring uniqueness even if the sequence ID wraps around. + assertEquals(1000, uuids.size()); + } + + public void testUUIDEncodingDecoding() { + testUUIDEncodingDecodingHelper( + Instant.parse("2024-11-13T10:12:43Z").toEpochMilli(), + 12345, + new TestRandomMacAddressSupplier().get() + ); + } + + public void testUUIDEncodingDecodingWithRandomValues() { + testUUIDEncodingDecodingHelper( + randomInstantBetween(Instant.now().minus(1, ChronoUnit.DAYS), Instant.now()).toEpochMilli(), + randomIntBetween(0, 0x00FF_FFFF), + new TestRandomMacAddressSupplier().get() + ); + } + + private void testUUIDEncodingDecodingHelper(final long timestamp, final int sequenceId, final byte[] macAddress) { + final TestTimeBasedKOrderedUUIDDecoder decoder = new TestTimeBasedKOrderedUUIDDecoder( + createKOrderedGenerator(() -> timestamp, () -> sequenceId, () -> macAddress).getBase64UUID() + ); + + // The sequence ID is incremented by 1 when generating the UUID. + assertEquals("Sequence ID does not match", sequenceId + 1, decoder.decodeSequenceId()); + // Truncate the timestamp to milliseconds to match the UUID generation granularity. + assertEquals( + "Timestamp does not match", + Instant.ofEpochMilli(timestamp).truncatedTo(ChronoUnit.MILLIS), + Instant.ofEpochMilli(decoder.decodeTimestamp()).truncatedTo(ChronoUnit.MILLIS) + ); + assertArrayEquals("MAC address does not match", macAddress, decoder.decodeMacAddress()); + } + + private void assertUUIDUniqueness(final UUIDGenerator generator, final int count) { + assertEquals(count, generateUUIDs(generator, count).size()); + } + + private Set generateUUIDs(final UUIDGenerator generator, final int count) { + return IntStream.range(0, count).mapToObj(i -> generator.getBase64UUID()).collect(HashSet::new, Set::add, Set::addAll); + } + + private void assertUUIDFormat(final UUIDGenerator generator, final int count) { + IntStream.range(0, count).forEach(i -> { + final String uuid = generator.getBase64UUID(); + assertNotNull(uuid); + assertEquals(20, uuid.length()); + assertFalse(uuid.contains("+")); + assertFalse(uuid.contains("/")); + assertFalse(uuid.contains("=")); + }); + } + + private UUIDGenerator createGenerator( + final Supplier timestampSupplier, + final Supplier sequenceIdSupplier, + final Supplier macAddressSupplier + ) { + return new TimeBasedUUIDGenerator(timestampSupplier, sequenceIdSupplier, macAddressSupplier); + } + + private UUIDGenerator createKOrderedGenerator( + final Supplier timestampSupplier, + final Supplier sequenceIdSupplier, + final Supplier macAddressSupplier + ) { + return new TimeBasedKOrderedUUIDGenerator(timestampSupplier, sequenceIdSupplier, macAddressSupplier); + } + + private static class TestRandomMacAddressSupplier implements Supplier { + private final byte[] macAddress = new byte[] { randomByte(), randomByte(), randomByte(), randomByte(), randomByte(), randomByte() }; + + @Override + public byte[] get() { + return macAddress; + } + } + + /** + * A {@link Supplier} implementation that simulates a clock that can move forward or backward in time. + * This supplier provides timestamps in milliseconds since the epoch, adjusting based on a given delta + * until a reset threshold is reached. After crossing the threshold, the timestamp moves backwards by a reset delta. + */ + private static class TestClockResetTimestampSupplier implements Supplier { + private Instant currentTime; + private final long delta; + private final long resetDelta; + private final ChronoUnit unit; + private final Instant resetThreshold; + + /** + * Constructs a new {@link TestClockResetTimestampSupplier}. + * + * @param startTime The initial starting time. + * @param delta The amount of time to add to the current time in each forward step. + * @param resetDelta The amount of time to subtract once the reset threshold is reached. + * @param unit The unit of time for both delta and resetDelta. + * @param resetThreshold The threshold after which the time is reset backwards. + */ + TestClockResetTimestampSupplier( + final Instant startTime, + final long delta, + final long resetDelta, + final ChronoUnit unit, + final Instant resetThreshold + ) { + this.currentTime = startTime; + this.delta = delta; + this.resetDelta = resetDelta; + this.unit = unit; + this.resetThreshold = resetThreshold; + } + + /** + * Provides the next timestamp in milliseconds since the epoch. + * If the current time is before the reset threshold, it advances the time by the delta. + * Otherwise, it subtracts the reset delta. + * + * @return The current time in milliseconds since the epoch. + */ + @Override + public Long get() { + if (currentTime.isBefore(resetThreshold)) { + currentTime = currentTime.plus(delta, unit); + } else { + currentTime = currentTime.minus(resetDelta, unit); + } + return currentTime.toEpochMilli(); + } + } + + /** + * A utility class to decode the K-ordered UUID extracting the original timestamp, MAC address and sequence ID. + */ + private static class TestTimeBasedKOrderedUUIDDecoder { + + private final byte[] decodedBytes; + + /** + * Constructs a new {@link TestTimeBasedKOrderedUUIDDecoder} using a base64-encoded UUID string. + * + * @param base64UUID The base64-encoded UUID string to decode. + */ + TestTimeBasedKOrderedUUIDDecoder(final String base64UUID) { + this.decodedBytes = Base64.getUrlDecoder().decode(base64UUID); + } + + /** + * Decodes the timestamp from the UUID using the following bytes: + * 0 (most significant), 1, 2, 3, 11, 13 (least significant). + * + * @return The decoded timestamp in milliseconds. + */ + public long decodeTimestamp() { + return ((long) (decodedBytes[0] & 0xFF) << 40) | ((long) (decodedBytes[1] & 0xFF) << 32) | ((long) (decodedBytes[2] & 0xFF) + << 24) | ((long) (decodedBytes[3] & 0xFF) << 16) | ((long) (decodedBytes[11] & 0xFF) << 8) | (decodedBytes[13] & 0xFF); + } + + /** + * Decodes the MAC address from the UUID using bytes 4 to 9. + * + * @return The decoded MAC address as a byte array. + */ + public byte[] decodeMacAddress() { + byte[] macAddress = new byte[6]; + System.arraycopy(decodedBytes, 4, macAddress, 0, 6); + return macAddress; + } + + /** + * Decodes the sequence ID from the UUID using bytes: + * 10 (most significant), 12 (middle), 14 (least significant). + * + * @return The decoded sequence ID. + */ + public int decodeSequenceId() { + return ((decodedBytes[10] & 0xFF) << 16) | ((decodedBytes[12] & 0xFF) << 8) | (decodedBytes[14] & 0xFF); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/common/UUIDTests.java b/server/src/test/java/org/elasticsearch/common/UUIDTests.java index 9fbeaf1c6c081..71c705f5df511 100644 --- a/server/src/test/java/org/elasticsearch/common/UUIDTests.java +++ b/server/src/test/java/org/elasticsearch/common/UUIDTests.java @@ -27,26 +27,37 @@ import org.elasticsearch.test.ESTestCase; import org.hamcrest.Matchers; +import java.util.Base64; import java.util.HashSet; import java.util.Random; import java.util.Set; +import java.util.function.Supplier; public class UUIDTests extends ESTestCase { - static UUIDGenerator timeUUIDGen = new TimeBasedUUIDGenerator(); + static final Base64.Decoder BASE_64_URL_DECODER = Base64.getUrlDecoder(); + static UUIDGenerator timeUUIDGen = new TimeBasedUUIDGenerator( + UUIDs.DEFAULT_TIMESTAMP_SUPPLIER, + UUIDs.DEFAULT_SEQUENCE_ID_SUPPLIER, + UUIDs.DEFAULT_MAC_ADDRESS_SUPPLIER + ); static UUIDGenerator randomUUIDGen = new RandomBasedUUIDGenerator(); - static UUIDGenerator kOrderedUUIDGen = new TimeBasedKOrderedUUIDGenerator(); + static UUIDGenerator kOrderedUUIDGen = new TimeBasedKOrderedUUIDGenerator( + UUIDs.DEFAULT_TIMESTAMP_SUPPLIER, + UUIDs.DEFAULT_SEQUENCE_ID_SUPPLIER, + UUIDs.DEFAULT_MAC_ADDRESS_SUPPLIER + ); public void testRandomUUID() { - verifyUUIDSet(100000, randomUUIDGen); + verifyUUIDSet(100000, randomUUIDGen).forEach(this::verifyUUIDIsUrlSafe); } public void testTimeUUID() { - verifyUUIDSet(100000, timeUUIDGen); + verifyUUIDSet(100000, timeUUIDGen).forEach(this::verifyUUIDIsUrlSafe); } public void testKOrderedUUID() { - verifyUUIDSet(100000, kOrderedUUIDGen); + verifyUUIDSet(100000, kOrderedUUIDGen).forEach(this::verifyUUIDIsUrlSafe); } public void testThreadedRandomUUID() { @@ -143,6 +154,7 @@ public void testUUIDThreaded(UUIDGenerator uuidSource) { globalSet.addAll(runner.uuidSet); } assertEquals(count * uuids, globalSet.size()); + globalSet.forEach(this::verifyUUIDIsUrlSafe); } private static double testCompression(final UUIDGenerator generator, int numDocs, int numDocsPerSecond, int numNodes, Logger logger) @@ -158,35 +170,25 @@ private static double testCompression(final UUIDGenerator generator, int numDocs UUIDGenerator uuidSource = generator; if (generator instanceof TimeBasedUUIDGenerator) { if (generator instanceof TimeBasedKOrderedUUIDGenerator) { - uuidSource = new TimeBasedKOrderedUUIDGenerator() { + uuidSource = new TimeBasedKOrderedUUIDGenerator(new Supplier<>() { double currentTimeMillis = TestUtil.nextLong(random(), 0L, 10000000000L); @Override - protected long currentTimeMillis() { + public Long get() { currentTimeMillis += intervalBetweenDocs * 2 * r.nextDouble(); return (long) currentTimeMillis; } - - @Override - protected byte[] macAddress() { - return RandomPicks.randomFrom(r, macAddresses); - } - }; + }, () -> 0, () -> RandomPicks.randomFrom(r, macAddresses)); } else { - uuidSource = new TimeBasedUUIDGenerator() { + uuidSource = new TimeBasedUUIDGenerator(new Supplier<>() { double currentTimeMillis = TestUtil.nextLong(random(), 0L, 10000000000L); @Override - protected long currentTimeMillis() { + public Long get() { currentTimeMillis += intervalBetweenDocs * 2 * r.nextDouble(); return (long) currentTimeMillis; } - - @Override - protected byte[] macAddress() { - return RandomPicks.randomFrom(r, macAddresses); - } - }; + }, () -> 0, () -> RandomPicks.randomFrom(r, macAddresses)); } } @@ -237,4 +239,13 @@ public void testStringLength() { private static int getUnpaddedBase64StringLength(int sizeInBytes) { return (int) Math.ceil(sizeInBytes * 4.0 / 3.0); } + + private void verifyUUIDIsUrlSafe(final String uuid) { + assertFalse("UUID should not contain padding characters: " + uuid, uuid.contains("=")); + try { + BASE_64_URL_DECODER.decode(uuid); + } catch (IllegalArgumentException e) { + throw new AssertionError("UUID is not a valid Base64 URL-safe encoded string: " + uuid); + } + } } From 0103a97e80fc77576201ae5d2b5b405fa30a6e53 Mon Sep 17 00:00:00 2001 From: Mary Gouseti Date: Thu, 19 Dec 2024 17:35:04 +0200 Subject: [PATCH 3/8] [8.x] Failure store - Reconciliate failure indices during snapshotting (#118834) (#119077) In this PR we reconciliate the failure indices of a data stream just like we do for the backing indices. The only difference is that a data stream can have an empty list of failure indices, while it cannot have an empty list of backing indices. An easy way to create a situation where certain backing or failure indices are not included in a snapshot is via using exclusions in the multi-target expression of the snapshot. For example: ``` PUT /_snapshot/my_repository/my-snapshot?wait_for_completion=true { "indices": "my-ds*", "-.fs-my-ds-000001" } ``` Backport of #118834 --- .../datastreams/DataStreamsSnapshotsIT.java | 286 +++++++++--------- .../cluster/metadata/DataStream.java | 49 ++- .../snapshots/SnapshotsService.java | 10 +- .../cluster/metadata/DataStreamTests.java | 66 +++- 4 files changed, 229 insertions(+), 182 deletions(-) diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java index 0df4b599ab272..8f45d054f01a6 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java @@ -58,7 +58,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -73,6 +72,7 @@ import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase { @@ -80,6 +80,8 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase { private static final Map DOCUMENT_SOURCE = Collections.singletonMap("@timestamp", 123); public static final String REPO = "repo"; public static final String SNAPSHOT = "snap"; + public static final String TEMPLATE_1_ID = "t1"; + public static final String TEMPLATE_2_ID = "t2"; private Client client; private String dsBackingIndexName; @@ -103,8 +105,8 @@ public void setup() throws Exception { Path location = randomRepoPath(); createRepository(REPO, "fs", location); - DataStreamIT.putComposableIndexTemplate("t1", List.of("ds", "other-ds")); - DataStreamIT.putComposableIndexTemplate("t2", """ + DataStreamIT.putComposableIndexTemplate(TEMPLATE_1_ID, List.of("ds", "other-ds")); + DataStreamIT.putComposableIndexTemplate(TEMPLATE_2_ID, """ { "properties": { "@timestamp": { @@ -139,18 +141,11 @@ public void setup() throws Exception { // Resolve backing index names after data streams have been created: // (these names have a date component, and running around midnight could lead to test failures otherwise) - GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "*" }); - GetDataStreamAction.Response getDataStreamResponse = client.execute(GetDataStreamAction.INSTANCE, getDataStreamRequest).actionGet(); - dsBackingIndexName = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices().get(0).getName(); - otherDsBackingIndexName = getDataStreamResponse.getDataStreams().get(1).getDataStream().getIndices().get(0).getName(); - fsBackingIndexName = getDataStreamResponse.getDataStreams().get(2).getDataStream().getIndices().get(0).getName(); - fsFailureIndexName = getDataStreamResponse.getDataStreams() - .get(2) - .getDataStream() - .getFailureIndices() - .getIndices() - .get(0) - .getName(); + List dataStreamInfos = getDataStreamInfo("*"); + dsBackingIndexName = dataStreamInfos.get(0).getDataStream().getIndices().get(0).getName(); + otherDsBackingIndexName = dataStreamInfos.get(1).getDataStream().getIndices().get(0).getName(); + fsBackingIndexName = dataStreamInfos.get(2).getDataStream().getIndices().get(0).getName(); + fsFailureIndexName = dataStreamInfos.get(2).getDataStream().getFailureIndices().getIndices().get(0).getName(); // Will be used in some tests, to test renaming while restoring a snapshot: ds2BackingIndexName = dsBackingIndexName.replace("-ds-", "-ds2-"); @@ -192,9 +187,7 @@ public void testSnapshotAndRestore() throws Exception { assertEquals(Collections.singletonList(dsBackingIndexName), getSnapshot(REPO, SNAPSHOT).indices()); - assertAcked( - client.execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "ds" })) - ); + assertAcked(client.execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, "ds"))); RestoreSnapshotResponse restoreSnapshotResponse = client.admin() .cluster() @@ -212,13 +205,10 @@ public void testSnapshotAndRestore() throws Exception { assertEquals(DOCUMENT_SOURCE, hits[0].getSourceAsMap()); }); - GetDataStreamAction.Response ds = client.execute( - GetDataStreamAction.INSTANCE, - new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "ds" }) - ).get(); - assertEquals(1, ds.getDataStreams().size()); - assertEquals(1, ds.getDataStreams().get(0).getDataStream().getIndices().size()); - assertEquals(dsBackingIndexName, ds.getDataStreams().get(0).getDataStream().getIndices().get(0).getName()); + List ds = getDataStreamInfo("ds"); + assertEquals(1, ds.size()); + assertEquals(1, ds.get(0).getDataStream().getIndices().size()); + assertEquals(dsBackingIndexName, ds.get(0).getDataStream().getIndices().get(0).getName()); GetAliasesResponse getAliasesResponse = client.admin().indices().getAliases(new GetAliasesRequest("my-alias")).actionGet(); assertThat(getAliasesResponse.getDataStreamAliases().keySet(), containsInAnyOrder("ds", "other-ds")); @@ -272,19 +262,18 @@ public void testSnapshotAndRestoreAllDataStreamsInPlace() throws Exception { assertEquals(DOCUMENT_SOURCE, hits[0].getSourceAsMap()); }); - GetDataStreamAction.Request getDataSteamRequest = new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "*" }); - GetDataStreamAction.Response ds = client.execute(GetDataStreamAction.INSTANCE, getDataSteamRequest).get(); + List dataStreamInfos = getDataStreamInfo("*"); assertThat( - ds.getDataStreams().stream().map(e -> e.getDataStream().getName()).collect(Collectors.toList()), + dataStreamInfos.stream().map(e -> e.getDataStream().getName()).collect(Collectors.toList()), contains(equalTo("ds"), equalTo("other-ds"), equalTo("with-fs")) ); - List backingIndices = ds.getDataStreams().get(0).getDataStream().getIndices(); + List backingIndices = dataStreamInfos.get(0).getDataStream().getIndices(); assertThat(backingIndices.stream().map(Index::getName).collect(Collectors.toList()), contains(dsBackingIndexName)); - backingIndices = ds.getDataStreams().get(1).getDataStream().getIndices(); + backingIndices = dataStreamInfos.get(1).getDataStream().getIndices(); assertThat(backingIndices.stream().map(Index::getName).collect(Collectors.toList()), contains(otherDsBackingIndexName)); - backingIndices = ds.getDataStreams().get(2).getDataStream().getIndices(); + backingIndices = dataStreamInfos.get(2).getDataStream().getIndices(); assertThat(backingIndices.stream().map(Index::getName).collect(Collectors.toList()), contains(fsBackingIndexName)); - List failureIndices = ds.getDataStreams().get(2).getDataStream().getFailureIndices().getIndices(); + List failureIndices = dataStreamInfos.get(2).getDataStream().getFailureIndices().getIndices(); assertThat(failureIndices.stream().map(Index::getName).collect(Collectors.toList()), contains(fsFailureIndexName)); } @@ -331,14 +320,10 @@ public void testSnapshotAndRestoreInPlace() { assertEquals(DOCUMENT_SOURCE, hits[0].getSourceAsMap()); }); - GetDataStreamAction.Request getDataSteamRequest = new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "ds" }); - GetDataStreamAction.Response ds = client.execute(GetDataStreamAction.INSTANCE, getDataSteamRequest).actionGet(); - assertThat( - ds.getDataStreams().stream().map(e -> e.getDataStream().getName()).collect(Collectors.toList()), - contains(equalTo("ds")) - ); - List backingIndices = ds.getDataStreams().get(0).getDataStream().getIndices(); - assertThat(ds.getDataStreams().get(0).getDataStream().getIndices(), hasSize(1)); + List dsInfo = getDataStreamInfo("ds"); + assertThat(dsInfo.stream().map(e -> e.getDataStream().getName()).collect(Collectors.toList()), contains(equalTo("ds"))); + List backingIndices = dsInfo.get(0).getDataStream().getIndices(); + assertThat(dsInfo.get(0).getDataStream().getIndices(), hasSize(1)); assertThat(backingIndices.stream().map(Index::getName).collect(Collectors.toList()), contains(equalTo(dsBackingIndexName))); // The backing index created as part of rollover should still exist (but just not part of the data stream) @@ -351,39 +336,40 @@ public void testSnapshotAndRestoreInPlace() { } public void testFailureStoreSnapshotAndRestore() throws Exception { + String dataStreamName = "with-fs"; CreateSnapshotResponse createSnapshotResponse = client.admin() .cluster() .prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, REPO, SNAPSHOT) .setWaitForCompletion(true) - .setIndices("with-fs") + .setIndices(dataStreamName) .setIncludeGlobalState(false) .get(); RestStatus status = createSnapshotResponse.getSnapshotInfo().status(); assertEquals(RestStatus.OK, status); + assertThat(getSnapshot(REPO, SNAPSHOT).dataStreams(), containsInAnyOrder(dataStreamName)); assertThat(getSnapshot(REPO, SNAPSHOT).indices(), containsInAnyOrder(fsBackingIndexName, fsFailureIndexName)); - assertAcked(client.execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, "with-fs"))); + assertAcked( + client.execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, dataStreamName)) + ); { RestoreSnapshotResponse restoreSnapshotResponse = client.admin() .cluster() .prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, REPO, SNAPSHOT) .setWaitForCompletion(true) - .setIndices("with-fs") + .setIndices(dataStreamName) .get(); assertEquals(2, restoreSnapshotResponse.getRestoreInfo().successfulShards()); - GetDataStreamAction.Response ds = client.execute( - GetDataStreamAction.INSTANCE, - new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "with-fs" }) - ).get(); - assertEquals(1, ds.getDataStreams().size()); - assertEquals(1, ds.getDataStreams().get(0).getDataStream().getIndices().size()); - assertEquals(fsBackingIndexName, ds.getDataStreams().get(0).getDataStream().getIndices().get(0).getName()); - assertEquals(fsFailureIndexName, ds.getDataStreams().get(0).getDataStream().getFailureIndices().getIndices().get(0).getName()); + List dataStreamInfos = getDataStreamInfo(dataStreamName); + assertEquals(1, dataStreamInfos.size()); + assertEquals(1, dataStreamInfos.get(0).getDataStream().getIndices().size()); + assertEquals(fsBackingIndexName, dataStreamInfos.get(0).getDataStream().getIndices().get(0).getName()); + assertEquals(fsFailureIndexName, dataStreamInfos.get(0).getDataStream().getFailureIndices().getIndices().get(0).getName()); } { // With rename pattern @@ -391,21 +377,18 @@ public void testFailureStoreSnapshotAndRestore() throws Exception { .cluster() .prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, REPO, SNAPSHOT) .setWaitForCompletion(true) - .setIndices("with-fs") + .setIndices(dataStreamName) .setRenamePattern("-fs") .setRenameReplacement("-fs2") .get(); assertEquals(2, restoreSnapshotResponse.getRestoreInfo().successfulShards()); - GetDataStreamAction.Response ds = client.execute( - GetDataStreamAction.INSTANCE, - new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "with-fs2" }) - ).get(); - assertEquals(1, ds.getDataStreams().size()); - assertEquals(1, ds.getDataStreams().get(0).getDataStream().getIndices().size()); - assertEquals(fs2BackingIndexName, ds.getDataStreams().get(0).getDataStream().getIndices().get(0).getName()); - assertEquals(fs2FailureIndexName, ds.getDataStreams().get(0).getDataStream().getFailureIndices().getIndices().get(0).getName()); + List dataStreamInfos = getDataStreamInfo("with-fs2"); + assertEquals(1, dataStreamInfos.size()); + assertEquals(1, dataStreamInfos.get(0).getDataStream().getIndices().size()); + assertEquals(fs2BackingIndexName, dataStreamInfos.get(0).getDataStream().getIndices().get(0).getName()); + assertEquals(fs2FailureIndexName, dataStreamInfos.get(0).getDataStream().getFailureIndices().getIndices().get(0).getName()); } } @@ -471,13 +454,10 @@ public void testSnapshotAndRestoreAllIncludeSpecificDataStream() throws Exceptio assertEquals(DOCUMENT_SOURCE, hits[0].getSourceAsMap()); }); - GetDataStreamAction.Response ds = client.execute( - GetDataStreamAction.INSTANCE, - new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { dataStreamToSnapshot }) - ).get(); - assertEquals(1, ds.getDataStreams().size()); - assertEquals(1, ds.getDataStreams().get(0).getDataStream().getIndices().size()); - assertEquals(backingIndexName, ds.getDataStreams().get(0).getDataStream().getIndices().get(0).getName()); + List dataStreamInfos = getDataStreamInfo(dataStreamToSnapshot); + assertEquals(1, dataStreamInfos.size()); + assertEquals(1, dataStreamInfos.get(0).getDataStream().getIndices().size()); + assertEquals(backingIndexName, dataStreamInfos.get(0).getDataStream().getIndices().get(0).getName()); GetAliasesResponse getAliasesResponse = client.admin().indices().getAliases(new GetAliasesRequest("my-alias")).actionGet(); assertThat(getAliasesResponse.getDataStreamAliases().keySet(), contains(dataStreamToSnapshot)); @@ -530,13 +510,10 @@ public void testSnapshotAndRestoreReplaceAll() throws Exception { assertEquals(DOCUMENT_SOURCE, hits[0].getSourceAsMap()); }); - GetDataStreamAction.Response ds = client.execute( - GetDataStreamAction.INSTANCE, - new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "*" }) - ).get(); - assertEquals(3, ds.getDataStreams().size()); + List dataStreamInfos = getDataStreamInfo("*"); + assertEquals(3, dataStreamInfos.size()); assertThat( - ds.getDataStreams().stream().map(i -> i.getDataStream().getName()).collect(Collectors.toList()), + dataStreamInfos.stream().map(i -> i.getDataStream().getName()).collect(Collectors.toList()), containsInAnyOrder("ds", "other-ds", "with-fs") ); @@ -590,19 +567,16 @@ public void testSnapshotAndRestoreAll() throws Exception { assertEquals(DOCUMENT_SOURCE, hits[0].getSourceAsMap()); }); - GetDataStreamAction.Response ds = client.execute( - GetDataStreamAction.INSTANCE, - new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "*" }) - ).get(); - assertEquals(3, ds.getDataStreams().size()); - assertEquals(1, ds.getDataStreams().get(0).getDataStream().getIndices().size()); - assertEquals(dsBackingIndexName, ds.getDataStreams().get(0).getDataStream().getIndices().get(0).getName()); - assertEquals(1, ds.getDataStreams().get(1).getDataStream().getIndices().size()); - assertEquals(otherDsBackingIndexName, ds.getDataStreams().get(1).getDataStream().getIndices().get(0).getName()); - assertEquals(1, ds.getDataStreams().get(2).getDataStream().getIndices().size()); - assertEquals(fsBackingIndexName, ds.getDataStreams().get(2).getDataStream().getIndices().get(0).getName()); - assertEquals(1, ds.getDataStreams().get(2).getDataStream().getFailureIndices().getIndices().size()); - assertEquals(fsFailureIndexName, ds.getDataStreams().get(2).getDataStream().getFailureIndices().getIndices().get(0).getName()); + List dataStreamInfos = getDataStreamInfo("*"); + assertEquals(3, dataStreamInfos.size()); + assertEquals(1, dataStreamInfos.get(0).getDataStream().getIndices().size()); + assertEquals(dsBackingIndexName, dataStreamInfos.get(0).getDataStream().getIndices().get(0).getName()); + assertEquals(1, dataStreamInfos.get(1).getDataStream().getIndices().size()); + assertEquals(otherDsBackingIndexName, dataStreamInfos.get(1).getDataStream().getIndices().get(0).getName()); + assertEquals(1, dataStreamInfos.get(2).getDataStream().getIndices().size()); + assertEquals(fsBackingIndexName, dataStreamInfos.get(2).getDataStream().getIndices().get(0).getName()); + assertEquals(1, dataStreamInfos.get(2).getDataStream().getFailureIndices().getIndices().size()); + assertEquals(fsFailureIndexName, dataStreamInfos.get(2).getDataStream().getFailureIndices().getIndices().get(0).getName()); GetAliasesResponse getAliasesResponse = client.admin().indices().getAliases(new GetAliasesRequest("my-alias")).actionGet(); assertThat(getAliasesResponse.getDataStreamAliases().keySet(), containsInAnyOrder("ds", "other-ds")); @@ -661,19 +635,16 @@ public void testSnapshotAndRestoreIncludeAliasesFalse() throws Exception { assertEquals(DOCUMENT_SOURCE, hits[0].getSourceAsMap()); }); - GetDataStreamAction.Response ds = client.execute( - GetDataStreamAction.INSTANCE, - new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "*" }) - ).get(); - assertEquals(3, ds.getDataStreams().size()); - assertEquals(1, ds.getDataStreams().get(0).getDataStream().getIndices().size()); - assertEquals(dsBackingIndexName, ds.getDataStreams().get(0).getDataStream().getIndices().get(0).getName()); - assertEquals(1, ds.getDataStreams().get(1).getDataStream().getIndices().size()); - assertEquals(otherDsBackingIndexName, ds.getDataStreams().get(1).getDataStream().getIndices().get(0).getName()); - assertEquals(1, ds.getDataStreams().get(2).getDataStream().getIndices().size()); - assertEquals(fsBackingIndexName, ds.getDataStreams().get(2).getDataStream().getIndices().get(0).getName()); - assertEquals(1, ds.getDataStreams().get(2).getDataStream().getIndices().size()); - assertEquals(fsFailureIndexName, ds.getDataStreams().get(2).getDataStream().getFailureIndices().getIndices().get(0).getName()); + List dataStreamInfos = getDataStreamInfo("*"); + assertEquals(3, dataStreamInfos.size()); + assertEquals(1, dataStreamInfos.get(0).getDataStream().getIndices().size()); + assertEquals(dsBackingIndexName, dataStreamInfos.get(0).getDataStream().getIndices().get(0).getName()); + assertEquals(1, dataStreamInfos.get(1).getDataStream().getIndices().size()); + assertEquals(otherDsBackingIndexName, dataStreamInfos.get(1).getDataStream().getIndices().get(0).getName()); + assertEquals(1, dataStreamInfos.get(2).getDataStream().getIndices().size()); + assertEquals(fsBackingIndexName, dataStreamInfos.get(2).getDataStream().getIndices().get(0).getName()); + assertEquals(1, dataStreamInfos.get(2).getDataStream().getIndices().size()); + assertEquals(fsFailureIndexName, dataStreamInfos.get(2).getDataStream().getFailureIndices().getIndices().get(0).getName()); GetAliasesResponse getAliasesResponse = client.admin().indices().getAliases(new GetAliasesRequest("*")).actionGet(); assertThat(getAliasesResponse.getDataStreamAliases(), anEmptyMap()); @@ -715,13 +686,10 @@ public void testRename() throws Exception { .setRenameReplacement("ds2") .get(); - GetDataStreamAction.Response ds = client.execute( - GetDataStreamAction.INSTANCE, - new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "ds2" }) - ).get(); - assertEquals(1, ds.getDataStreams().size()); - assertEquals(1, ds.getDataStreams().get(0).getDataStream().getIndices().size()); - assertEquals(ds2BackingIndexName, ds.getDataStreams().get(0).getDataStream().getIndices().get(0).getName()); + List dataStreamInfos = getDataStreamInfo("ds2"); + assertEquals(1, dataStreamInfos.size()); + assertEquals(1, dataStreamInfos.get(0).getDataStream().getIndices().size()); + assertEquals(ds2BackingIndexName, dataStreamInfos.get(0).getDataStream().getIndices().get(0).getName()); assertResponse( client.prepareSearch("ds2"), response -> assertEquals(DOCUMENT_SOURCE, response.getHits().getHits()[0].getSourceAsMap()) @@ -773,13 +741,10 @@ public void testRenameWriteDataStream() throws Exception { .setRenameReplacement("other-ds2") .get(); - GetDataStreamAction.Response ds = client.execute( - GetDataStreamAction.INSTANCE, - new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "other-ds2" }) - ).get(); - assertEquals(1, ds.getDataStreams().size()); - assertEquals(1, ds.getDataStreams().get(0).getDataStream().getIndices().size()); - assertEquals(otherDs2BackingIndexName, ds.getDataStreams().get(0).getDataStream().getIndices().get(0).getName()); + List dataStreamInfos = getDataStreamInfo("other-ds2"); + assertEquals(1, dataStreamInfos.size()); + assertEquals(1, dataStreamInfos.get(0).getDataStream().getIndices().size()); + assertEquals(otherDs2BackingIndexName, dataStreamInfos.get(0).getDataStream().getIndices().get(0).getName()); GetAliasesResponse getAliasesResponse = client.admin().indices().getAliases(new GetAliasesRequest("my-alias")).actionGet(); assertThat(getAliasesResponse.getDataStreamAliases().keySet(), containsInAnyOrder("ds", "other-ds", "other-ds2")); @@ -843,9 +808,8 @@ public void testBackingIndexIsNotRenamedWhenRestoringDataStream() { assertThat(restoreSnapshotResponse.status(), is(RestStatus.OK)); - GetDataStreamAction.Request getDSRequest = new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "ds" }); - GetDataStreamAction.Response response = client.execute(GetDataStreamAction.INSTANCE, getDSRequest).actionGet(); - assertThat(response.getDataStreams().get(0).getDataStream().getIndices().get(0).getName(), is(dsBackingIndexName)); + List dataStreamInfos = getDataStreamInfo("ds"); + assertThat(dataStreamInfos.get(0).getDataStream().getIndices().get(0).getName(), is(dsBackingIndexName)); } public void testDataStreamAndBackingIndicesAreRenamedUsingRegex() { @@ -882,17 +846,15 @@ public void testDataStreamAndBackingIndicesAreRenamedUsingRegex() { assertThat(restoreSnapshotResponse.status(), is(RestStatus.OK)); // assert "ds" was restored as "test-ds" and the backing index has a valid name - GetDataStreamAction.Request getRenamedDS = new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "test-ds" }); - GetDataStreamAction.Response response = client.execute(GetDataStreamAction.INSTANCE, getRenamedDS).actionGet(); + List dataStreamInfos = getDataStreamInfo("test-ds"); assertThat( - response.getDataStreams().get(0).getDataStream().getIndices().get(0).getName(), + dataStreamInfos.get(0).getDataStream().getIndices().get(0).getName(), is(DataStream.getDefaultBackingIndexName("test-ds", 1L)) ); // data stream "ds" should still exist in the system - GetDataStreamAction.Request getDSRequest = new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "ds" }); - response = client.execute(GetDataStreamAction.INSTANCE, getDSRequest).actionGet(); - assertThat(response.getDataStreams().get(0).getDataStream().getIndices().get(0).getName(), is(dsBackingIndexName)); + dataStreamInfos = getDataStreamInfo("ds"); + assertThat(dataStreamInfos.get(0).getDataStream().getIndices().get(0).getName(), is(dsBackingIndexName)); } public void testWildcards() throws Exception { @@ -918,16 +880,13 @@ public void testWildcards() throws Exception { assertEquals(RestStatus.OK, restoreSnapshotResponse.status()); - GetDataStreamAction.Response ds = client.execute( - GetDataStreamAction.INSTANCE, - new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "ds2" }) - ).get(); - assertEquals(1, ds.getDataStreams().size()); - assertEquals(1, ds.getDataStreams().get(0).getDataStream().getIndices().size()); - assertEquals(ds2BackingIndexName, ds.getDataStreams().get(0).getDataStream().getIndices().get(0).getName()); + List dataStreamInfos = getDataStreamInfo("ds2"); + assertEquals(1, dataStreamInfos.size()); + assertEquals(1, dataStreamInfos.get(0).getDataStream().getIndices().size()); + assertEquals(ds2BackingIndexName, dataStreamInfos.get(0).getDataStream().getIndices().get(0).getName()); assertThat( "we renamed the restored data stream to one that doesn't match any existing composable template", - ds.getDataStreams().get(0).getIndexTemplate(), + dataStreamInfos.get(0).getIndexTemplate(), is(nullValue()) ); } @@ -949,7 +908,7 @@ public void testDataStreamNotStoredWhenIndexRequested() { ); } - public void testDataStreamNotRestoredWhenIndexRequested() throws Exception { + public void testDataStreamNotRestoredWhenIndexRequested() { CreateSnapshotResponse createSnapshotResponse = client.admin() .cluster() .prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, REPO, "snap2") @@ -978,7 +937,7 @@ public void testDataStreamNotRestoredWhenIndexRequested() throws Exception { expectThrows(ResourceNotFoundException.class, client.execute(GetDataStreamAction.INSTANCE, getRequest)); } - public void testDataStreamNotIncludedInLimitedSnapshot() throws ExecutionException, InterruptedException { + public void testDataStreamNotIncludedInLimitedSnapshot() { final String snapshotName = "test-snap"; CreateSnapshotResponse createSnapshotResponse = client.admin() .cluster() @@ -1036,12 +995,7 @@ public void testDeleteDataStreamDuringSnapshot() throws Exception { assertDocCount(dataStream, 100L); // Resolve backing index name after the data stream has been created because it has a date component, // and running around midnight could lead to test failures otherwise - GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request( - TEST_REQUEST_TIMEOUT, - new String[] { dataStream } - ); - GetDataStreamAction.Response getDataStreamResponse = client.execute(GetDataStreamAction.INSTANCE, getDataStreamRequest).actionGet(); - String backingIndexName = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices().get(0).getName(); + String backingIndexName = getDataStreamInfo(dataStream).get(0).getDataStream().getIndices().get(0).getName(); logger.info("--> snapshot"); ActionFuture future = client1.admin() @@ -1229,7 +1183,7 @@ public void testSnapshotDSDuringRolloverAndDeleteOldIndex() throws Exception { assertEquals(restoreSnapshotResponse.failedShards(), 0); } - public void testExcludeDSFromSnapshotWhenExcludingItsIndices() { + public void testExcludeDSFromSnapshotWhenExcludingAnyOfItsIndices() { final String snapshot = "test-snapshot"; final String indexWithoutDataStream = "test-idx-no-ds"; createIndexWithContent(indexWithoutDataStream); @@ -1245,10 +1199,47 @@ public void testExcludeDSFromSnapshotWhenExcludingItsIndices() { .getRestoreInfo(); assertThat(restoreInfo.failedShards(), is(0)); assertThat(restoreInfo.successfulShards(), is(1)); + + // Exclude only failure store indices + { + String dataStreamName = "with-fs"; + CreateSnapshotResponse createSnapshotResponse = client.admin() + .cluster() + .prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, REPO, SNAPSHOT) + .setWaitForCompletion(true) + .setIndices(dataStreamName + "*", "-.fs*") + .setIncludeGlobalState(false) + .get(); + + RestStatus status = createSnapshotResponse.getSnapshotInfo().status(); + assertEquals(RestStatus.OK, status); + + SnapshotInfo retrievedSnapshot = getSnapshot(REPO, SNAPSHOT); + assertThat(retrievedSnapshot.dataStreams(), contains(dataStreamName)); + assertThat(retrievedSnapshot.indices(), containsInAnyOrder(fsBackingIndexName)); + + assertAcked( + safeGet(client.execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, "*"))) + ); + + RestoreInfo restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, REPO, SNAPSHOT) + .setWaitForCompletion(true) + .setIndices(dataStreamName) + .get() + .getRestoreInfo(); + + assertThat(restoreSnapshotResponse, notNullValue()); + assertThat(restoreSnapshotResponse.successfulShards(), equalTo(restoreSnapshotResponse.totalShards())); + assertThat(restoreSnapshotResponse.failedShards(), is(0)); + + GetDataStreamAction.Response.DataStreamInfo dataStream = getDataStreamInfo(dataStreamName).get(0); + assertThat(dataStream.getDataStream().getBackingIndices().getIndices(), not(empty())); + assertThat(dataStream.getDataStream().getFailureIndices().getIndices(), empty()); + } } /** - * This test is a copy of the {@link #testExcludeDSFromSnapshotWhenExcludingItsIndices()} the only difference + * This test is a copy of the {@link #testExcludeDSFromSnapshotWhenExcludingAnyOfItsIndices()} ()} the only difference * is that one include the global state and one doesn't. In general this shouldn't matter that's why it used to be * a random parameter of the test, but because of #107515 it fails when we include the global state. Keep them * separate until this is fixed. @@ -1278,10 +1269,7 @@ public void testRestoreSnapshotFully() throws Exception { createIndexWithContent(indexName); createFullSnapshot(REPO, snapshotName); - assertAcked( - client.execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "*" })) - .get() - ); + assertAcked(client.execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, "*")).get()); assertAcked(client.admin().indices().prepareDelete("*").setIndicesOptions(IndicesOptions.lenientExpandOpenHidden()).get()); RestoreSnapshotResponse restoreSnapshotResponse = client.admin() @@ -1291,8 +1279,7 @@ public void testRestoreSnapshotFully() throws Exception { .get(); assertEquals(RestStatus.OK, restoreSnapshotResponse.status()); - GetDataStreamAction.Request getRequest = new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "*" }); - assertThat(client.execute(GetDataStreamAction.INSTANCE, getRequest).get().getDataStreams(), hasSize(3)); + assertThat(getDataStreamInfo("*"), hasSize(3)); assertNotNull(client.admin().indices().prepareGetIndex().setIndices(indexName).get()); } @@ -1320,7 +1307,7 @@ public void testRestoreDataStreamAliasWithConflictingDataStream() throws Excepti } } - public void testRestoreDataStreamAliasWithConflictingIndicesAlias() throws Exception { + public void testRestoreDataStreamAliasWithConflictingIndicesAlias() { var snapshotName = "test-snapshot"; createFullSnapshot(REPO, snapshotName); client.execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, "*")).actionGet(); @@ -1333,4 +1320,9 @@ public void testRestoreDataStreamAliasWithConflictingIndicesAlias() throws Excep ); assertThat(e.getMessage(), containsString("data stream alias and indices alias have the same name (my-alias)")); } + + protected List getDataStreamInfo(String... dataStreamNames) { + GetDataStreamAction.Request getRequest = new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, dataStreamNames); + return safeGet(client.execute(GetDataStreamAction.INSTANCE, getRequest)).getDataStreams(); + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index 7745ec9cc75b2..db602ef6ef291 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -49,7 +49,6 @@ import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.ArrayList; -import java.util.Collection; import java.util.Comparator; import java.util.HashMap; import java.util.List; @@ -794,27 +793,57 @@ public DataStream promoteDataStream() { /** * Reconciles this data stream with a list of indices available in a snapshot. Allows snapshots to store accurate data - * stream definitions that do not reference backing indices not contained in the snapshot. + * stream definitions that do not reference backing indices and failure indices not contained in the snapshot. * * @param indicesInSnapshot List of indices in the snapshot + * @param snapshotMetadataBuilder a metadata builder with the current view of the snapshot metadata * @return Reconciled {@link DataStream} instance or {@code null} if no reconciled version of this data stream could be built from the * given indices */ @Nullable - public DataStream snapshot(Collection indicesInSnapshot) { + public DataStream snapshot(Set indicesInSnapshot, Metadata.Builder snapshotMetadataBuilder) { + boolean backingIndicesChanged = false; + boolean failureIndicesChanged = false; + // do not include indices not available in the snapshot - List reconciledIndices = new ArrayList<>(this.backingIndices.indices); - if (reconciledIndices.removeIf(x -> indicesInSnapshot.contains(x.getName()) == false) == false) { + List reconciledBackingIndices = this.backingIndices.indices; + if (isAnyIndexMissing(this.backingIndices.getIndices(), snapshotMetadataBuilder, indicesInSnapshot)) { + reconciledBackingIndices = new ArrayList<>(this.backingIndices.indices); + backingIndicesChanged = reconciledBackingIndices.removeIf(x -> indicesInSnapshot.contains(x.getName()) == false); + if (reconciledBackingIndices.isEmpty()) { + return null; + } + } + + List reconciledFailureIndices = this.failureIndices.indices; + if (DataStream.isFailureStoreFeatureFlagEnabled() + && isAnyIndexMissing(failureIndices.indices, snapshotMetadataBuilder, indicesInSnapshot)) { + reconciledFailureIndices = new ArrayList<>(this.failureIndices.indices); + failureIndicesChanged = reconciledFailureIndices.removeIf(x -> indicesInSnapshot.contains(x.getName()) == false); + } + + if (backingIndicesChanged == false && failureIndicesChanged == false) { return this; } - if (reconciledIndices.size() == 0) { - return null; + Builder builder = copy(); + if (backingIndicesChanged) { + builder.setBackingIndices(backingIndices.copy().setIndices(reconciledBackingIndices).build()); + } + if (failureIndicesChanged) { + builder.setFailureIndices(failureIndices.copy().setIndices(reconciledFailureIndices).build()); } + return builder.setMetadata(metadata == null ? null : new HashMap<>(metadata)).build(); + } - return copy().setBackingIndices(backingIndices.copy().setIndices(reconciledIndices).build()) - .setMetadata(metadata == null ? null : new HashMap<>(metadata)) - .build(); + private static boolean isAnyIndexMissing(List indices, Metadata.Builder builder, Set indicesInSnapshot) { + for (Index index : indices) { + final String indexName = index.getName(); + if (builder.get(indexName) == null || indicesInSnapshot.contains(indexName) == false) { + return true; + } + } + return false; } /** diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 8d526f3e114e1..6f690a9e6ccd5 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -786,15 +786,7 @@ private static Metadata metadataForSnapshot(SnapshotsInProgress.Entry snapshot, assert snapshot.partial() : "Data stream [" + dataStreamName + "] was deleted during a snapshot but snapshot was not partial."; } else { - boolean missingIndex = false; - for (Index index : dataStream.getIndices()) { - final String indexName = index.getName(); - if (builder.get(indexName) == null || indicesInSnapshot.contains(indexName) == false) { - missingIndex = true; - break; - } - } - final DataStream reconciled = missingIndex ? dataStream.snapshot(indicesInSnapshot) : dataStream; + final DataStream reconciled = dataStream.snapshot(indicesInSnapshot, builder); if (reconciled != null) { dataStreams.put(dataStreamName, reconciled); } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java index ee83cef4f41ae..b974e4f3cf9a6 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -45,8 +45,10 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; +import java.util.stream.Collectors; import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName; import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.newInstance; @@ -870,23 +872,39 @@ public void testReplaceFailureIndexThrowsExceptionIfReplacingWriteIndex() { } public void testSnapshot() { - var preSnapshotDataStream = DataStreamTestHelper.randomInstance(); - var indicesToRemove = randomSubsetOf(preSnapshotDataStream.getIndices()); - if (indicesToRemove.size() == preSnapshotDataStream.getIndices().size()) { + var preSnapshotDataStream = DataStreamTestHelper.randomInstance(true); + + // Mutate backing indices + var backingIndicesToRemove = randomSubsetOf(preSnapshotDataStream.getIndices()); + if (backingIndicesToRemove.size() == preSnapshotDataStream.getIndices().size()) { // never remove them all - indicesToRemove.remove(0); + backingIndicesToRemove.remove(0); } - var indicesToAdd = randomIndexInstances(); - var postSnapshotIndices = new ArrayList<>(preSnapshotDataStream.getIndices()); - postSnapshotIndices.removeAll(indicesToRemove); - postSnapshotIndices.addAll(indicesToAdd); + var backingIndicesToAdd = randomIndexInstances(); + var postSnapshotBackingIndices = new ArrayList<>(preSnapshotDataStream.getIndices()); + postSnapshotBackingIndices.removeAll(backingIndicesToRemove); + postSnapshotBackingIndices.addAll(backingIndicesToAdd); + + // Mutate failure indices + var failureIndicesToRemove = randomSubsetOf(preSnapshotDataStream.getFailureIndices().getIndices()); + var failureIndicesToAdd = randomIndexInstances(); + var postSnapshotFailureIndices = new ArrayList<>(preSnapshotDataStream.getFailureIndices().getIndices()); + postSnapshotFailureIndices.removeAll(failureIndicesToRemove); + postSnapshotFailureIndices.addAll(failureIndicesToAdd); var replicated = preSnapshotDataStream.isReplicated() && randomBoolean(); var postSnapshotDataStream = preSnapshotDataStream.copy() .setBackingIndices( preSnapshotDataStream.getBackingIndices() .copy() - .setIndices(postSnapshotIndices) + .setIndices(postSnapshotBackingIndices) + .setRolloverOnWrite(replicated == false && preSnapshotDataStream.rolloverOnWrite()) + .build() + ) + .setFailureIndices( + preSnapshotDataStream.getFailureIndices() + .copy() + .setIndices(postSnapshotFailureIndices) .setRolloverOnWrite(replicated == false && preSnapshotDataStream.rolloverOnWrite()) .build() ) @@ -895,9 +913,10 @@ public void testSnapshot() { .setReplicated(replicated) .build(); - var reconciledDataStream = postSnapshotDataStream.snapshot( - preSnapshotDataStream.getIndices().stream().map(Index::getName).toList() - ); + Set indicesInSnapshot = new HashSet<>(); + preSnapshotDataStream.getIndices().forEach(index -> indicesInSnapshot.add(index.getName())); + preSnapshotDataStream.getFailureIndices().getIndices().forEach(index -> indicesInSnapshot.add(index.getName())); + var reconciledDataStream = postSnapshotDataStream.snapshot(indicesInSnapshot, Metadata.builder()); assertThat(reconciledDataStream.getName(), equalTo(postSnapshotDataStream.getName())); assertThat(reconciledDataStream.getGeneration(), equalTo(postSnapshotDataStream.getGeneration())); @@ -911,9 +930,19 @@ public void testSnapshot() { } assertThat(reconciledDataStream.isHidden(), equalTo(postSnapshotDataStream.isHidden())); assertThat(reconciledDataStream.isReplicated(), equalTo(postSnapshotDataStream.isReplicated())); - assertThat(reconciledDataStream.getIndices(), everyItem(not(in(indicesToRemove)))); - assertThat(reconciledDataStream.getIndices(), everyItem(not(in(indicesToAdd)))); - assertThat(reconciledDataStream.getIndices().size(), equalTo(preSnapshotDataStream.getIndices().size() - indicesToRemove.size())); + assertThat(reconciledDataStream.getIndices(), everyItem(not(in(backingIndicesToRemove)))); + assertThat(reconciledDataStream.getIndices(), everyItem(not(in(backingIndicesToAdd)))); + assertThat( + reconciledDataStream.getIndices().size(), + equalTo(preSnapshotDataStream.getIndices().size() - backingIndicesToRemove.size()) + ); + var reconciledFailureIndices = reconciledDataStream.getFailureIndices().getIndices(); + assertThat(reconciledFailureIndices, everyItem(not(in(failureIndicesToRemove)))); + assertThat(reconciledFailureIndices, everyItem(not(in(failureIndicesToAdd)))); + assertThat( + reconciledFailureIndices.size(), + equalTo(preSnapshotDataStream.getFailureIndices().getIndices().size() - failureIndicesToRemove.size()) + ); } public void testSnapshotWithAllBackingIndicesRemoved() { @@ -924,7 +953,12 @@ public void testSnapshotWithAllBackingIndicesRemoved() { .setBackingIndices(preSnapshotDataStream.getBackingIndices().copy().setIndices(indicesToAdd).build()) .build(); - assertNull(postSnapshotDataStream.snapshot(preSnapshotDataStream.getIndices().stream().map(Index::getName).toList())); + assertNull( + postSnapshotDataStream.snapshot( + preSnapshotDataStream.getIndices().stream().map(Index::getName).collect(Collectors.toSet()), + Metadata.builder() + ) + ); } public void testSelectTimeSeriesWriteIndex() { From f8adadaa758785df7568aa0eca4768f44f86886d Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Thu, 19 Dec 2024 16:08:40 +0000 Subject: [PATCH 4/8] =?UTF-8?q?[8.x]=20Update=20data=20stream=20deprecatio?= =?UTF-8?q?ns=20warnings=20to=20new=20format=20and=20filter=20sea=E2=80=A6?= =?UTF-8?q?=20(#119097)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Update data stream deprecations warnings to new format and filter searchable snapshots from response (#118562) * Update data stream deprecations warnings to new format * Add reindex_required flag to index version deprecation notice response * PR Changes * Move all deprecation checks to use a shared predicate which also excludes snapshots * Update docs/changelog/118562.yaml * Tests for excluding snapshots * PR Changes - Remove leftover comment (cherry picked from commit 5487927) * Update docs/changelog/119097.yaml --- docs/changelog/119097.yaml | 6 ++ .../deprecation/DeprecatedIndexPredicate.java | 47 ++++++++++++ .../DataStreamDeprecationChecks.java | 56 ++++---------- .../deprecation/IndexDeprecationChecks.java | 8 +- .../DataStreamDeprecationChecksTests.java | 74 +++++++++---------- .../IndexDeprecationChecksTests.java | 24 +++++- .../action/ReindexDataStreamAction.java | 18 ----- ...ReindexDataStreamIndexTransportAction.java | 5 +- .../ReindexDataStreamTransportAction.java | 4 +- ...indexDataStreamPersistentTaskExecutor.java | 4 +- 10 files changed, 135 insertions(+), 111 deletions(-) create mode 100644 docs/changelog/119097.yaml create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/deprecation/DeprecatedIndexPredicate.java diff --git a/docs/changelog/119097.yaml b/docs/changelog/119097.yaml new file mode 100644 index 0000000000000..711ddfc0edfcb --- /dev/null +++ b/docs/changelog/119097.yaml @@ -0,0 +1,6 @@ +pr: 119097 +summary: "[8.x] Update data stream deprecations warnings to new format and filter\ + \ sea…" +area: Data streams +type: enhancement +issues: [] diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/deprecation/DeprecatedIndexPredicate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/deprecation/DeprecatedIndexPredicate.java new file mode 100644 index 0000000000000..26f769cd08766 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/deprecation/DeprecatedIndexPredicate.java @@ -0,0 +1,47 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.deprecation; + +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.IndexVersions; + +import java.util.function.Predicate; + +public class DeprecatedIndexPredicate { + + public static final IndexVersion MINIMUM_WRITEABLE_VERSION_AFTER_UPGRADE = IndexVersions.V_8_0_0; + + /* + * This predicate allows through only indices that were created with a previous lucene version, meaning that they need to be reindexed + * in order to be writable in the _next_ lucene version. + * + * It ignores searchable snapshots as they are not writable. + */ + public static Predicate getReindexRequiredPredicate(Metadata metadata) { + return index -> { + IndexMetadata indexMetadata = metadata.index(index); + return reindexRequired(indexMetadata); + }; + } + + public static boolean reindexRequired(IndexMetadata indexMetadata) { + return creationVersionBeforeMinimumWritableVersion(indexMetadata) && isNotSearchableSnapshot(indexMetadata); + } + + private static boolean isNotSearchableSnapshot(IndexMetadata indexMetadata) { + return indexMetadata.isSearchableSnapshot() == false; + } + + private static boolean creationVersionBeforeMinimumWritableVersion(IndexMetadata metadata) { + return metadata.getCreationVersion().before(MINIMUM_WRITEABLE_VERSION_AFTER_UPGRADE); + } + +} diff --git a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/DataStreamDeprecationChecks.java b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/DataStreamDeprecationChecks.java index ee029d01427aa..c82f9cb047625 100644 --- a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/DataStreamDeprecationChecks.java +++ b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/DataStreamDeprecationChecks.java @@ -10,10 +10,12 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.index.Index; -import org.elasticsearch.index.IndexVersions; +import org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate; import org.elasticsearch.xpack.core.deprecation.DeprecationIssue; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; import static java.util.Map.entry; import static java.util.Map.ofEntries; @@ -21,54 +23,28 @@ public class DataStreamDeprecationChecks { static DeprecationIssue oldIndicesCheck(DataStream dataStream, ClusterState clusterState) { List backingIndices = dataStream.getIndices(); - boolean hasOldIndices = backingIndices.stream() - .anyMatch(index -> clusterState.metadata().index(index).getCompatibilityVersion().before(IndexVersions.V_8_0_0)); - if (hasOldIndices) { - long totalIndices = backingIndices.size(); - List oldIndices = backingIndices.stream() - .filter(index -> clusterState.metadata().index(index).getCompatibilityVersion().before(IndexVersions.V_8_0_0)) - .toList(); - long totalOldIndices = oldIndices.size(); - long totalOldSearchableSnapshots = oldIndices.stream() - .filter(index -> clusterState.metadata().index(index).isSearchableSnapshot()) - .count(); - long totalOldPartiallyMountedSearchableSnapshots = oldIndices.stream() - .filter(index -> clusterState.metadata().index(index).isPartialSearchableSnapshot()) - .count(); - long totalOldFullyMountedSearchableSnapshots = totalOldSearchableSnapshots - totalOldPartiallyMountedSearchableSnapshots; + + Set indicesNeedingUpgrade = backingIndices.stream() + .filter(DeprecatedIndexPredicate.getReindexRequiredPredicate(clusterState.metadata())) + .map(Index::getName) + .collect(Collectors.toUnmodifiableSet()); + + if (indicesNeedingUpgrade.isEmpty() == false) { return new DeprecationIssue( DeprecationIssue.Level.CRITICAL, "Old data stream with a compatibility version < 8.0", - "https://www.elastic.co/guide/en/elasticsearch/reference/master/breaking-changes-9.0.html", + "https://www.elastic.co/guide/en/elasticsearch/reference/current/migrating-8.0.html#breaking-changes-8.0", "This data stream has backing indices that were created before Elasticsearch 8.0.0", false, ofEntries( - entry( - "backing_indices", - ofEntries( - entry("count", totalIndices), - entry( - "need_upgrading", - ofEntries( - entry("count", totalOldIndices), - entry( - "searchable_snapshots", - ofEntries( - entry("count", totalOldSearchableSnapshots), - entry("fully_mounted", ofEntries(entry("count", totalOldFullyMountedSearchableSnapshots))), - entry( - "partially_mounted", - ofEntries(entry("count", totalOldPartiallyMountedSearchableSnapshots)) - ) - ) - ) - ) - ) - ) - ) + entry("reindex_required", true), + entry("total_backing_indices", backingIndices.size()), + entry("indices_requiring_upgrade_count", indicesNeedingUpgrade.size()), + entry("indices_requiring_upgrade", indicesNeedingUpgrade) ) ); } + return null; } } diff --git a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/IndexDeprecationChecks.java b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/IndexDeprecationChecks.java index e2e30aba3be7d..969a5608ee36a 100644 --- a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/IndexDeprecationChecks.java +++ b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/IndexDeprecationChecks.java @@ -17,9 +17,11 @@ import org.elasticsearch.index.IndexVersions; import org.elasticsearch.index.engine.frozen.FrozenEngine; import org.elasticsearch.index.mapper.SourceFieldMapper; +import org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate; import org.elasticsearch.xpack.core.deprecation.DeprecationIssue; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Locale; import java.util.Map; @@ -36,14 +38,14 @@ static DeprecationIssue oldIndicesCheck(IndexMetadata indexMetadata, ClusterStat // TODO: this check needs to be revised. It's trivially true right now. IndexVersion currentCompatibilityVersion = indexMetadata.getCompatibilityVersion(); // We intentionally exclude indices that are in data streams because they will be picked up by DataStreamDeprecationChecks - if (currentCompatibilityVersion.before(IndexVersions.V_8_0_0) && isNotDataStreamIndex(indexMetadata, clusterState)) { + if (DeprecatedIndexPredicate.reindexRequired(indexMetadata) && isNotDataStreamIndex(indexMetadata, clusterState)) { return new DeprecationIssue( DeprecationIssue.Level.CRITICAL, "Old index with a compatibility version < 8.0", - "https://www.elastic.co/guide/en/elasticsearch/reference/master/breaking-changes-9.0.html", + "https://www.elastic.co/guide/en/elasticsearch/reference/current/migrating-8.0.html#breaking-changes-8.0", "This index has version: " + currentCompatibilityVersion.toReleaseVersion(), false, - null + Collections.singletonMap("reindex_required", true) ); } return null; diff --git a/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/DataStreamDeprecationChecksTests.java b/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/DataStreamDeprecationChecksTests.java index c391e852014f2..29bda1b8c71e7 100644 --- a/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/DataStreamDeprecationChecksTests.java +++ b/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/DataStreamDeprecationChecksTests.java @@ -17,41 +17,46 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.snapshots.SearchableSnapshotsSettings; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.deprecation.DeprecationIssue; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import static java.util.Collections.singletonList; +import static java.util.Map.entry; +import static java.util.Map.ofEntries; +import static org.elasticsearch.index.IndexModule.INDEX_STORE_TYPE_SETTING; import static org.elasticsearch.xpack.deprecation.DeprecationChecks.DATA_STREAM_CHECKS; import static org.hamcrest.Matchers.equalTo; public class DataStreamDeprecationChecksTests extends ESTestCase { public void testOldIndicesCheck() { - long oldIndexCount = randomIntBetween(1, 100); - long newIndexCount = randomIntBetween(1, 100); - long oldSearchableSnapshotCount = 0; - long oldFullyManagedSearchableSnapshotCount = 0; - long oldPartiallyManagedSearchableSnapshotCount = 0; + int oldIndexCount = randomIntBetween(1, 100); + int newIndexCount = randomIntBetween(1, 100); + List allIndices = new ArrayList<>(); Map nameToIndexMetadata = new HashMap<>(); + Set expectedIndices = new HashSet<>(); + for (int i = 0; i < oldIndexCount; i++) { - Settings.Builder settingsBuilder = settings(IndexVersion.fromId(7170099)); - if (randomBoolean()) { - settingsBuilder.put("index.store.type", "snapshot"); - if (randomBoolean()) { - oldFullyManagedSearchableSnapshotCount++; - } else { - settingsBuilder.put("index.store.snapshot.partial", true); - oldPartiallyManagedSearchableSnapshotCount++; - } - oldSearchableSnapshotCount++; + Settings.Builder settings = settings(IndexVersion.fromId(7170099)); + + String indexName = "old-data-stream-index-" + i; + if (expectedIndices.isEmpty() == false && randomIntBetween(0, 2) == 0) { + settings.put(INDEX_STORE_TYPE_SETTING.getKey(), SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOT_STORE_TYPE); + } else { + expectedIndices.add(indexName); } - IndexMetadata oldIndexMetadata = IndexMetadata.builder("old-data-stream-index-" + i) + + Settings.Builder settingsBuilder = settings; + IndexMetadata oldIndexMetadata = IndexMetadata.builder(indexName) .settings(settingsBuilder) .numberOfShards(1) .numberOfReplicas(0) @@ -59,11 +64,9 @@ public void testOldIndicesCheck() { allIndices.add(oldIndexMetadata.getIndex()); nameToIndexMetadata.put(oldIndexMetadata.getIndex().getName(), oldIndexMetadata); } + for (int i = 0; i < newIndexCount; i++) { Settings.Builder settingsBuilder = settings(IndexVersion.current()); - if (randomBoolean()) { - settingsBuilder.put("index.store.type", "snapshot"); - } IndexMetadata newIndexMetadata = IndexMetadata.builder("new-data-stream-index-" + i) .settings(settingsBuilder) .numberOfShards(1) @@ -72,6 +75,7 @@ public void testOldIndicesCheck() { allIndices.add(newIndexMetadata.getIndex()); nameToIndexMetadata.put(newIndexMetadata.getIndex().getName(), newIndexMetadata); } + DataStream dataStream = new DataStream( randomAlphaOfLength(10), allIndices, @@ -88,37 +92,27 @@ public void testOldIndicesCheck() { randomBoolean(), null ); + Metadata metadata = Metadata.builder().indices(nameToIndexMetadata).build(); ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).build(); + DeprecationIssue expected = new DeprecationIssue( DeprecationIssue.Level.CRITICAL, "Old data stream with a compatibility version < 8.0", - "https://www.elastic.co/guide/en/elasticsearch/reference/master/breaking-changes-9.0.html", + "https://www.elastic.co/guide/en/elasticsearch/reference/current/migrating-8.0.html#breaking-changes-8.0", "This data stream has backing indices that were created before Elasticsearch 8.0.0", false, - Map.of( - "backing_indices", - Map.of( - "count", - oldIndexCount + newIndexCount, - "need_upgrading", - Map.of( - "count", - oldIndexCount, - "searchable_snapshots", - Map.of( - "count", - oldSearchableSnapshotCount, - "fully_mounted", - Map.of("count", oldFullyManagedSearchableSnapshotCount), - "partially_mounted", - Map.of("count", oldPartiallyManagedSearchableSnapshotCount) - ) - ) - ) + ofEntries( + entry("reindex_required", true), + entry("total_backing_indices", oldIndexCount + newIndexCount), + entry("indices_requiring_upgrade_count", expectedIndices.size()), + entry("indices_requiring_upgrade", expectedIndices) ) ); + List issues = DeprecationChecks.filterChecks(DATA_STREAM_CHECKS, c -> c.apply(dataStream, clusterState)); + assertThat(issues, equalTo(singletonList(expected))); } + } diff --git a/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/IndexDeprecationChecksTests.java b/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/IndexDeprecationChecksTests.java index eee3be3596499..bafbcad057081 100644 --- a/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/IndexDeprecationChecksTests.java +++ b/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/IndexDeprecationChecksTests.java @@ -19,8 +19,8 @@ import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexVersion; -import org.elasticsearch.index.IndexVersions; import org.elasticsearch.index.engine.frozen.FrozenEngine; +import org.elasticsearch.snapshots.SearchableSnapshotsSettings; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.deprecation.DeprecationIssue; @@ -29,6 +29,8 @@ import java.util.Map; import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; +import static org.elasticsearch.index.IndexModule.INDEX_STORE_TYPE_SETTING; import static org.elasticsearch.xpack.deprecation.DeprecationChecks.INDEX_SETTINGS_CHECKS; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; @@ -49,10 +51,10 @@ public void testOldIndicesCheck() { DeprecationIssue expected = new DeprecationIssue( DeprecationIssue.Level.CRITICAL, "Old index with a compatibility version < 8.0", - "https://www.elastic.co/guide/en/elasticsearch/reference/master/breaking-changes-9.0.html", + "https://www.elastic.co/guide/en/elasticsearch/reference/current/migrating-8.0.html#breaking-changes-8.0", "This index has version: " + createdWith.toReleaseVersion(), false, - null + singletonMap("reindex_required", true) ); List issues = DeprecationChecks.filterChecks(INDEX_SETTINGS_CHECKS, c -> c.apply(indexMetadata, clusterState)); assertEquals(singletonList(expected), issues); @@ -100,6 +102,20 @@ public void testOldIndicesCheckDataStreamIndex() { assertThat(issues.size(), equalTo(0)); } + public void testOldIndicesCheckSnapshotIgnored() { + IndexVersion createdWith = IndexVersion.fromId(7170099); + Settings.Builder settings = settings(createdWith); + settings.put(INDEX_STORE_TYPE_SETTING.getKey(), SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOT_STORE_TYPE); + IndexMetadata indexMetadata = IndexMetadata.builder("test").settings(settings).numberOfShards(1).numberOfReplicas(0).build(); + ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE) + .metadata(Metadata.builder().put(indexMetadata, true)) + .build(); + + List issues = DeprecationChecks.filterChecks(INDEX_SETTINGS_CHECKS, c -> c.apply(indexMetadata, clusterState)); + + assertThat(issues, empty()); + } + public void testTranslogRetentionSettings() { Settings.Builder settings = settings(IndexVersion.current()); settings.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), randomPositiveTimeValue()); @@ -229,7 +245,7 @@ public void testCamelCaseDeprecation() throws IOException { + "} }"; IndexMetadata simpleIndex = IndexMetadata.builder(randomAlphaOfLengthBetween(5, 10)) - .settings(settings(IndexVersions.V_7_0_0)) + .settings(settings(IndexVersion.current())) .numberOfShards(1) .numberOfReplicas(1) .putMapping(simpleMapping) diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamAction.java index fcb1037419b17..9e4cbb1082215 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamAction.java @@ -13,14 +13,10 @@ import org.elasticsearch.action.ActionType; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.FeatureFlag; import org.elasticsearch.features.NodeFeature; -import org.elasticsearch.index.Index; -import org.elasticsearch.index.IndexVersion; -import org.elasticsearch.index.IndexVersions; import org.elasticsearch.xcontent.ConstructingObjectParser; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContent; @@ -43,24 +39,10 @@ public class ReindexDataStreamAction extends ActionType getOldIndexVersionPredicate(Metadata metadata) { - return index -> metadata.index(index).getCreationVersion().onOrBefore(MINIMUM_WRITEABLE_VERSION_AFTER_UPGRADE); - } - public enum Mode { UPGRADE } diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java index 66b13a9ce22b0..38b5da6527039 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java @@ -32,6 +32,7 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate; import java.util.Locale; import java.util.Map; @@ -78,13 +79,13 @@ protected void doExecute( IndexMetadata sourceIndex = clusterService.state().getMetadata().index(sourceIndexName); Settings settingsBefore = sourceIndex.getSettings(); - var hasOldVersion = ReindexDataStreamAction.getOldIndexVersionPredicate(clusterService.state().metadata()); + var hasOldVersion = DeprecatedIndexPredicate.getReindexRequiredPredicate(clusterService.state().metadata()); if (hasOldVersion.test(sourceIndex.getIndex()) == false) { logger.warn( "Migrating index [{}] with version [{}] is unnecessary as its version is not before [{}]", sourceIndexName, sourceIndex.getCreationVersion(), - ReindexDataStreamAction.MINIMUM_WRITEABLE_VERSION_AFTER_UPGRADE + DeprecatedIndexPredicate.MINIMUM_WRITEABLE_VERSION_AFTER_UPGRADE ); } diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java index f011c429ce79c..cc648c1984544 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java @@ -26,8 +26,8 @@ import org.elasticsearch.xpack.migrate.task.ReindexDataStreamTask; import org.elasticsearch.xpack.migrate.task.ReindexDataStreamTaskParams; +import static org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate.getReindexRequiredPredicate; import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.TASK_ID_PREFIX; -import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.getOldIndexVersionPredicate; /* * This transport action creates a new persistent task for reindexing the source data stream given in the request. On successful creation @@ -68,7 +68,7 @@ protected void doExecute(Task task, ReindexDataStreamRequest request, ActionList return; } int totalIndices = dataStream.getIndices().size(); - int totalIndicesToBeUpgraded = (int) dataStream.getIndices().stream().filter(getOldIndexVersionPredicate(metadata)).count(); + int totalIndicesToBeUpgraded = (int) dataStream.getIndices().stream().filter(getReindexRequiredPredicate(metadata)).count(); ReindexDataStreamTaskParams params = new ReindexDataStreamTaskParams( sourceDataStreamName, transportService.getThreadPool().absoluteTimeInMillis(), diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java index 6a7bf3db440b0..2324d8f269958 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java @@ -24,7 +24,7 @@ import java.util.List; import java.util.Map; -import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.getOldIndexVersionPredicate; +import static org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate.getReindexRequiredPredicate; public class ReindexDataStreamPersistentTaskExecutor extends PersistentTasksExecutor { private static final TimeValue TASK_KEEP_ALIVE_TIME = TimeValue.timeValueDays(1); @@ -74,7 +74,7 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask if (dataStreamInfos.size() == 1) { List indices = dataStreamInfos.get(0).getDataStream().getIndices(); List indicesToBeReindexed = indices.stream() - .filter(getOldIndexVersionPredicate(clusterService.state().metadata())) + .filter(getReindexRequiredPredicate(clusterService.state().metadata())) .toList(); reindexDataStreamTask.setPendingIndicesCount(indicesToBeReindexed.size()); for (Index index : indicesToBeReindexed) { From afb72ceb50d3c2f1c2ce1a2f1affae04368d08ed Mon Sep 17 00:00:00 2001 From: Bogdan Pintea Date: Thu, 19 Dec 2024 17:11:15 +0100 Subject: [PATCH 5/8] ESQL: Add a LicenseAware interface for licensed Nodes (#118931) (#119099) This adds a new interface that elements that require a proper license state can implement to enforce the license requirement. This can be now applied to any node or node property. The check still happens in the Verifier, since the plan needs to be analysed first and the check still only happens if no other verification faults exist already. Fixes #117405 --- docs/changelog/118931.yaml | 6 ++ .../core/expression/function/Function.java | 6 -- .../xpack/esql/LicenseAware.java | 15 ++++ .../xpack/esql/analysis/Verifier.java | 16 ++-- .../aggregate/SpatialAggregateFunction.java | 5 +- .../function/CheckLicenseTests.java | 86 ++++++++++++++----- 6 files changed, 98 insertions(+), 36 deletions(-) create mode 100644 docs/changelog/118931.yaml create mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/LicenseAware.java diff --git a/docs/changelog/118931.yaml b/docs/changelog/118931.yaml new file mode 100644 index 0000000000000..81e9b3cb16521 --- /dev/null +++ b/docs/changelog/118931.yaml @@ -0,0 +1,6 @@ +pr: 118931 +summary: Add a `LicenseAware` interface for licensed Nodes +area: ES|QL +type: enhancement +issues: + - 117405 diff --git a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/function/Function.java b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/function/Function.java index a1afcdbf1f77c..cad5c631088f2 100644 --- a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/function/Function.java +++ b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/function/Function.java @@ -6,7 +6,6 @@ */ package org.elasticsearch.xpack.esql.core.expression.function; -import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.Expressions; import org.elasticsearch.xpack.esql.core.expression.Nullability; @@ -43,11 +42,6 @@ public Nullability nullable() { return Expressions.nullable(children()); } - /** Return true if this function can be executed under the provided {@link XPackLicenseState}, otherwise false.*/ - public boolean checkLicense(XPackLicenseState state) { - return true; - } - @Override public int hashCode() { return Objects.hash(getClass(), children()); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/LicenseAware.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/LicenseAware.java new file mode 100644 index 0000000000000..04fcdb8a7c8e1 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/LicenseAware.java @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql; + +import org.elasticsearch.license.XPackLicenseState; + +public interface LicenseAware { + /** Return true if the implementer can be executed under the provided {@link XPackLicenseState}, otherwise false.*/ + boolean licenseCheck(XPackLicenseState state); +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Verifier.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Verifier.java index 93e9d59ed8c6e..e146b517ad1c8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Verifier.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Verifier.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.esql.analysis; import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.xpack.esql.LicenseAware; import org.elasticsearch.xpack.esql.action.EsqlCapabilities; import org.elasticsearch.xpack.esql.common.Failure; import org.elasticsearch.xpack.esql.core.capabilities.Unresolvable; @@ -26,6 +27,7 @@ import org.elasticsearch.xpack.esql.core.expression.predicate.logical.Not; import org.elasticsearch.xpack.esql.core.expression.predicate.logical.Or; import org.elasticsearch.xpack.esql.core.expression.predicate.operator.comparison.BinaryComparison; +import org.elasticsearch.xpack.esql.core.tree.Node; import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.expression.function.UnsupportedAttribute; import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction; @@ -209,7 +211,7 @@ else if (p instanceof Lookup lookup) { checkRemoteEnrich(plan, failures); if (failures.isEmpty()) { - checkLicense(plan, licenseState, failures); + licenseCheck(plan, failures); } // gather metrics @@ -587,11 +589,15 @@ private static void checkBinaryComparison(LogicalPlan p, Set failures) }); } - private void checkLicense(LogicalPlan plan, XPackLicenseState licenseState, Set failures) { - plan.forEachExpressionDown(Function.class, p -> { - if (p.checkLicense(licenseState) == false) { - failures.add(new Failure(p, "current license is non-compliant for function [" + p.sourceText() + "]")); + private void licenseCheck(LogicalPlan plan, Set failures) { + Consumer> licenseCheck = n -> { + if (n instanceof LicenseAware la && la.licenseCheck(licenseState) == false) { + failures.add(fail(n, "current license is non-compliant for [{}]", n.sourceText())); } + }; + plan.forEachDown(p -> { + licenseCheck.accept(p); + p.forEachExpression(Expression.class, licenseCheck); }); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SpatialAggregateFunction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SpatialAggregateFunction.java index 35f99e4b648df..f68f9f2487884 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SpatialAggregateFunction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SpatialAggregateFunction.java @@ -11,6 +11,7 @@ import org.elasticsearch.index.mapper.MappedFieldType.FieldExtractPreference; import org.elasticsearch.license.License; import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.xpack.esql.LicenseAware; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.tree.Source; @@ -24,7 +25,7 @@ * The AggregateMapper class will generate multiple aggregation functions for each combination, allowing the planner to * select the best one. */ -public abstract class SpatialAggregateFunction extends AggregateFunction { +public abstract class SpatialAggregateFunction extends AggregateFunction implements LicenseAware { protected final FieldExtractPreference fieldExtractPreference; protected SpatialAggregateFunction(Source source, Expression field, Expression filter, FieldExtractPreference fieldExtractPreference) { @@ -41,7 +42,7 @@ protected SpatialAggregateFunction(StreamInput in, FieldExtractPreference fieldE public abstract SpatialAggregateFunction withDocValues(); @Override - public boolean checkLicense(XPackLicenseState state) { + public boolean licenseCheck(XPackLicenseState state) { return switch (field().dataType()) { case GEO_SHAPE, CARTESIAN_SHAPE -> state.isAllowedByLicense(License.OperationMode.PLATINUM); default -> true; diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/CheckLicenseTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/CheckLicenseTests.java index 98f36d339976c..19af9892015b2 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/CheckLicenseTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/CheckLicenseTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.license.internal.XPackLicenseStatus; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.esql.EsqlTestUtils; +import org.elasticsearch.xpack.esql.LicenseAware; import org.elasticsearch.xpack.esql.VerificationException; import org.elasticsearch.xpack.esql.analysis.Analyzer; import org.elasticsearch.xpack.esql.analysis.AnalyzerContext; @@ -25,10 +26,12 @@ import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.parser.EsqlParser; +import org.elasticsearch.xpack.esql.plan.logical.Limit; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.stats.Metrics; import java.util.List; +import java.util.Objects; import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.analyzerDefaultMapping; import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.defaultEnrichResolution; @@ -44,25 +47,28 @@ public void testLicense() { final LicensedFeature functionLicenseFeature = random().nextBoolean() ? LicensedFeature.momentary("test", "license", functionLicense) : LicensedFeature.persistent("test", "license", functionLicense); - final EsqlFunctionRegistry.FunctionBuilder builder = (source, expression, cfg) -> { - final LicensedFunction licensedFunction = new LicensedFunction(source); - licensedFunction.setLicensedFeature(functionLicenseFeature); - return licensedFunction; - }; for (License.OperationMode operationMode : License.OperationMode.values()) { if (License.OperationMode.TRIAL != operationMode && License.OperationMode.compare(operationMode, functionLicense) < 0) { // non-compliant license - final VerificationException ex = expectThrows(VerificationException.class, () -> analyze(builder, operationMode)); - assertThat(ex.getMessage(), containsString("current license is non-compliant for function [license()]")); + final VerificationException ex = expectThrows( + VerificationException.class, + () -> analyze(operationMode, functionLicenseFeature) + ); + assertThat(ex.getMessage(), containsString("current license is non-compliant for [license()]")); + assertThat(ex.getMessage(), containsString("current license is non-compliant for [LicensedLimit]")); } else { // compliant license - assertNotNull(analyze(builder, operationMode)); + assertNotNull(analyze(operationMode, functionLicenseFeature)); } } } } - private LogicalPlan analyze(EsqlFunctionRegistry.FunctionBuilder builder, License.OperationMode operationMode) { + private LogicalPlan analyze(License.OperationMode operationMode, LicensedFeature functionLicenseFeature) { + final EsqlFunctionRegistry.FunctionBuilder builder = (source, expression, cfg) -> new LicensedFunction( + source, + functionLicenseFeature + ); final FunctionDefinition def = EsqlFunctionRegistry.def(LicensedFunction.class, builder, "license"); final EsqlFunctionRegistry registry = new EsqlFunctionRegistry(def) { @Override @@ -70,7 +76,13 @@ public EsqlFunctionRegistry snapshotRegistry() { return this; } }; - return analyzer(registry, operationMode).analyze(parser.createStatement(esql)); + + var plan = parser.createStatement(esql); + plan = plan.transformDown( + Limit.class, + l -> Objects.equals(l.limit().fold(), 10) ? new LicensedLimit(l.source(), l.limit(), l.child(), functionLicenseFeature) : l + ); + return analyzer(registry, operationMode).analyze(plan); } private static Analyzer analyzer(EsqlFunctionRegistry registry, License.OperationMode operationMode) { @@ -88,25 +100,18 @@ private static XPackLicenseState getLicenseState(License.OperationMode operation // It needs to be public because we run validation on it via reflection in org.elasticsearch.xpack.esql.tree.EsqlNodeSubclassTests. // This test prevents to add the license as constructor parameter too. - public static class LicensedFunction extends Function { + public static class LicensedFunction extends Function implements LicenseAware { - private LicensedFeature licensedFeature; + private final LicensedFeature licensedFeature; - public LicensedFunction(Source source) { + public LicensedFunction(Source source, LicensedFeature licensedFeature) { super(source, List.of()); - } - - void setLicensedFeature(LicensedFeature licensedFeature) { this.licensedFeature = licensedFeature; } @Override - public boolean checkLicense(XPackLicenseState state) { - if (licensedFeature instanceof LicensedFeature.Momentary momentary) { - return momentary.check(state); - } else { - return licensedFeature.checkWithoutTracking(state); - } + public boolean licenseCheck(XPackLicenseState state) { + return checkLicense(state, licensedFeature); } @Override @@ -121,7 +126,7 @@ public Expression replaceChildren(List newChildren) { @Override protected NodeInfo info() { - return NodeInfo.create(this); + return NodeInfo.create(this, LicensedFunction::new, licensedFeature); } @Override @@ -135,4 +140,39 @@ public void writeTo(StreamOutput out) { } } + public static class LicensedLimit extends Limit implements LicenseAware { + + private final LicensedFeature licensedFeature; + + public LicensedLimit(Source source, Expression limit, LogicalPlan child, LicensedFeature licensedFeature) { + super(source, limit, child); + this.licensedFeature = licensedFeature; + } + + @Override + public boolean licenseCheck(XPackLicenseState state) { + return checkLicense(state, licensedFeature); + } + + @Override + public Limit replaceChild(LogicalPlan newChild) { + return new LicensedLimit(source(), limit(), newChild, licensedFeature); + } + + @Override + protected NodeInfo info() { + return NodeInfo.create(this, LicensedLimit::new, limit(), child(), licensedFeature); + } + + @Override + public String sourceText() { + return "LicensedLimit"; + } + } + + private static boolean checkLicense(XPackLicenseState state, LicensedFeature licensedFeature) { + return licensedFeature instanceof LicensedFeature.Momentary momentary + ? momentary.check(state) + : licensedFeature.checkWithoutTracking(state); + } } From 7c50256a68eb54bc1896372d789739699ab191a8 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine <58790826+elasticsearchmachine@users.noreply.github.com> Date: Fri, 20 Dec 2024 03:22:37 +1100 Subject: [PATCH 6/8] Mute org.elasticsearch.xpack.esql.qa.single_node.EsqlClientYamlAsyncIT test {p0=esql/190_lookup_join/non-lookup index} #119111 --- muted-tests.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/muted-tests.yml b/muted-tests.yml index 44a3b8502350b..565481a3542df 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -433,3 +433,6 @@ tests: issue: https://github.com/elastic/elasticsearch/issues/119036 - class: org.elasticsearch.smoketest.SmokeTestMultiNodeClientYamlTestSuiteIT issue: https://github.com/elastic/elasticsearch/issues/118955 +- class: org.elasticsearch.xpack.esql.qa.single_node.EsqlClientYamlAsyncIT + method: test {p0=esql/190_lookup_join/non-lookup index} + issue: https://github.com/elastic/elasticsearch/issues/119111 From e6253c8ea02bac8362670490ab02be43ad5d06f6 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine <58790826+elasticsearchmachine@users.noreply.github.com> Date: Fri, 20 Dec 2024 03:22:49 +1100 Subject: [PATCH 7/8] Mute org.elasticsearch.xpack.esql.qa.single_node.EsqlClientYamlAsyncIT test {p0=esql/190_lookup_join/basic} #119112 --- muted-tests.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/muted-tests.yml b/muted-tests.yml index 565481a3542df..ec3d261a6a421 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -436,3 +436,6 @@ tests: - class: org.elasticsearch.xpack.esql.qa.single_node.EsqlClientYamlAsyncIT method: test {p0=esql/190_lookup_join/non-lookup index} issue: https://github.com/elastic/elasticsearch/issues/119111 +- class: org.elasticsearch.xpack.esql.qa.single_node.EsqlClientYamlAsyncIT + method: test {p0=esql/190_lookup_join/basic} + issue: https://github.com/elastic/elasticsearch/issues/119112 From d9578c58aa92f6b55efe1d84842376a15188206c Mon Sep 17 00:00:00 2001 From: Patrick Doyle <810052+prdoyle@users.noreply.github.com> Date: Thu, 19 Dec 2024 11:44:19 -0500 Subject: [PATCH 8/8] Process execution checks and IT tests (#119010) (#119106) * Process creation checks and IT tests * Remove process queries; only forbid execution --- .../bridge/EntitlementChecker.java | 7 ++++ .../common/RestEntitlementsCheckAction.java | 39 +++++++++++++------ .../EntitlementAllowedNonModularPlugin.java | 1 - .../qa/EntitlementAllowedPlugin.java | 1 - .../EntitlementDeniedNonModularPlugin.java | 1 - .../qa/EntitlementDeniedPlugin.java | 1 - .../api/ElasticsearchEntitlementChecker.java | 11 ++++++ .../runtime/policy/PolicyManager.java | 20 ++++++++++ 8 files changed, 66 insertions(+), 15 deletions(-) diff --git a/libs/entitlement/bridge/src/main/java/org/elasticsearch/entitlement/bridge/EntitlementChecker.java b/libs/entitlement/bridge/src/main/java/org/elasticsearch/entitlement/bridge/EntitlementChecker.java index a6b8a31fc3894..25f4e97bd12ee 100644 --- a/libs/entitlement/bridge/src/main/java/org/elasticsearch/entitlement/bridge/EntitlementChecker.java +++ b/libs/entitlement/bridge/src/main/java/org/elasticsearch/entitlement/bridge/EntitlementChecker.java @@ -11,6 +11,7 @@ import java.net.URL; import java.net.URLStreamHandlerFactory; +import java.util.List; public interface EntitlementChecker { @@ -29,4 +30,10 @@ public interface EntitlementChecker { void check$java_net_URLClassLoader$(Class callerClass, String name, URL[] urls, ClassLoader parent); void check$java_net_URLClassLoader$(Class callerClass, String name, URL[] urls, ClassLoader parent, URLStreamHandlerFactory factory); + + // Process creation + void check$$start(Class callerClass, ProcessBuilder that, ProcessBuilder.Redirect[] redirects); + + void check$java_lang_ProcessBuilder$startPipeline(Class callerClass, List builders); + } diff --git a/libs/entitlement/qa/common/src/main/java/org/elasticsearch/entitlement/qa/common/RestEntitlementsCheckAction.java b/libs/entitlement/qa/common/src/main/java/org/elasticsearch/entitlement/qa/common/RestEntitlementsCheckAction.java index 1ac4a7506eacb..3cc4b97e9bfea 100644 --- a/libs/entitlement/qa/common/src/main/java/org/elasticsearch/entitlement/qa/common/RestEntitlementsCheckAction.java +++ b/libs/entitlement/qa/common/src/main/java/org/elasticsearch/entitlement/qa/common/RestEntitlementsCheckAction.java @@ -29,43 +29,47 @@ import java.util.stream.Collectors; import static java.util.Map.entry; +import static org.elasticsearch.entitlement.qa.common.RestEntitlementsCheckAction.CheckAction.deniedToPlugins; +import static org.elasticsearch.entitlement.qa.common.RestEntitlementsCheckAction.CheckAction.forPlugins; import static org.elasticsearch.rest.RestRequest.Method.GET; public class RestEntitlementsCheckAction extends BaseRestHandler { private static final Logger logger = LogManager.getLogger(RestEntitlementsCheckAction.class); private final String prefix; - private record CheckAction(Runnable action, boolean isServerOnly) { - - static CheckAction serverOnly(Runnable action) { + record CheckAction(Runnable action, boolean isAlwaysDeniedToPlugins) { + /** + * These cannot be granted to plugins, so our test plugins cannot test the "allowed" case. + * Used both for always-denied entitlements as well as those granted only to the server itself. + */ + static CheckAction deniedToPlugins(Runnable action) { return new CheckAction(action, true); } - static CheckAction serverAndPlugin(Runnable action) { + static CheckAction forPlugins(Runnable action) { return new CheckAction(action, false); } } private static final Map checkActions = Map.ofEntries( - entry("runtime_exit", CheckAction.serverOnly(RestEntitlementsCheckAction::runtimeExit)), - entry("runtime_halt", CheckAction.serverOnly(RestEntitlementsCheckAction::runtimeHalt)), - entry("create_classloader", CheckAction.serverAndPlugin(RestEntitlementsCheckAction::createClassLoader)) + entry("runtime_exit", deniedToPlugins(RestEntitlementsCheckAction::runtimeExit)), + entry("runtime_halt", deniedToPlugins(RestEntitlementsCheckAction::runtimeHalt)), + entry("create_classloader", forPlugins(RestEntitlementsCheckAction::createClassLoader)), + // entry("processBuilder_start", deniedToPlugins(RestEntitlementsCheckAction::processBuilder_start)), + entry("processBuilder_startPipeline", deniedToPlugins(RestEntitlementsCheckAction::processBuilder_startPipeline)) ); @SuppressForbidden(reason = "Specifically testing Runtime.exit") private static void runtimeExit() { - logger.info("Calling Runtime.exit;"); Runtime.getRuntime().exit(123); } @SuppressForbidden(reason = "Specifically testing Runtime.halt") private static void runtimeHalt() { - logger.info("Calling Runtime.halt;"); Runtime.getRuntime().halt(123); } private static void createClassLoader() { - logger.info("Calling new URLClassLoader"); try (var classLoader = new URLClassLoader("test", new URL[0], RestEntitlementsCheckAction.class.getClassLoader())) { logger.info("Created URLClassLoader [{}]", classLoader.getName()); } catch (IOException e) { @@ -73,6 +77,18 @@ private static void createClassLoader() { } } + private static void processBuilder_start() { + // TODO: processBuilder().start(); + } + + private static void processBuilder_startPipeline() { + try { + ProcessBuilder.startPipeline(List.of()); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + public RestEntitlementsCheckAction(String prefix) { this.prefix = prefix; } @@ -80,7 +96,7 @@ public RestEntitlementsCheckAction(String prefix) { public static Set getServerAndPluginsCheckActions() { return checkActions.entrySet() .stream() - .filter(kv -> kv.getValue().isServerOnly() == false) + .filter(kv -> kv.getValue().isAlwaysDeniedToPlugins() == false) .map(Map.Entry::getKey) .collect(Collectors.toSet()); } @@ -112,6 +128,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli } return channel -> { + logger.info("Calling check action [{}]", actionName); checkAction.action().run(); channel.sendResponse(new RestResponse(RestStatus.OK, Strings.format("Succesfully executed action [%s]", actionName))); }; diff --git a/libs/entitlement/qa/entitlement-allowed-nonmodular/src/main/java/org/elasticsearch/entitlement/qa/nonmodular/EntitlementAllowedNonModularPlugin.java b/libs/entitlement/qa/entitlement-allowed-nonmodular/src/main/java/org/elasticsearch/entitlement/qa/nonmodular/EntitlementAllowedNonModularPlugin.java index d65981c30f0be..82146e6a87759 100644 --- a/libs/entitlement/qa/entitlement-allowed-nonmodular/src/main/java/org/elasticsearch/entitlement/qa/nonmodular/EntitlementAllowedNonModularPlugin.java +++ b/libs/entitlement/qa/entitlement-allowed-nonmodular/src/main/java/org/elasticsearch/entitlement/qa/nonmodular/EntitlementAllowedNonModularPlugin.java @@ -27,7 +27,6 @@ import java.util.function.Supplier; public class EntitlementAllowedNonModularPlugin extends Plugin implements ActionPlugin { - @Override public List getRestHandlers( final Settings settings, diff --git a/libs/entitlement/qa/entitlement-allowed/src/main/java/org/elasticsearch/entitlement/qa/EntitlementAllowedPlugin.java b/libs/entitlement/qa/entitlement-allowed/src/main/java/org/elasticsearch/entitlement/qa/EntitlementAllowedPlugin.java index d81e23e311be1..8649daf272e70 100644 --- a/libs/entitlement/qa/entitlement-allowed/src/main/java/org/elasticsearch/entitlement/qa/EntitlementAllowedPlugin.java +++ b/libs/entitlement/qa/entitlement-allowed/src/main/java/org/elasticsearch/entitlement/qa/EntitlementAllowedPlugin.java @@ -27,7 +27,6 @@ import java.util.function.Supplier; public class EntitlementAllowedPlugin extends Plugin implements ActionPlugin { - @Override public List getRestHandlers( final Settings settings, diff --git a/libs/entitlement/qa/entitlement-denied-nonmodular/src/main/java/org/elasticsearch/entitlement/qa/nonmodular/EntitlementDeniedNonModularPlugin.java b/libs/entitlement/qa/entitlement-denied-nonmodular/src/main/java/org/elasticsearch/entitlement/qa/nonmodular/EntitlementDeniedNonModularPlugin.java index 0f908d84260fb..7ca89c735a602 100644 --- a/libs/entitlement/qa/entitlement-denied-nonmodular/src/main/java/org/elasticsearch/entitlement/qa/nonmodular/EntitlementDeniedNonModularPlugin.java +++ b/libs/entitlement/qa/entitlement-denied-nonmodular/src/main/java/org/elasticsearch/entitlement/qa/nonmodular/EntitlementDeniedNonModularPlugin.java @@ -27,7 +27,6 @@ import java.util.function.Supplier; public class EntitlementDeniedNonModularPlugin extends Plugin implements ActionPlugin { - @Override public List getRestHandlers( final Settings settings, diff --git a/libs/entitlement/qa/entitlement-denied/src/main/java/org/elasticsearch/entitlement/qa/EntitlementDeniedPlugin.java b/libs/entitlement/qa/entitlement-denied/src/main/java/org/elasticsearch/entitlement/qa/EntitlementDeniedPlugin.java index 0ed27e2e576e7..2a2fd35d47cf3 100644 --- a/libs/entitlement/qa/entitlement-denied/src/main/java/org/elasticsearch/entitlement/qa/EntitlementDeniedPlugin.java +++ b/libs/entitlement/qa/entitlement-denied/src/main/java/org/elasticsearch/entitlement/qa/EntitlementDeniedPlugin.java @@ -27,7 +27,6 @@ import java.util.function.Supplier; public class EntitlementDeniedPlugin extends Plugin implements ActionPlugin { - @Override public List getRestHandlers( final Settings settings, diff --git a/libs/entitlement/src/main/java/org/elasticsearch/entitlement/runtime/api/ElasticsearchEntitlementChecker.java b/libs/entitlement/src/main/java/org/elasticsearch/entitlement/runtime/api/ElasticsearchEntitlementChecker.java index a5ca0543ad15a..75365fbb74d65 100644 --- a/libs/entitlement/src/main/java/org/elasticsearch/entitlement/runtime/api/ElasticsearchEntitlementChecker.java +++ b/libs/entitlement/src/main/java/org/elasticsearch/entitlement/runtime/api/ElasticsearchEntitlementChecker.java @@ -14,6 +14,7 @@ import java.net.URL; import java.net.URLStreamHandlerFactory; +import java.util.List; /** * Implementation of the {@link EntitlementChecker} interface, providing additional @@ -67,4 +68,14 @@ public ElasticsearchEntitlementChecker(PolicyManager policyManager) { ) { policyManager.checkCreateClassLoader(callerClass); } + + @Override + public void check$$start(Class callerClass, ProcessBuilder processBuilder, ProcessBuilder.Redirect[] redirects) { + policyManager.checkStartProcess(callerClass); + } + + @Override + public void check$java_lang_ProcessBuilder$startPipeline(Class callerClass, List builders) { + policyManager.checkStartProcess(callerClass); + } } diff --git a/libs/entitlement/src/main/java/org/elasticsearch/entitlement/runtime/policy/PolicyManager.java b/libs/entitlement/src/main/java/org/elasticsearch/entitlement/runtime/policy/PolicyManager.java index 74ba986041dac..e06f7768eb8be 100644 --- a/libs/entitlement/src/main/java/org/elasticsearch/entitlement/runtime/policy/PolicyManager.java +++ b/libs/entitlement/src/main/java/org/elasticsearch/entitlement/runtime/policy/PolicyManager.java @@ -105,6 +105,26 @@ private static Map> buildScopeEntitlementsMap(Policy p return policy.scopes.stream().collect(Collectors.toUnmodifiableMap(scope -> scope.name, scope -> scope.entitlements)); } + public void checkStartProcess(Class callerClass) { + neverEntitled(callerClass, "start process"); + } + + private void neverEntitled(Class callerClass, String operationDescription) { + var requestingModule = requestingModule(callerClass); + if (isTriviallyAllowed(requestingModule)) { + return; + } + + throw new NotEntitledException( + Strings.format( + "Not entitled: caller [%s], module [%s], operation [%s]", + callerClass, + requestingModule.getName(), + operationDescription + ) + ); + } + public void checkExitVM(Class callerClass) { checkEntitlementPresent(callerClass, ExitVMEntitlement.class); }