Skip to content

Commit

Permalink
fix(sql): add correct indexes
Browse files Browse the repository at this point in the history
refactor function and variable names to make them more descriptive
  • Loading branch information
kirangodishala committed Nov 26, 2024
1 parent f1508c7 commit fa088e3
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -210,19 +210,21 @@ class DualExecutionRepository(
).distinct { it.id }
}

override fun filterPipelineExecutionsForApplication(@Nonnull application: String,
@Nonnull pipelineConfigIds: List<String>,
@Nonnull criteria: ExecutionCriteria): List<String>{
return primary.filterPipelineExecutionsForApplication(application, pipelineConfigIds, criteria) +
previous.filterPipelineExecutionsForApplication(application,pipelineConfigIds, criteria)
override fun retrieveAndFilterPipelineExecutionIdsForApplication(
@Nonnull application: String,
@Nonnull pipelineConfigIds: List<String>,
@Nonnull criteria: ExecutionCriteria
): List<String> {
return primary.retrieveAndFilterPipelineExecutionIdsForApplication(application, pipelineConfigIds, criteria) +
previous.retrieveAndFilterPipelineExecutionIdsForApplication(application, pipelineConfigIds, criteria)
}

override fun retrievePipelineExecutionsDetailsForApplication(
override fun retrievePipelineExecutionDetailsForApplication(
@Nonnull application: String,
pipelineConfigIds: List<String>): Collection<PipelineExecution> {
return (
primary.retrievePipelineExecutionsDetailsForApplication(application, pipelineConfigIds) +
previous.retrievePipelineExecutionsDetailsForApplication(application,pipelineConfigIds)
primary.retrievePipelineExecutionDetailsForApplication(application, pipelineConfigIds) +
previous.retrievePipelineExecutionDetailsForApplication(application, pipelineConfigIds)
).distinctBy { it.id }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,13 @@ Observable<PipelineExecution> retrievePipelinesForPipelineConfigId(
Collection<String> retrievePipelineConfigIdsForApplication(@Nonnull String application);

@Nonnull
Collection<String> filterPipelineExecutionsForApplication(
Collection<String> retrieveAndFilterPipelineExecutionIdsForApplication(
@Nonnull String application,
@Nonnull List<String> pipelineConfigIds,
@Nonnull ExecutionCriteria criteria);

@Nonnull
Collection<PipelineExecution> retrievePipelineExecutionsDetailsForApplication(
Collection<PipelineExecution> retrievePipelineExecutionDetailsForApplication(
@Nonnull String application, @Nonnull List<String> pipelineConfigIds);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,29 +279,30 @@ class InMemoryExecutionRepository : ExecutionRepository {

override fun retrievePipelineConfigIdsForApplication(application: String): List<String> {
return pipelines.values
.filter { it.application == application }
.filter { it.application == application }
.map { it.pipelineConfigId }
.distinct()
}

override fun filterPipelineExecutionsForApplication(@Nonnull application: String,
@Nonnull pipelineConfigIds: List<String>,
@Nonnull criteria: ExecutionCriteria): List<String> {
override fun retrieveAndFilterPipelineExecutionIdsForApplication(
@Nonnull application: String,
@Nonnull pipelineConfigIds: List<String>,
@Nonnull criteria: ExecutionCriteria
): List<String> {
return pipelines.values
.filter { it.application == application && pipelineConfigIds.contains(it.pipelineConfigId) }
.filter { it.application == application && pipelineConfigIds.contains(it.pipelineConfigId) }
.applyCriteria(criteria)
.map { it.id }
}

override fun retrievePipelineExecutionsDetailsForApplication(
override fun retrievePipelineExecutionDetailsForApplication(
application: String,
pipelineConfigIds: List<String>): Collection<PipelineExecution> {
return pipelines.values
.filter { it.application == application && pipelineConfigIds.contains(it.pipelineConfigId) }
.distinctBy { it.id }
}


override fun retrieveOrchestrationForCorrelationId(correlationId: String): PipelineExecution {
return retrieveByCorrelationId(ORCHESTRATION, correlationId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import javax.annotation.Nonnull;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -490,7 +489,7 @@ public void delete(@Nonnull ExecutionType type, @Nonnull List<String> idsToDelet
}

@Override
public @Nonnull List<String> filterPipelineExecutionsForApplication(
public @Nonnull List<String> retrieveAndFilterPipelineExecutionIdsForApplication(
@Nonnull String application,
@Nonnull List<String> pipelineConfigIds,
@Nonnull ExecutionCriteria criteria) {
Expand All @@ -501,8 +500,8 @@ public void delete(@Nonnull ExecutionType type, @Nonnull List<String> idsToDelet
}

@Override
public @NotNull List<PipelineExecution> retrievePipelineExecutionsDetailsForApplication(
@Nonnull String application, @NotNull List<String> pipelineExecutionIds) {
public @Nonnull List<PipelineExecution> retrievePipelineExecutionDetailsForApplication(
@Nonnull String application, @Nonnull List<String> pipelineExecutionIds) {
// TODO: not implemented yet - this method, at present, is primarily meant for the
// SqlExecutionRepository
// implementation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,14 +453,14 @@ class SqlExecutionRepository(

override fun retrievePipelineConfigIdsForApplication(application: String): List<String> =
withPool(poolName) {
return jooq.selectDistinct(field("config_id"))
.from(PIPELINE.tableName) // not adding index here as it slowed down the query
.where(field("application").eq(application))
.fetch(0, String::class.java)
return jooq.selectDistinct(field("config_id"))
.from(PIPELINE.tableName)
.where(field("application").eq(application))
.fetch(0, String::class.java)
}

/**
* this function supports the following ExecutionCriteria currently:
* this function supports the following ExecutionCriteria currently:
* 'limit', a.k.a page size and
* 'statuses'.
*
Expand All @@ -470,13 +470,13 @@ class SqlExecutionRepository(
* It does this by executing the following query:
* - If the execution criteria does not contain any statuses:
* SELECT config_id, id
FROM pipelines force index (`pipeline_config_id_idx`)
FROM pipelines force index (`pipeline_application_idx`)
WHERE application = "myapp"
ORDER BY
config_id;
* - If the execution criteria contains statuses:
* SELECT config_id, id
FROM pipelines force index (`pipeline_config_id_idx`)
FROM pipelines force index (`pipeline_application_status_starttime_idx`)
WHERE (
application = "myapp" and
status in ("status1", "status2")
Expand All @@ -485,33 +485,37 @@ class SqlExecutionRepository(
config_id;
* It then applies the limit execution criteria on the result set obtained above. We observed load issues in the db
* when running a query where the limit was calculated in the query itself . Thereforce, we are moving that logic to
* when running a query where the limit was calculated in the query itself. Therefore, we are moving that logic to
* the code below to ease the burden on the db in such circumstances.
*/
override fun filterPipelineExecutionsForApplication(application: String,
pipelineConfigIds: List<String>,
criteria: ExecutionCriteria): List<String> {
override fun retrieveAndFilterPipelineExecutionIdsForApplication(
application: String,
pipelineConfigIds: List<String>,
criteria: ExecutionCriteria
): List<String> {

// baseQueryPredicate for the flow where there are no statuses in the execution criteria
var baseQueryPredicate = field("application").eq(application)
.and(field("config_id").`in`(*pipelineConfigIds.toTypedArray()))

var table = if (jooq.dialect() == SQLDialect.MYSQL) PIPELINE.tableName.forceIndex("pipeline_application_idx")
else PIPELINE.tableName
// baseQueryPredicate for the flow with statuses
if (criteria.statuses.isNotEmpty() && criteria.statuses.size != ExecutionStatus.values().size) {
val statusStrings = criteria.statuses.map { it.toString() }
baseQueryPredicate = baseQueryPredicate
.and(field("status").`in`(*statusStrings.toTypedArray()))

table = if (jooq.dialect() == SQLDialect.MYSQL) PIPELINE.tableName.forceIndex("pipeline_application_status_starttime_idx")
else PIPELINE.tableName
}

val finalResult: MutableList<String> = mutableListOf()

log.info("getting execution ids")
withPool(poolName) {
val baseQuery = jooq.select(field("config_id"), field("id"))
.from(
if (jooq.dialect() == SQLDialect.MYSQL) PIPELINE.tableName.forceIndex("pipeline_config_id_idx")
else PIPELINE.tableName
)
val baseQuery = jooq.select(field("config_id"), field("id"))
.from(table)
.where(baseQueryPredicate)
.orderBy(field("config_id"))
.fetch().intoGroups("config_id", "id")
Expand All @@ -536,7 +540,7 @@ class SqlExecutionRepository(
* It executes the following query to get execution details for n executions at a time in a specific application
*
* SELECT id, body, compressed_body, compression_type, `partition`
FROM pipelines
FROM pipelines force index (`pipeline_application_idx`)
left outer join
pipelines_compressed_executions
using (`id`)
Expand All @@ -545,14 +549,17 @@ class SqlExecutionRepository(
id in ('id1', 'id2', 'id3')
);
*
* it then get all the stage information for all the executions returned from the above query.
* it then gets all the stage information for all the executions returned from the above query.
*/
override fun retrievePipelineExecutionsDetailsForApplication(
override fun retrievePipelineExecutionDetailsForApplication(
application: String,
pipelineExecutions: List<String>): Collection<PipelineExecution> {
withPool(poolName) {
val baseQuery = jooq.select(selectExecutionFields(compressionProperties))
.from(PIPELINE.tableName)
.from(
if (jooq.dialect() == SQLDialect.MYSQL) PIPELINE.tableName.forceIndex("pipeline_application_idx")
else PIPELINE.tableName
)
.leftOuterJoin(PIPELINE.tableName.compressedExecTable).using(field("id"))
.where(
field("application").eq(application)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -612,16 +612,17 @@ class TaskController {
log.info("received ${pipelineConfigIds.size()} pipelines for application: $application from front50")
def strategyConfigIds = front50Service.getStrategies(application)*.id as List<String>
log.info("received ${strategyConfigIds.size()} strategies for application: $application from front50")
def allFront50Ids = pipelineConfigIds + strategyConfigIds

def allFront50PipelineConfigIds = pipelineConfigIds + strategyConfigIds

List<PipelineExecution> allPipelineExecutions = []

if (this.configurationProperties.getOptimizeExecutionRetrieval()) {
allPipelineExecutions.addAll(
optimizedGetPipelineExecutions(application, allFront50Ids, executionCriteria)
optimizedGetPipelineExecutions(application, allFront50PipelineConfigIds, executionCriteria)
)
} else {
allPipelineExecutions = rx.Observable.merge(allFront50Ids.collect {
allPipelineExecutions = rx.Observable.merge(allFront50PipelineConfigIds.collect {
log.debug("processing pipeline config id: $it")
executionRepository.retrievePipelinesForPipelineConfigId(it, executionCriteria)
}).subscribeOn(Schedulers.io()).toList().toBlocking().single()
Expand Down Expand Up @@ -897,32 +898,34 @@ class TaskController {
"${this.configurationProperties.getMaxExecutionRetrievalThreads()} threads and processing" +
" ${this.configurationProperties.getMaxNumberOfPipelineExecutionsToProcess()} pipeline executions at a time")

List<String> commonIdsInFront50AndOrca
List<String> commonPipelineConfigIdsInFront50AndOrca
try {
List<String> allOrcaIds = executionRepository.retrievePipelineConfigIdsForApplication(application)
log.info("found ${allOrcaIds.size()} pipeline config ids for application: $application in orca")
commonIdsInFront50AndOrca = front50PipelineConfigIds.intersect(allOrcaIds)
log.info("found ${commonIdsInFront50AndOrca.size()} pipeline config ids that are common in orca and front50 " +
"for application: $application." +
" Saved ${front50PipelineConfigIds.size() - commonIdsInFront50AndOrca.size()} extra pipeline config id queries")
List<String> allOrcaPipelineConfigIds = executionRepository.retrievePipelineConfigIdsForApplication(application)
log.info("found ${allOrcaPipelineConfigIds.size()} pipeline config ids for application: $application in orca")
commonPipelineConfigIdsInFront50AndOrca = front50PipelineConfigIds.intersect(allOrcaPipelineConfigIds)
log.info("found ${commonPipelineConfigIdsInFront50AndOrca.size()} pipeline config ids that are common in orca " +
"and front50 for application: $application. " +
"Saved ${front50PipelineConfigIds.size() - commonPipelineConfigIdsInFront50AndOrca.size()} extra pipeline " +
"config id queries")
} catch (Exception e) {
log.warn("retrieving pipeline config ids from orca db failed. using the result obtained from front50 ", e)
commonIdsInFront50AndOrca = front50PipelineConfigIds
commonPipelineConfigIdsInFront50AndOrca = front50PipelineConfigIds
}

if (commonIdsInFront50AndOrca.size() == 0 ) {
log.info("no pipelines found")
if (commonPipelineConfigIdsInFront50AndOrca.size() == 0 ) {
log.info("no pipeline config ids found.")
return finalResult
}

// get complete list of executions based on the execution criteria
log.info("filtering pipeline executions based on the execution criteria: " +
"limit: ${executionCriteria.getPageSize()}, statuses: ${executionCriteria.getStatuses()}")
List<String> filteredPipelineExecutions = executionRepository.filterPipelineExecutionsForApplication(application,
commonIdsInFront50AndOrca,
List<String> filteredPipelineExecutionIds = executionRepository.retrieveAndFilterPipelineExecutionIdsForApplication(
application,
commonPipelineConfigIdsInFront50AndOrca,
executionCriteria
)
if (filteredPipelineExecutions.size() == 0) {
if (filteredPipelineExecutionIds.size() == 0) {
log.info("no pipeline executions found")
return finalResult
}
Expand All @@ -936,16 +939,16 @@ class TaskController {
.build())

try {
List<Future<Collection<PipelineExecution>>> futures = new ArrayList<>(filteredPipelineExecutions.size())
log.info("processing ${filteredPipelineExecutions.size()} pipeline executions")
List<Future<Collection<PipelineExecution>>> futures = new ArrayList<>(filteredPipelineExecutionIds.size())
log.info("processing ${filteredPipelineExecutionIds.size()} pipeline executions")

// process a chunk of the executions at a time
filteredPipelineExecutions
filteredPipelineExecutionIds
.collate(this.configurationProperties.getMaxNumberOfPipelineExecutionsToProcess())
.each { List<String> chunkedExecutions ->
futures.add(
executorService.submit({
List<PipelineExecution> result = executionRepository.retrievePipelineExecutionsDetailsForApplication(
List<PipelineExecution> result = executionRepository.retrievePipelineExecutionDetailsForApplication(
application, chunkedExecutions
)
log.debug("completed execution retrieval for ${result.size()} executions")
Expand All @@ -960,8 +963,10 @@ class TaskController {
finalResult.addAll(
future.get(this.configurationProperties.getExecutionRetrievalTimeoutSeconds(), TimeUnit.SECONDS)
)
} catch (TimeoutException | CancellationException | InterruptedException e) {
log.warn("Task failed with unexpected error", e)
} catch (Exception e) {
// no need to fail the entire thing if one thread fails. This means the final output will simply not
// contain any of these failed executions.
log.error("Task failed with error", e)
}
}
return finalResult
Expand All @@ -970,7 +975,7 @@ class TaskController {
try {
executorService.shutdownNow()
} catch (Exception e) {
log.warn("shutting down the executor service failed", e)
log.error("shutting down the executor service failed", e)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,19 @@ public class TaskControllerConfigurationProperties {
* process the queries to retrieve the executions. Needs to be tuned appropriately since this has
* the potential to exhaust the database connection pool.
*/
int maxExecutionRetrievalThreads = 20;
int maxExecutionRetrievalThreads = 10;

/**
* only applicable if optimizeExecutionRetrieval = true. It specifies how many pipeline executions
* should be processed at a time. 30 pipeline executions was selected as the default after testing
* this number against an orca sql db that contained lots of pipelines and executions for a single
* application (about 1200 pipelines and 1000 executions). Each execution was 1 MB or more in
* size.
* should be processed at a time. 150 pipeline executions was selected as the default after
* testing this number against an orca sql db that contained lots of pipelines and executions for
* a single application (about 1200 pipelines and 1500 executions). Each execution was 1 MB or
* more in size.
*
* <p>It can be further tuned, depending on your setup, since 30 executions work well for some
* <p>It can be further tuned, depending on your setup, since 150 executions work well for some
* applications but a higher number may be appropriate for others.
*/
int maxNumberOfPipelineExecutionsToProcess = 30;
int maxNumberOfPipelineExecutionsToProcess = 150;

/**
* only applicable if optimizeExecutionRetrieval = true. No retrieval thread should take more than
Expand Down

0 comments on commit fa088e3

Please sign in to comment.