From fa088e3932bafc13d7fb3a9dfafba2257219505d Mon Sep 17 00:00:00 2001 From: kirangodishala Date: Fri, 18 Jun 2021 07:08:50 +0530 Subject: [PATCH] fix(sql): add correct indexes refactor function and variable names to make them more descriptive --- .../persistence/DualExecutionRepository.kt | 18 ++++--- .../persistence/ExecutionRepository.java | 4 +- .../InMemoryExecutionRepository.kt | 15 +++--- .../jedis/RedisExecutionRepository.java | 7 ++- .../persistence/SqlExecutionRepository.kt | 47 +++++++++-------- .../orca/controllers/TaskController.groovy | 51 ++++++++++--------- ...TaskControllerConfigurationProperties.java | 14 ++--- 7 files changed, 85 insertions(+), 71 deletions(-) diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt index 20be61abc9..82db894cc4 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt @@ -210,19 +210,21 @@ class DualExecutionRepository( ).distinct { it.id } } - override fun filterPipelineExecutionsForApplication(@Nonnull application: String, - @Nonnull pipelineConfigIds: List, - @Nonnull criteria: ExecutionCriteria): List{ - return primary.filterPipelineExecutionsForApplication(application, pipelineConfigIds, criteria) + - previous.filterPipelineExecutionsForApplication(application,pipelineConfigIds, criteria) + override fun retrieveAndFilterPipelineExecutionIdsForApplication( + @Nonnull application: String, + @Nonnull pipelineConfigIds: List, + @Nonnull criteria: ExecutionCriteria + ): List { + return primary.retrieveAndFilterPipelineExecutionIdsForApplication(application, pipelineConfigIds, criteria) + + previous.retrieveAndFilterPipelineExecutionIdsForApplication(application, pipelineConfigIds, criteria) } - override fun retrievePipelineExecutionsDetailsForApplication( + override fun retrievePipelineExecutionDetailsForApplication( @Nonnull application: String, pipelineConfigIds: List): Collection { return ( - primary.retrievePipelineExecutionsDetailsForApplication(application, pipelineConfigIds) + - previous.retrievePipelineExecutionsDetailsForApplication(application,pipelineConfigIds) + primary.retrievePipelineExecutionDetailsForApplication(application, pipelineConfigIds) + + previous.retrievePipelineExecutionDetailsForApplication(application, pipelineConfigIds) ).distinctBy { it.id } } diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java index 43b6ee04f7..6f2d65aeac 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java @@ -96,13 +96,13 @@ Observable retrievePipelinesForPipelineConfigId( Collection retrievePipelineConfigIdsForApplication(@Nonnull String application); @Nonnull - Collection filterPipelineExecutionsForApplication( + Collection retrieveAndFilterPipelineExecutionIdsForApplication( @Nonnull String application, @Nonnull List pipelineConfigIds, @Nonnull ExecutionCriteria criteria); @Nonnull - Collection retrievePipelineExecutionsDetailsForApplication( + Collection retrievePipelineExecutionDetailsForApplication( @Nonnull String application, @Nonnull List pipelineConfigIds); /** diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt index 1537a4f71a..51c1a5cb4d 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt @@ -279,21 +279,23 @@ class InMemoryExecutionRepository : ExecutionRepository { override fun retrievePipelineConfigIdsForApplication(application: String): List { return pipelines.values - .filter { it.application == application } + .filter { it.application == application } .map { it.pipelineConfigId } .distinct() } - override fun filterPipelineExecutionsForApplication(@Nonnull application: String, - @Nonnull pipelineConfigIds: List, - @Nonnull criteria: ExecutionCriteria): List { + override fun retrieveAndFilterPipelineExecutionIdsForApplication( + @Nonnull application: String, + @Nonnull pipelineConfigIds: List, + @Nonnull criteria: ExecutionCriteria + ): List { return pipelines.values - .filter { it.application == application && pipelineConfigIds.contains(it.pipelineConfigId) } + .filter { it.application == application && pipelineConfigIds.contains(it.pipelineConfigId) } .applyCriteria(criteria) .map { it.id } } - override fun retrievePipelineExecutionsDetailsForApplication( + override fun retrievePipelineExecutionDetailsForApplication( application: String, pipelineConfigIds: List): Collection { return pipelines.values @@ -301,7 +303,6 @@ class InMemoryExecutionRepository : ExecutionRepository { .distinctBy { it.id } } - override fun retrieveOrchestrationForCorrelationId(correlationId: String): PipelineExecution { return retrieveByCorrelationId(ORCHESTRATION, correlationId) } diff --git a/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java b/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java index e77deed8a8..aef959dab8 100644 --- a/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java +++ b/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java @@ -55,7 +55,6 @@ import javax.annotation.Nonnull; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.math.NumberUtils; -import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -490,7 +489,7 @@ public void delete(@Nonnull ExecutionType type, @Nonnull List idsToDelet } @Override - public @Nonnull List filterPipelineExecutionsForApplication( + public @Nonnull List retrieveAndFilterPipelineExecutionIdsForApplication( @Nonnull String application, @Nonnull List pipelineConfigIds, @Nonnull ExecutionCriteria criteria) { @@ -501,8 +500,8 @@ public void delete(@Nonnull ExecutionType type, @Nonnull List idsToDelet } @Override - public @NotNull List retrievePipelineExecutionsDetailsForApplication( - @Nonnull String application, @NotNull List pipelineExecutionIds) { + public @Nonnull List retrievePipelineExecutionDetailsForApplication( + @Nonnull String application, @Nonnull List pipelineExecutionIds) { // TODO: not implemented yet - this method, at present, is primarily meant for the // SqlExecutionRepository // implementation. diff --git a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt index 243e3de5b5..134fa6acb0 100644 --- a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt +++ b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt @@ -453,14 +453,14 @@ class SqlExecutionRepository( override fun retrievePipelineConfigIdsForApplication(application: String): List = withPool(poolName) { - return jooq.selectDistinct(field("config_id")) - .from(PIPELINE.tableName) // not adding index here as it slowed down the query - .where(field("application").eq(application)) - .fetch(0, String::class.java) + return jooq.selectDistinct(field("config_id")) + .from(PIPELINE.tableName) + .where(field("application").eq(application)) + .fetch(0, String::class.java) } /** - * this function supports the following ExecutionCriteria currently: + * this function supports the following ExecutionCriteria currently: * 'limit', a.k.a page size and * 'statuses'. * @@ -470,13 +470,13 @@ class SqlExecutionRepository( * It does this by executing the following query: * - If the execution criteria does not contain any statuses: * SELECT config_id, id - FROM pipelines force index (`pipeline_config_id_idx`) + FROM pipelines force index (`pipeline_application_idx`) WHERE application = "myapp" ORDER BY config_id; * - If the execution criteria contains statuses: * SELECT config_id, id - FROM pipelines force index (`pipeline_config_id_idx`) + FROM pipelines force index (`pipeline_application_status_starttime_idx`) WHERE ( application = "myapp" and status in ("status1", "status2) @@ -485,33 +485,37 @@ class SqlExecutionRepository( config_id; * It then applies the limit execution criteria on the result set obtained above. We observed load issues in the db - * when running a query where the limit was calculated in the query itself . Thereforce, we are moving that logic to + * when running a query where the limit was calculated in the query itself. Therefore, we are moving that logic to * the code below to ease the burden on the db in such circumstances. */ - override fun filterPipelineExecutionsForApplication(application: String, - pipelineConfigIds: List, - criteria: ExecutionCriteria): List { + override fun retrieveAndFilterPipelineExecutionIdsForApplication( + application: String, + pipelineConfigIds: List, + criteria: ExecutionCriteria + ): List { // baseQueryPredicate for the flow where there are no statuses in the execution criteria var baseQueryPredicate = field("application").eq(application) .and(field("config_id").`in`(*pipelineConfigIds.toTypedArray())) + var table = if (jooq.dialect() == SQLDialect.MYSQL) PIPELINE.tableName.forceIndex("pipeline_application_idx") + else PIPELINE.tableName // baseQueryPredicate for the flow with statuses if (criteria.statuses.isNotEmpty() && criteria.statuses.size != ExecutionStatus.values().size) { val statusStrings = criteria.statuses.map { it.toString() } baseQueryPredicate = baseQueryPredicate .and(field("status").`in`(*statusStrings.toTypedArray())) + + table = if (jooq.dialect() == SQLDialect.MYSQL) PIPELINE.tableName.forceIndex("pipeline_application_status_starttime_idx") + else PIPELINE.tableName } val finalResult: MutableList = mutableListOf() log.info("getting execution ids") withPool(poolName) { - val baseQuery = jooq.select(field("config_id"), field("id")) - .from( - if (jooq.dialect() == SQLDialect.MYSQL) PIPELINE.tableName.forceIndex("pipeline_config_id_idx") - else PIPELINE.tableName - ) + val baseQuery = jooq.select(field("config_id"), field("id")) + .from(table) .where(baseQueryPredicate) .orderBy(field("config_id")) .fetch().intoGroups("config_id", "id") @@ -536,7 +540,7 @@ class SqlExecutionRepository( * It executes the following query to get execution details for n executions at a time in a specific application * * SELECT id, body, compressed_body, compression_type, `partition` - FROM pipelines + FROM pipelines force index (`pipeline_application_idx`) left outer join pipelines_compressed_executions using (`id`) @@ -545,14 +549,17 @@ class SqlExecutionRepository( id in ('id1', 'id2', 'id3') ); * - * it then get all the stage information for all the executions returned from the above query. + * it then gets all the stage information for all the executions returned from the above query. */ - override fun retrievePipelineExecutionsDetailsForApplication( + override fun retrievePipelineExecutionDetailsForApplication( application: String, pipelineExecutions: List): Collection { withPool(poolName) { val baseQuery = jooq.select(selectExecutionFields(compressionProperties)) - .from(PIPELINE.tableName) + .from( + if (jooq.dialect() == SQLDialect.MYSQL) PIPELINE.tableName.forceIndex("pipeline_application_idx") + else PIPELINE.tableName + ) .leftOuterJoin(PIPELINE.tableName.compressedExecTable).using(field("id")) .where( field("application").eq(application) diff --git a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy index f00acfa40e..abfa53da2b 100644 --- a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy +++ b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy @@ -612,16 +612,17 @@ class TaskController { log.info("received ${pipelineConfigIds.size()} pipelines for application: $application from front50") def strategyConfigIds = front50Service.getStrategies(application)*.id as List log.info("received ${strategyConfigIds.size()} strategies for application: $application from front50") - def allFront50Ids = pipelineConfigIds + strategyConfigIds + + def allFront50PipelineConfigIds = pipelineConfigIds + strategyConfigIds List allPipelineExecutions = [] if (this.configurationProperties.getOptimizeExecutionRetrieval()) { allPipelineExecutions.addAll( - optimizedGetPipelineExecutions(application, allFront50Ids, executionCriteria) + optimizedGetPipelineExecutions(application, allFront50PipelineConfigIds, executionCriteria) ) } else { - allPipelineExecutions = rx.Observable.merge(allFront50Ids.collect { + allPipelineExecutions = rx.Observable.merge(allFront50PipelineConfigIds.collect { log.debug("processing pipeline config id: $it") executionRepository.retrievePipelinesForPipelineConfigId(it, executionCriteria) }).subscribeOn(Schedulers.io()).toList().toBlocking().single() @@ -897,32 +898,34 @@ class TaskController { "${this.configurationProperties.getMaxExecutionRetrievalThreads()} threads and processing" + " ${this.configurationProperties.getMaxNumberOfPipelineExecutionsToProcess()} pipeline executions at a time") - List commonIdsInFront50AndOrca + List commonPipelineConfigIdsInFront50AndOrca try { - List allOrcaIds = executionRepository.retrievePipelineConfigIdsForApplication(application) - log.info("found ${allOrcaIds.size()} pipeline config ids for application: $application in orca") - commonIdsInFront50AndOrca = front50PipelineConfigIds.intersect(allOrcaIds) - log.info("found ${commonIdsInFront50AndOrca.size()} pipeline config ids that are common in orca and front50 " + - "for application: $application." + - " Saved ${front50PipelineConfigIds.size() - commonIdsInFront50AndOrca.size()} extra pipeline config id queries") + List allOrcaPipelineConfigIds = executionRepository.retrievePipelineConfigIdsForApplication(application) + log.info("found ${allOrcaPipelineConfigIds.size()} pipeline config ids for application: $application in orca") + commonPipelineConfigIdsInFront50AndOrca = front50PipelineConfigIds.intersect(allOrcaPipelineConfigIds) + log.info("found ${commonPipelineConfigIdsInFront50AndOrca.size()} pipeline config ids that are common in orca " + + "and front50 for application: $application. " + + "Saved ${front50PipelineConfigIds.size() - commonPipelineConfigIdsInFront50AndOrca.size()} extra pipeline " + + "config id queries") } catch (Exception e) { log.warn("retrieving pipeline config ids from orca db failed. using the result obtained from front50 ", e) - commonIdsInFront50AndOrca = front50PipelineConfigIds + commonPipelineConfigIdsInFront50AndOrca = front50PipelineConfigIds } - if (commonIdsInFront50AndOrca.size() == 0 ) { - log.info("no pipelines found") + if (commonPipelineConfigIdsInFront50AndOrca.size() == 0 ) { + log.info("no pipeline config ids found.") return finalResult } // get complete list of executions based on the execution criteria log.info("filtering pipeline executions based on the execution criteria: " + "limit: ${executionCriteria.getPageSize()}, statuses: ${executionCriteria.getStatuses()}") - List filteredPipelineExecutions = executionRepository.filterPipelineExecutionsForApplication(application, - commonIdsInFront50AndOrca, + List filteredPipelineExecutionIds = executionRepository.retrieveAndFilterPipelineExecutionIdsForApplication( + application, + commonPipelineConfigIdsInFront50AndOrca, executionCriteria ) - if (filteredPipelineExecutions.size() == 0) { + if (filteredPipelineExecutionIds.size() == 0) { log.info("no pipeline executions found") return finalResult } @@ -936,16 +939,16 @@ class TaskController { .build()) try { - List>> futures = new ArrayList<>(filteredPipelineExecutions.size()) - log.info("processing ${filteredPipelineExecutions.size()} pipeline executions") + List>> futures = new ArrayList<>(filteredPipelineExecutionIds.size()) + log.info("processing ${filteredPipelineExecutionIds.size()} pipeline executions") // process a chunk of the executions at a time - filteredPipelineExecutions + filteredPipelineExecutionIds .collate(this.configurationProperties.getMaxNumberOfPipelineExecutionsToProcess()) .each { List chunkedExecutions -> futures.add( executorService.submit({ - List result = executionRepository.retrievePipelineExecutionsDetailsForApplication( + List result = executionRepository.retrievePipelineExecutionDetailsForApplication( application, chunkedExecutions ) log.debug("completed execution retrieval for ${result.size()} executions") @@ -960,8 +963,10 @@ class TaskController { finalResult.addAll( future.get(this.configurationProperties.getExecutionRetrievalTimeoutSeconds(), TimeUnit.SECONDS) ) - } catch (TimeoutException | CancellationException | InterruptedException e) { - log.warn("Task failed with unexpected error", e) + } catch (Exception e) { + // no need to fail the entire thing if one thread fails. This means the final output will simply not + // contain any of these failed executions. + log.error("Task failed with error", e) } } return finalResult @@ -970,7 +975,7 @@ class TaskController { try { executorService.shutdownNow() } catch (Exception e) { - log.warn("shutting down the executor service failed", e) + log.error("shutting down the executor service failed", e) } } } diff --git a/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java b/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java index 4741dcb215..a2ff97691a 100644 --- a/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java +++ b/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java @@ -39,19 +39,19 @@ public class TaskControllerConfigurationProperties { * process the queries to retrieve the executions. Needs to be tuned appropriately since this has * the potential to exhaust the connection pool size for the database. */ - int maxExecutionRetrievalThreads = 20; + int maxExecutionRetrievalThreads = 10; /** * only applicable if optimizeExecutionRetrieval = true. It specifies how many pipeline executions - * should be processed at a time. 30 pipeline executions was selected as the default after testing - * this number against an orca sql db that contained lots of pipelines and executions for a single - * application (about 1200 pipelines and 1000 executions). Each execution was 1 MB or more in - * size. + * should be processed at a time. 150 pipeline executions was selected as the default after + * testing this number against an orca sql db that contained lots of pipelines and executions for + * a single application (about 1200 pipelines and 1500 executions). Each execution was 1 MB or + * more in size. * - *

It can be further tuned, depending on your setup, since 30 executions work well for some + *

It can be further tuned, depending on your setup, since 150 executions work well for some * applications but a higher number may be appropriate for others. */ - int maxNumberOfPipelineExecutionsToProcess = 30; + int maxNumberOfPipelineExecutionsToProcess = 150; /** * only applicable if optimizeExecutionRetrieval = true. No retrieval thread should take more than