diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt index 82db894cc4..cc32aa958b 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt @@ -221,10 +221,12 @@ class DualExecutionRepository( override fun retrievePipelineExecutionDetailsForApplication( @Nonnull application: String, - pipelineConfigIds: List): Collection { + pipelineConfigIds: List, + queryTimeoutSeconds: Int + ): Collection { return ( - primary.retrievePipelineExecutionDetailsForApplication(application, pipelineConfigIds) + - previous.retrievePipelineExecutionDetailsForApplication(application, pipelineConfigIds) + primary.retrievePipelineExecutionDetailsForApplication(application, pipelineConfigIds, queryTimeoutSeconds) + + previous.retrievePipelineExecutionDetailsForApplication(application, pipelineConfigIds, queryTimeoutSeconds) ).distinctBy { it.id } } diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java index 6f2d65aeac..4cfe657aef 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java @@ -103,7 +103,9 @@ Collection retrieveAndFilterPipelineExecutionIdsForApplication( @Nonnull Collection retrievePipelineExecutionDetailsForApplication( - @Nonnull String application, @Nonnull List pipelineConfigIds); + @Nonnull String application, + @Nonnull List pipelineConfigIds, + int queryTimeoutSeconds); /** * Returns executions in the time boundary. Redis impl does not respect pageSize or offset params, diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt index 51c1a5cb4d..96ee40242d 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt @@ -297,7 +297,9 @@ class InMemoryExecutionRepository : ExecutionRepository { override fun retrievePipelineExecutionDetailsForApplication( application: String, - pipelineConfigIds: List): Collection { + pipelineConfigIds: List, + queryTimeoutSeconds: Int + ): Collection { return pipelines.values .filter { it.application == application && pipelineConfigIds.contains(it.pipelineConfigId) } .distinctBy { it.id } diff --git a/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java b/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java index aef959dab8..bfc6070edb 100644 --- a/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java +++ b/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java @@ -501,7 +501,9 @@ public void delete(@Nonnull ExecutionType type, @Nonnull List idsToDelet @Override public @Nonnull List retrievePipelineExecutionDetailsForApplication( - @Nonnull String application, @Nonnull List pipelineExecutionIds) { + @Nonnull String application, + @Nonnull List pipelineExecutionIds, + int queryTimeoutSeconds) { // TODO: not implemented yet - this method, at present, is primarily meant for the // SqlExecutionRepository // implementation. diff --git a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt index 134fa6acb0..889e439507 100644 --- a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt +++ b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt @@ -553,7 +553,9 @@ class SqlExecutionRepository( */ override fun retrievePipelineExecutionDetailsForApplication( application: String, - pipelineExecutions: List): Collection { + pipelineExecutions: List, + queryTimeoutSeconds: Int + ): Collection { withPool(poolName) { val baseQuery = jooq.select(selectExecutionFields(compressionProperties)) .from( @@ -565,6 +567,7 @@ class SqlExecutionRepository( field("application").eq(application) .and(field("id").`in`(*pipelineExecutions.toTypedArray())) ) + .queryTimeout(queryTimeoutSeconds) // add an explicit timeout so that the query doesn't run forever .fetch() log.info("getting stage information for all the executions found so far") diff --git a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy index abfa53da2b..49c12bcbed 100644 --- a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy +++ b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy @@ -49,11 +49,9 @@ import java.nio.charset.Charset import java.time.Clock import java.time.ZoneOffset import java.util.concurrent.Callable -import java.util.concurrent.CancellationException import java.util.concurrent.Executors import java.util.concurrent.ExecutorService import java.util.concurrent.Future -import java.util.concurrent.TimeoutException import java.util.concurrent.TimeUnit import java.util.stream.Collectors @@ -889,7 +887,7 @@ class TaskController { * *

* 3. It then processes n pipeline executions at a time to retrieve the complete execution details. In addition, - * we make use of a configured thread pool so that multiple batches of n executions can be processed parallelly. + * we make use of a configured thread pool to process multiple batches of n executions in parallel. */ private List optimizedGetPipelineExecutions(String application, List front50PipelineConfigIds, ExecutionCriteria executionCriteria) { @@ -946,34 +944,33 @@ class TaskController { filteredPipelineExecutionIds .collate(this.configurationProperties.getMaxNumberOfPipelineExecutionsToProcess()) .each { List chunkedExecutions -> - futures.add( - executorService.submit({ - List result = executionRepository.retrievePipelineExecutionDetailsForApplication( - application, chunkedExecutions - ) - log.debug("completed execution retrieval for ${result.size()} executions") - return result - } as Callable>) - ) + futures.add(executorService.submit({ + try { + List result = executionRepository.retrievePipelineExecutionDetailsForApplication( + application, + chunkedExecutions, + this.configurationProperties.getExecutionRetrievalTimeoutSeconds() + ) + log.debug("completed execution retrieval for ${result.size()} executions") + return result + } catch (Exception e) { // handle exceptions such as query timeouts etc. + log.error("error occurred while retrieving these executions: ${chunkedExecutions.toString()} " + + "for application: ${application}.", e) + // in case of errors, this will return partial results. We are going with this best-effort approach + // because the UI keeps refreshing the executions view frequently. Hence, the user will eventually see + // these executions via one of the subsequent calls. Partial data is better than an exception at this + // point since the latter will result in a UI devoid of any executions. + // + return [] + } + } as Callable>)) } - futures.each { - Future> future -> - try { - finalResult.addAll( - future.get(this.configurationProperties.getExecutionRetrievalTimeoutSeconds(), TimeUnit.SECONDS) - ) - } catch (Exception e) { - // no need to fail the entire thing if one thread fails. This means the final output will simply not - // contain any of these failed executions. - log.error("Task failed with error", e) - } - } + futures.each { Future> future -> finalResult.addAll(future.get()) } return finalResult } finally { - // attempt to shutdown the executor service try { - executorService.shutdownNow() + executorService.shutdownNow() // attempt to shutdown the executor service } catch (Exception e) { log.error("shutting down the executor service failed", e) } diff --git a/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java b/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java index a2ff97691a..8407cb5598 100644 --- a/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java +++ b/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java @@ -54,10 +54,10 @@ public class TaskControllerConfigurationProperties { int maxNumberOfPipelineExecutionsToProcess = 150; /** - * only applicable if optimizeExecutionRetrieval = true. No retrieval thread should take more than - * 60s to complete. + * only applicable if optimizeExecutionRetrieval = true. It specifies the max time after which the + * execution retrieval query will timeout. */ - long executionRetrievalTimeoutSeconds = 60; + int executionRetrievalTimeoutSeconds = 60; /** moved this to here. Earlier definition was in the {@link TaskController} class */ int daysOfExecutionHistory = 14; diff --git a/orca-web/src/test/kotlin/com/netflix/spinnaker/orca/controllers/TaskControllerTest.kt b/orca-web/src/test/kotlin/com/netflix/spinnaker/orca/controllers/TaskControllerTest.kt index dc85b3e881..7c13c15dc8 100644 --- a/orca-web/src/test/kotlin/com/netflix/spinnaker/orca/controllers/TaskControllerTest.kt +++ b/orca-web/src/test/kotlin/com/netflix/spinnaker/orca/controllers/TaskControllerTest.kt @@ -30,28 +30,31 @@ import com.netflix.spinnaker.orca.sql.pipeline.persistence.SqlExecutionRepositor import com.nhaarman.mockito_kotlin.mock import dev.minutest.junit.JUnit5Minutests import dev.minutest.rootContext +import org.jooq.exception.DataAccessException import org.jooq.impl.DSL.field import org.jooq.impl.DSL.table +import org.junit.Assert.assertThrows +import org.junit.jupiter.api.assertThrows import org.mockito.Mockito import org.springframework.test.web.servlet.MockMvc import org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get import org.springframework.test.web.servlet.setup.MockMvcBuilders +import strikt.api.expectCatching import strikt.api.expectThat +import strikt.assertions.isA import strikt.assertions.isEqualTo -import strikt.assertions.isTrue +import strikt.assertions.isFailure import java.time.Clock import java.time.Instant import java.time.ZoneId import java.time.temporal.ChronoUnit class TaskControllerTest : JUnit5Minutests { - data class Fixture(val optimizeExecution: Boolean, val timeout: Double = 60.0) { + data class Fixture(val optimizeExecution: Boolean) { private val clock: Clock = Clock.fixed(Instant.now(), ZoneId.systemDefault()) val database: SqlTestUtil.TestDatabase = SqlTestUtil.initTcMysqlDatabase()!! - - private val executionRepository: SqlExecutionRepository = SqlExecutionRepository( partitionName = "test", jooq = database.context, @@ -64,7 +67,6 @@ class TaskControllerTest : JUnit5Minutests { private val taskControllerConfigurationProperties: TaskControllerConfigurationProperties = TaskControllerConfigurationProperties() .apply { optimizeExecutionRetrieval = optimizeExecution - executionRetrievalTimeoutSeconds = timeout.toLong() } private val daysOfExecutionHistory: Long = taskControllerConfigurationProperties.daysOfExecutionHistory.toLong() @@ -238,28 +240,20 @@ class TaskControllerTest : JUnit5Minutests { } } - context("execution retrieval with optimization having timeouts") { + context("test query having explicit query timeouts") { fixture { - Fixture(true, 0.1) + Fixture(true) } before { setup() } after { cleanUp() } - test("retrieve executions with limit = 2 & expand = false") { - expectThat(database.context.fetchCount(table("pipelines"))).isEqualTo(5) - val response = subject.perform(get("/applications/test-app/pipelines?limit=2&expand=false")).andReturn().response - val results = OrcaObjectMapper.getInstance().readValue(response.contentAsString, object : TypeReference>() {}) - expectThat(results.isEmpty()).isTrue() - } - - test("retrieve executions with limit = 2 & expand = false with statuses") { - expectThat(database.context.fetchCount(table("pipelines"))).isEqualTo(5) - val response = subject.perform(get( - "/applications/test-app/pipelines?limit=2&expand=false&statuses=RUNNING,SUSPENDED,PAUSED,NOT_STARTED") - ).andReturn().response - val results = OrcaObjectMapper.getInstance().readValue(response.contentAsString, object : TypeReference>() {}) - expectThat(results.isEmpty()).isTrue() + test("it returns a DataAccessException on query timeout") { + expectCatching { + database.context.select(field("sleep(10)")).queryTimeout(1).execute() + } + .isFailure() + .isA() } } }