diff --git a/docs/reference/troubleshooting/common-issues/mapping-explosion.asciidoc b/docs/reference/troubleshooting/common-issues/mapping-explosion.asciidoc
index 5ba18df3e6a6b..fd1a31228c95f 100644
--- a/docs/reference/troubleshooting/common-issues/mapping-explosion.asciidoc
+++ b/docs/reference/troubleshooting/common-issues/mapping-explosion.asciidoc
@@ -24,7 +24,8 @@ reporting that the coordinating node is waiting for all other nodes to
 confirm they are on mapping update request.
 
 * Discover's **Fields for wildcard** page-loading API command or {kibana-ref}/console-kibana.html[Dev Tools] page-refreshing Autocomplete API commands are taking a long time (more than 10 seconds) or
-timing out in the browser's Developer Tools Network tab.
+timing out in the browser's Developer Tools Network tab. For more 
+information, refer to our https://www.elastic.co/blog/troubleshooting-guide-common-issues-kibana-discover-load[walkthrough on troubleshooting Discover].
 
 * Discover's **Available fields** taking a long time to compile Javascript in the browser's Developer Tools Performance tab. This may potentially escalate to temporary browser page unresponsiveness.
 
diff --git a/libs/x-content/impl/src/main/java/org/elasticsearch/xcontent/provider/json/JsonXContentGenerator.java b/libs/x-content/impl/src/main/java/org/elasticsearch/xcontent/provider/json/JsonXContentGenerator.java
index f22176930da64..09cbdf2d571cd 100644
--- a/libs/x-content/impl/src/main/java/org/elasticsearch/xcontent/provider/json/JsonXContentGenerator.java
+++ b/libs/x-content/impl/src/main/java/org/elasticsearch/xcontent/provider/json/JsonXContentGenerator.java
@@ -500,7 +500,7 @@ public void writeRawValue(InputStream stream, XContentType xContentType) throws
     public void writeRawValue(String value) throws IOException {
         try {
             if (supportsRawWrites()) {
-                generator.writeRaw(value);
+                generator.writeRawValue(value);
             } else {
                 // fallback to a regular string for formats that don't allow writing the value as is
                 generator.writeString(value);
diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java
index 392e157f60952..515992e18d62d 100644
--- a/server/src/main/java/org/elasticsearch/TransportVersions.java
+++ b/server/src/main/java/org/elasticsearch/TransportVersions.java
@@ -140,6 +140,7 @@ static TransportVersion def(int id) {
     public static final TransportVersion ESQL_ENRICH_OPERATOR_STATUS = def(8_600_00_0);
     public static final TransportVersion ESQL_SERIALIZE_ARRAY_VECTOR = def(8_601_00_0);
     public static final TransportVersion ESQL_SERIALIZE_ARRAY_BLOCK = def(8_602_00_0);
+    public static final TransportVersion ADD_DATA_STREAM_GLOBAL_RETENTION = def(8_603_00_0);
 
     /*
      * STOP! READ THIS FIRST! No, really,
diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetention.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetention.java
new file mode 100644
index 0000000000000..f3b88ba6083c3
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetention.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.cluster.metadata;
+
+import org.elasticsearch.TransportVersion;
+import org.elasticsearch.TransportVersions;
+import org.elasticsearch.cluster.AbstractNamedDiffable;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.NamedDiff;
+import org.elasticsearch.common.collect.Iterators;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.xcontent.ParseField;
+import org.elasticsearch.xcontent.ToXContent;
+import org.elasticsearch.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Objects;
+
+/**
+ * A cluster state entry that contains global retention settings that are configurable by the user. These settings include:
+ * - default retention, applied on any data stream managed by DSL that does not have an explicit retention defined
+ * - max retention, applied on every data stream managed by DSL
+ */
+public final class DataStreamGlobalRetention extends AbstractNamedDiffable<ClusterState.Custom> implements ClusterState.Custom {
+
+    public static final String TYPE = "data-stream-global-retention";
+
+    public static final ParseField DEFAULT_RETENTION_FIELD = new ParseField("default_retention");
+    public static final ParseField MAX_RETENTION_FIELD = new ParseField("max_retention");
+
+    public static final DataStreamGlobalRetention EMPTY = new DataStreamGlobalRetention(null, null);
+
+    @Nullable
+    private final TimeValue defaultRetention;
+    @Nullable
+    private final TimeValue maxRetention;
+
+    /**
+     * @param defaultRetention the default retention or null if it's undefined
+     * @param maxRetention the max retention or null if it's undefined
+     * @throws IllegalArgumentException when the default retention is greater than the max retention.
+     */
+    public DataStreamGlobalRetention(TimeValue defaultRetention, TimeValue maxRetention) {
+        if (defaultRetention != null && maxRetention != null && defaultRetention.getMillis() > maxRetention.getMillis()) {
+            throw new IllegalArgumentException(
+                "Default global retention ["
+                    + defaultRetention.getStringRep()
+                    + "] cannot be greater than the max global retention ["
+                    + maxRetention.getStringRep()
+                    + "]."
+            );
+        }
+        this.defaultRetention = defaultRetention;
+        this.maxRetention = maxRetention;
+    }
+
+    public static DataStreamGlobalRetention read(StreamInput in) throws IOException {
+        return new DataStreamGlobalRetention(in.readOptionalTimeValue(), in.readOptionalTimeValue());
+    }
+
+    @Override
+    public String getWriteableName() {
+        return TYPE;
+    }
+
+    @Override
+    public TransportVersion getMinimalSupportedVersion() {
+        return TransportVersions.ADD_DATA_STREAM_GLOBAL_RETENTION;
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeOptionalTimeValue(defaultRetention);
+        out.writeOptionalTimeValue(maxRetention);
+    }
+
+    public static NamedDiff<ClusterState.Custom> readDiffFrom(StreamInput in) throws IOException {
+        return readDiffFrom(ClusterState.Custom.class, TYPE, in);
+    }
+
+    @Override
+    public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params ignored) {
+        return Iterators.single(this::toXContentFragment);
+    }
+
+    /**
+     * Adds to the XContentBuilder the two fields when they are not null.
+     */
+    public XContentBuilder toXContentFragment(XContentBuilder builder, ToXContent.Params params) throws IOException {
+        if (defaultRetention != null) {
+            builder.field(DEFAULT_RETENTION_FIELD.getPreferredName(), defaultRetention.getStringRep());
+        }
+        if (maxRetention != null) {
+            builder.field(MAX_RETENTION_FIELD.getPreferredName(), maxRetention.getStringRep());
+        }
+        return builder;
+    }
+
+    /**
+     * Returns the metadata found in the cluster state or null.
+     */
+    public static DataStreamGlobalRetention getFromClusterState(ClusterState clusterState) {
+        return clusterState.custom(DataStreamGlobalRetention.TYPE);
+    }
+
+    @Nullable
+    public TimeValue getDefaultRetention() {
+        return defaultRetention;
+    }
+
+    @Nullable
+    public TimeValue getMaxRetention() {
+        return maxRetention;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        DataStreamGlobalRetention that = (DataStreamGlobalRetention) o;
+        return Objects.equals(defaultRetention, that.defaultRetention) && Objects.equals(maxRetention, that.maxRetention);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(defaultRetention, maxRetention);
+    }
+
+    @Override
+    public String toString() {
+        return "DataStreamGlobalRetention{"
+            + "defaultRetention="
+            + (defaultRetention == null ? "null" : defaultRetention.getStringRep())
+            + ", maxRetention="
+            + (maxRetention == null ? "null" : maxRetention.getStringRep())
+            + '}';
+    }
+}
diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetentionSerializationTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetentionSerializationTests.java
new file mode 100644
index 0000000000000..8c3d36464784e
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetentionSerializationTests.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.cluster.metadata;
+
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.Diff;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.test.AbstractChunkedSerializingTestCase;
+import org.elasticsearch.test.SimpleDiffableWireSerializationTestCase;
+
+import java.util.List;
+
+public class DataStreamGlobalRetentionSerializationTests extends SimpleDiffableWireSerializationTestCase<ClusterState.Custom> {
+
+    @Override
+    protected ClusterState.Custom makeTestChanges(ClusterState.Custom testInstance) {
+        if (randomBoolean()) {
+            return testInstance;
+        }
+        return mutateInstance(testInstance);
+    }
+
+    @Override
+    protected Writeable.Reader<Diff<ClusterState.Custom>> diffReader() {
+        return DataStreamGlobalRetention::readDiffFrom;
+    }
+
+    @Override
+    protected Writeable.Reader<ClusterState.Custom> instanceReader() {
+        return DataStreamGlobalRetention::read;
+    }
+
+    @Override
+    protected NamedWriteableRegistry getNamedWriteableRegistry() {
+        return new NamedWriteableRegistry(
+            List.of(
+                new NamedWriteableRegistry.Entry(ClusterState.Custom.class, DataStreamGlobalRetention.TYPE, DataStreamGlobalRetention::read)
+            )
+        );
+    }
+
+    @Override
+    protected ClusterState.Custom createTestInstance() {
+        return randomGlobalRetention();
+    }
+
+    @Override
+    protected ClusterState.Custom mutateInstance(ClusterState.Custom instance) {
+        DataStreamGlobalRetention metadata = (DataStreamGlobalRetention) instance;
+        var defaultRetention = metadata.getDefaultRetention();
+        var maxRetention = metadata.getMaxRetention();
+        switch (randomInt(1)) {
+            case 0 -> {
+                if (defaultRetention == null) {
+                    defaultRetention = TimeValue.timeValueDays(randomIntBetween(1, 1000));
+                } else {
+                    defaultRetention = randomBoolean() ? null : TimeValue.timeValueDays(randomIntBetween(1, 1000));
+                }
+            }
+            case 1 -> {
+                if (maxRetention == null) {
+                    maxRetention = TimeValue.timeValueDays(randomIntBetween(1000, 2000));
+                } else {
+                    maxRetention = randomBoolean() ? null : TimeValue.timeValueDays(randomIntBetween(1000, 2000));
+                }
+            }
+        }
+        return new DataStreamGlobalRetention(defaultRetention, maxRetention);
+    }
+
+    public static DataStreamGlobalRetention randomGlobalRetention() {
+        return new DataStreamGlobalRetention(
+            randomBoolean() ? null : TimeValue.timeValueDays(randomIntBetween(1, 1000)),
+            randomBoolean() ? null : TimeValue.timeValueDays(randomIntBetween(1000, 2000))
+        );
+    }
+
+    public void testChunking() {
+        AbstractChunkedSerializingTestCase.assertChunkCount(createTestInstance(), ignored -> 1);
+    }
+
+    public void testValidation() {
+        expectThrows(
+            IllegalArgumentException.class,
+            () -> new DataStreamGlobalRetention(
+                TimeValue.timeValueDays(randomIntBetween(1001, 2000)),
+                TimeValue.timeValueDays(randomIntBetween(1, 1000))
+            )
+        );
+    }
+}
diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java
index 6520e3d0f68bd..a841e9b4304b3 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java
@@ -1103,8 +1103,10 @@ protected static void wipeAllIndices() throws IOException {
     protected static void wipeAllIndices(boolean preserveSecurityIndices) throws IOException {
         boolean includeHidden = clusterHasFeature(RestTestLegacyFeatures.HIDDEN_INDICES_SUPPORTED);
         try {
-            // remove all indices except ilm and slm history which can pop up after deleting all data streams but shouldn't interfere
-            final List<String> indexPatterns = new ArrayList<>(List.of("*", "-.ds-ilm-history-*", "-.ds-.slm-history-*"));
+            // remove all indices except some history indices which can pop up after deleting all data streams but shouldn't interfere
+            final List<String> indexPatterns = new ArrayList<>(
+                List.of("*", "-.ds-ilm-history-*", "-.ds-.slm-history-*", ".ds-.watcher-history-*")
+            );
             if (preserveSecurityIndices) {
                 indexPatterns.add("-.security-*");
             }
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/loadingservice/ModelLoadingService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/loadingservice/ModelLoadingService.java
index 5994c61f46297..5869f353c80c9 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/loadingservice/ModelLoadingService.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/loadingservice/ModelLoadingService.java
@@ -647,7 +647,7 @@ private void handleLoadSuccess(
             // Also, if the consumer is a search consumer, we should always cache it
             if (referencedModels.contains(modelId)
                 || Sets.haveNonEmptyIntersection(modelIdToModelAliases.getOrDefault(modelId, new HashSet<>()), referencedModels)
-                || consumer.equals(Consumer.SEARCH_AGGS)) {
+                || consumer.isAnyOf(Consumer.SEARCH_AGGS, Consumer.SEARCH_RESCORER)) {
                 try {
                     // The local model may already be in cache. If it is, we don't bother adding it to cache.
                     // If it isn't, we flip an `isLoaded` flag, and increment the model counter to make sure if it is evicted
@@ -810,7 +810,8 @@ public void clusterChanged(ClusterChangedEvent event) {
                 );
                 if (oldModelAliasesNotReferenced && newModelAliasesNotReferenced && modelIsNotReferenced) {
                     ModelAndConsumer modelAndConsumer = localModelCache.get(modelId);
-                    if (modelAndConsumer != null && modelAndConsumer.consumers.contains(Consumer.SEARCH_AGGS) == false) {
+                    if (modelAndConsumer != null
+                        && modelAndConsumer.consumers.stream().noneMatch(c -> c.isAnyOf(Consumer.SEARCH_AGGS, Consumer.SEARCH_RESCORER))) {
                         logger.trace("[{} ({})] invalidated from cache", modelId, modelAliasOrId);
                         localModelCache.invalidate(modelId);
                     }
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/loadingservice/ModelLoadingServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/loadingservice/ModelLoadingServiceTests.java
index 40b0dd519f7d8..bab292671c0bc 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/loadingservice/ModelLoadingServiceTests.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/loadingservice/ModelLoadingServiceTests.java
@@ -43,7 +43,9 @@
 import org.elasticsearch.xpack.core.ml.inference.TrainedModelConfig;
 import org.elasticsearch.xpack.core.ml.inference.TrainedModelInput;
 import org.elasticsearch.xpack.core.ml.inference.trainedmodel.ClassificationConfig;
+import org.elasticsearch.xpack.core.ml.inference.trainedmodel.InferenceConfig;
 import org.elasticsearch.xpack.core.ml.inference.trainedmodel.InferenceStats;
+import org.elasticsearch.xpack.core.ml.inference.trainedmodel.LearningToRankConfig;
 import org.elasticsearch.xpack.core.ml.inference.trainedmodel.inference.InferenceDefinition;
 import org.elasticsearch.xpack.core.ml.job.messages.Messages;
 import org.elasticsearch.xpack.ml.MachineLearning;
@@ -424,6 +426,34 @@ public void testGetModelForSearch() throws Exception {
         verify(trainedModelStatsService, never()).queueStats(any(InferenceStats.class), anyBoolean());
     }
 
+    public void testGetModelForLearningToRank() throws Exception {
+        String modelId = "test-get-model-for-ltr";
+        withTrainedModel(modelId, 1L, LearningToRankConfig.EMPTY_PARAMS);
+
+        ModelLoadingService modelLoadingService = new ModelLoadingService(
+            trainedModelProvider,
+            auditor,
+            threadPool,
+            clusterService,
+            trainedModelStatsService,
+            Settings.EMPTY,
+            "test-node",
+            circuitBreaker,
+            mock(XPackLicenseState.class)
+        );
+
+        for (int i = 0; i < 3; i++) {
+            PlainActionFuture<LocalModel> future = new PlainActionFuture<>();
+            modelLoadingService.getModelForLearningToRank(modelId, future);
+            assertThat(future.get(), is(not(nullValue())));
+        }
+
+        assertTrue(modelLoadingService.isModelCached(modelId));
+
+        verify(trainedModelProvider, times(1)).getTrainedModelForInference(eq(modelId), eq(false), any());
+        verify(trainedModelStatsService, never()).queueStats(any(InferenceStats.class), anyBoolean());
+    }
+
     public void testCircuitBreakerBreak() throws Exception {
         String model1 = "test-circuit-break-model-1";
         String model2 = "test-circuit-break-model-2";
@@ -656,13 +686,17 @@ public void testAliasesGetUpdatedEvenWhenNotIngestNode() throws IOException {
         assertThat(modelLoadingService.getModelId("loaded_model_again"), equalTo(model1));
     }
 
-    @SuppressWarnings("unchecked")
     private void withTrainedModel(String modelId, long size) {
+        withTrainedModel(modelId, size, ClassificationConfig.EMPTY_PARAMS);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void withTrainedModel(String modelId, long size, InferenceConfig inferenceConfig) {
         InferenceDefinition definition = mock(InferenceDefinition.class);
         when(definition.ramBytesUsed()).thenReturn(size);
         TrainedModelConfig trainedModelConfig = mock(TrainedModelConfig.class);
         when(trainedModelConfig.getModelId()).thenReturn(modelId);
-        when(trainedModelConfig.getInferenceConfig()).thenReturn(ClassificationConfig.EMPTY_PARAMS);
+        when(trainedModelConfig.getInferenceConfig()).thenReturn(inferenceConfig);
         when(trainedModelConfig.getInput()).thenReturn(new TrainedModelInput(Arrays.asList("foo", "bar", "baz")));
         when(trainedModelConfig.getModelSize()).thenReturn(size);
         doAnswer(invocationOnMock -> {
diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/profiling/10_basic.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/profiling/10_basic.yml
index 1e0c260a70e4f..367655ba89388 100644
--- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/profiling/10_basic.yml
+++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/profiling/10_basic.yml
@@ -166,9 +166,6 @@ teardown:
 
 ---
 "Test flamegraph from profiling-events":
-  - skip:
-      reason: "https://github.com/elastic/elasticsearch/issues/106103"
-      version: "all"
   - do:
       profiling.flamegraph:
         body: >
@@ -195,9 +192,6 @@ teardown:
 
 ---
 "Test flamegraph from test-events":
-  - skip:
-      reason: "https://github.com/elastic/elasticsearch/issues/106103"
-      version: "all"
   - do:
       profiling.flamegraph:
         body: >
diff --git a/x-pack/plugin/watcher/src/internalClusterTest/java/org/elasticsearch/xpack/watcher/test/integration/RejectedExecutionTests.java b/x-pack/plugin/watcher/src/internalClusterTest/java/org/elasticsearch/xpack/watcher/test/integration/RejectedExecutionTests.java
index 4a3bcca3acb85..e5f4091ca89eb 100644
--- a/x-pack/plugin/watcher/src/internalClusterTest/java/org/elasticsearch/xpack/watcher/test/integration/RejectedExecutionTests.java
+++ b/x-pack/plugin/watcher/src/internalClusterTest/java/org/elasticsearch/xpack/watcher/test/integration/RejectedExecutionTests.java
@@ -37,7 +37,6 @@ protected boolean timeWarped() {
         return false;
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/105951")
     public void testHistoryOnRejection() throws Exception {
         createIndex("idx");
         prepareIndex("idx").setSource("field", "a").get();
@@ -73,11 +72,11 @@ public void testHistoryOnRejection() throws Exception {
 
     @Override
     protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
-
         return Settings.builder()
             .put(super.nodeSettings(nodeOrdinal, otherSettings))
             .put(XPackSettings.SECURITY_ENABLED.getKey(), false)
             .put(LicenseSettings.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial")
+            .put("xpack.watcher.thread_pool.size", 1)
             .put("xpack.watcher.thread_pool.queue_size", 0)
             .build();
     }