Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sql): Optimize executions #4804

Merged
merged 8 commits into from
Dec 19, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.springframework.context.ApplicationContext
import org.springframework.context.annotation.Primary
import org.springframework.stereotype.Component
import rx.Observable
import javax.annotation.Nonnull

/**
* Intended for performing red/black Orca deployments which do not share the
Expand Down Expand Up @@ -192,6 +193,13 @@ class DualExecutionRepository(
).distinct { it.id }
}

/**
 * Collects the pipeline config ids reported for [application] by both repositories.
 *
 * Ids coming from `primary` are listed ahead of those coming from `previous`, and an id
 * reported by both is returned only once.
 */
override fun retrievePipelineConfigIdsForApplication(application: String): List<String> {
  val fromPrimary = primary.retrievePipelineConfigIdsForApplication(application)
  val fromPrevious = previous.retrievePipelineConfigIdsForApplication(application)
  return (fromPrimary + fromPrevious).distinct()
}

override fun retrievePipelinesForPipelineConfigId(
pipelineConfigId: String,
criteria: ExecutionCriteria
Expand All @@ -202,6 +210,26 @@ class DualExecutionRepository(
).distinct { it.id }
}

/**
 * Gathers the matching pipeline execution ids from both repositories.
 *
 * The `primary` results are placed ahead of the `previous` results; the two lists are
 * concatenated as-is, without de-duplication.
 */
override fun retrieveAndFilterPipelineExecutionIdsForApplication(
  @Nonnull application: String,
  @Nonnull pipelineConfigIds: List<String>,
  @Nonnull criteria: ExecutionCriteria
): List<String> {
  // NOTE(review): raised during code review - the ordering matters for the selection
  // criteria: primary must come first, otherwise data from previous could be selected.
  // There is reportedly no test covering this ordering yet.
  val primaryIds =
    primary.retrieveAndFilterPipelineExecutionIdsForApplication(application, pipelineConfigIds, criteria)
  val previousIds =
    previous.retrieveAndFilterPipelineExecutionIdsForApplication(application, pipelineConfigIds, criteria)
  return primaryIds + previousIds
}

/**
 * Fetches pipeline execution details from both repositories and merges them.
 *
 * Results from `primary` precede results from `previous`; when both return an execution
 * with the same id, only the first occurrence is kept.
 */
override fun retrievePipelineExecutionDetailsForApplication(
  @Nonnull application: String,
  pipelineConfigIds: List<String>,
  queryTimeoutSeconds: Int
): Collection<PipelineExecution> {
  val fromPrimary =
    primary.retrievePipelineExecutionDetailsForApplication(application, pipelineConfigIds, queryTimeoutSeconds)
  val fromPrevious =
    previous.retrievePipelineExecutionDetailsForApplication(application, pipelineConfigIds, queryTimeoutSeconds)
  return (fromPrimary + fromPrevious).distinctBy { execution -> execution.id }
}

override fun retrievePipelinesForPipelineConfigIdsBetweenBuildTimeBoundary(
pipelineConfigIds: MutableList<String>,
buildTimeStartBoundary: Long,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,21 @@ Observable<PipelineExecution> retrieve(
Observable<PipelineExecution> retrievePipelinesForPipelineConfigId(
@Nonnull String pipelineConfigId, @Nonnull ExecutionCriteria criteria);

/**
 * Returns the pipeline config ids that have executions stored for the given application.
 *
 * @param application the application whose pipeline config ids are requested
 * @return the config ids; never {@code null}
 */
@Nonnull
Collection<String> retrievePipelineConfigIdsForApplication(@Nonnull String application);

/**
 * Returns the ids of pipeline executions of the given application that belong to one of the
 * given pipeline config ids, narrowed by {@code criteria}.
 *
 * <p>Which parts of the criteria are honoured is up to the implementation.
 *
 * @param application the application the executions belong to
 * @param pipelineConfigIds pipeline config ids to restrict the search to
 * @param criteria filter criteria to apply to the matching executions
 * @return the matching execution ids; never {@code null}
 */
@Nonnull
Collection<String> retrieveAndFilterPipelineExecutionIdsForApplication(
@Nonnull String application,
@Nonnull List<String> pipelineConfigIds,
@Nonnull ExecutionCriteria criteria);

/**
 * Returns the pipeline executions of the given application selected by the supplied id list.
 *
 * <p>NOTE(review): despite the parameter name {@code pipelineConfigIds}, the SQL-backed
 * implementation matches these values against the execution {@code id} column, and another
 * implementation names the parameter {@code pipelineExecutionIds}. The list presumably carries
 * pipeline execution ids - confirm against callers and consider renaming.
 *
 * @param application the application the executions belong to
 * @param pipelineConfigIds ids selecting the executions to load (see the note above)
 * @param queryTimeoutSeconds timeout, in seconds, for the underlying query where the
 *     implementation issues one
 * @return the matching executions; never {@code null}
 */
@Nonnull
Collection<PipelineExecution> retrievePipelineExecutionDetailsForApplication(
@Nonnull String application,
@Nonnull List<String> pipelineConfigIds,
int queryTimeoutSeconds);

/**
* Returns executions in the time boundary. Redis impl does not respect pageSize or offset params,
* and returns all executions. Sql impl respects these params.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ import com.netflix.spinnaker.orca.api.pipeline.models.PipelineExecution
import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.ExecutionComparator
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.ExecutionCriteria
import rx.Observable
import java.lang.System.currentTimeMillis
import java.time.Instant
import java.util.concurrent.ConcurrentHashMap
import rx.Observable
import javax.annotation.Nonnull

class InMemoryExecutionRepository : ExecutionRepository {

Expand Down Expand Up @@ -276,6 +277,34 @@ class InMemoryExecutionRepository : ExecutionRepository {
)
}

/**
 * Lists the distinct pipeline config ids of the stored pipelines that belong to
 * [application], in the order they are first encountered.
 */
override fun retrievePipelineConfigIdsForApplication(application: String): List<String> {
  return pipelines.values
    .asSequence()
    .filter { execution -> execution.application == application }
    .map { execution -> execution.pipelineConfigId }
    .distinct()
    .toList()
}

/**
 * Returns the ids of the stored pipelines that belong to [application] and to one of
 * [pipelineConfigIds], after [criteria] has been applied to the matches.
 */
override fun retrieveAndFilterPipelineExecutionIdsForApplication(
  @Nonnull application: String,
  @Nonnull pipelineConfigIds: List<String>,
  @Nonnull criteria: ExecutionCriteria
): List<String> {
  val candidates = pipelines.values.filter { execution ->
    execution.application == application && execution.pipelineConfigId in pipelineConfigIds
  }
  return candidates
    .applyCriteria(criteria)
    .map { execution -> execution.id }
}

/**
 * Returns the stored pipeline executions of [application] whose execution id is listed in
 * [pipelineConfigIds].
 *
 * The parameter keeps the name used by the interface declaration, but it is matched against
 * execution ids here: the SQL-backed implementation filters the same argument on the execution
 * `id` column, and this implementation previously compared it with the pipeline config id,
 * which made the two disagree.
 *
 * @param application application the executions must belong to
 * @param pipelineConfigIds ids of the executions to return (execution ids, see above)
 * @param queryTimeoutSeconds not used by this implementation
 * @return the matching executions, at most one per execution id
 */
override fun retrievePipelineExecutionDetailsForApplication(
  application: String,
  pipelineConfigIds: List<String>,
  queryTimeoutSeconds: Int
): Collection<PipelineExecution> {
  // Set lookup keeps the membership test cheap for long id lists.
  val requestedExecutionIds = pipelineConfigIds.toSet()
  return pipelines.values
    .filter { it.application == application && it.id in requestedExecutionIds }
    .distinctBy { it.id }
}

override fun retrieveOrchestrationForCorrelationId(correlationId: String): PipelineExecution {
return retrieveByCorrelationId(ORCHESTRATION, correlationId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,37 @@ public void delete(@Nonnull ExecutionType type, @Nonnull List<String> idsToDelet
return currentObservable;
}

/**
 * Not implemented for this repository: always returns an empty list.
 *
 * @param application ignored
 * @return an empty, unmodifiable list
 */
@Override
public @Nonnull List<String> retrievePipelineConfigIdsForApplication(
@Nonnull String application) {
// TODO: not implemented yet - this method, at present, is primarily meant for the
// SqlExecutionRepository implementation.
return List.of();
}

/**
 * Not implemented for this repository: always returns an empty list.
 *
 * @param application ignored
 * @param pipelineConfigIds ignored
 * @param criteria ignored
 * @return an empty, unmodifiable list
 */
@Override
public @Nonnull List<String> retrieveAndFilterPipelineExecutionIdsForApplication(
@Nonnull String application,
@Nonnull List<String> pipelineConfigIds,
@Nonnull ExecutionCriteria criteria) {
// TODO: not implemented yet - this method, at present, is primarily meant for the
// SqlExecutionRepository implementation.
return List.of();
}

/**
 * Not implemented for this repository: always returns an empty list.
 *
 * @param application ignored
 * @param pipelineExecutionIds ignored
 * @param queryTimeoutSeconds ignored
 * @return an empty, unmodifiable list
 */
@Override
public @Nonnull List<PipelineExecution> retrievePipelineExecutionDetailsForApplication(
@Nonnull String application,
@Nonnull List<String> pipelineExecutionIds,
int queryTimeoutSeconds) {
// TODO: not implemented yet - this method, at present, is primarily meant for the
// SqlExecutionRepository implementation.
return List.of();
}

/*
* There is no guarantee that the returned results will be sorted.
* @param limit and the param @offset are only implemented in SqlExecutionRepository
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.Execu
import com.netflix.spinnaker.orca.pipeline.persistence.UnpausablePipelineException
import com.netflix.spinnaker.orca.pipeline.persistence.UnresumablePipelineException
import de.huxhorn.sulky.ulid.SpinULID
import java.lang.System.currentTimeMillis
import java.security.SecureRandom
import java.time.Duration
import org.jooq.DSLContext
import org.jooq.DatePart
Expand All @@ -80,7 +78,33 @@ import org.jooq.impl.DSL.value
import org.slf4j.LoggerFactory
import rx.Observable
import java.io.ByteArrayOutputStream
import java.lang.System.currentTimeMillis
import java.nio.charset.StandardCharsets
import java.security.SecureRandom
import java.util.stream.Collectors.toList
import kotlin.collections.Collection
import kotlin.collections.Iterable
import kotlin.collections.Iterator
import kotlin.collections.List
import kotlin.collections.Map
import kotlin.collections.MutableList
import kotlin.collections.chunked
import kotlin.collections.distinct
import kotlin.collections.firstOrNull
import kotlin.collections.forEach
import kotlin.collections.isEmpty
import kotlin.collections.isNotEmpty
import kotlin.collections.listOf
import kotlin.collections.map
import kotlin.collections.mapOf
import kotlin.collections.mutableListOf
import kotlin.collections.mutableMapOf
import kotlin.collections.plus
import kotlin.collections.set
import kotlin.collections.toList
import kotlin.collections.toMutableList
import kotlin.collections.toMutableMap
import kotlin.collections.toTypedArray

/**
* A generic SQL [ExecutionRepository].
Expand Down Expand Up @@ -427,6 +451,129 @@ class SqlExecutionRepository(
)
}

/**
 * Selects the distinct `config_id` values stored in the pipeline table for [application].
 */
override fun retrievePipelineConfigIdsForApplication(application: String): List<String> {
  return withPool(poolName) {
    val belongsToApplication = field("application").eq(application)
    jooq.selectDistinct(field("config_id"))
      .from(PIPELINE.tableName)
      .where(belongsToApplication)
      .fetch(0, String::class.java)
  }
}

/**
 * Returns the ids of the pipeline executions in [application] that belong to one of
 * [pipelineConfigIds], narrowed by [criteria].
 *
 * Only the following parts of [ExecutionCriteria] are supported currently:
 * - 'statuses': added to the WHERE clause, unless the set is empty or contains every
 *   [ExecutionStatus] (in which case filtering on it would be a no-op);
 * - 'limit', a.k.a. page size: applied per config id, in code, after the rows are fetched.
 *
 * If the execution criteria does not contain any statuses, the query is:
 *
 *     SELECT config_id, id
 *     FROM pipelines force index (`pipeline_application_idx`)
 *     WHERE (
 *       application = "myapp" and
 *       config_id in ("config1", "config2")
 *     )
 *     ORDER BY config_id;
 *
 * If the execution criteria contains statuses, the query is:
 *
 *     SELECT config_id, id
 *     FROM pipelines force index (`pipeline_application_status_starttime_idx`)
 *     WHERE (
 *       application = "myapp" and
 *       config_id in ("config1", "config2") and
 *       status in ("status1", "status2")
 *     )
 *     ORDER BY config_id;
 *
 * The index hint is only added when the jOOQ dialect is MySQL.
 *
 * The limit is then applied to the result set obtained above. We observed load issues in the
 * db when running a query where the limit was calculated in the query itself. Therefore, that
 * logic lives in the code below to ease the burden on the db in such circumstances.
 *
 * @return execution ids grouped by config id, keeping at most `criteria.pageSize` ids
 *   (the last ones returned by the query) per config id
 */
override fun retrieveAndFilterPipelineExecutionIdsForApplication(
  application: String,
  pipelineConfigIds: List<String>,
  criteria: ExecutionCriteria
): List<String> {
  // An empty IN list cannot match any row, so skip the database round trip entirely.
  if (pipelineConfigIds.isEmpty()) {
    return emptyList()
  }

  // Asking for every possible status is the same as not filtering on status at all.
  val filterByStatus =
    criteria.statuses.isNotEmpty() && criteria.statuses.size != ExecutionStatus.values().size

  val applicationAndConfigIds = field("application").eq(application)
    .and(field("config_id").`in`(*pipelineConfigIds.toTypedArray()))
  val predicate = if (filterByStatus) {
    val statusStrings = criteria.statuses.map { it.toString() }
    applicationAndConfigIds.and(field("status").`in`(*statusStrings.toTypedArray()))
  } else {
    applicationAndConfigIds
  }

  // NOTE(review): raised during code review - do we really want to force an index, or would
  // another (or a merged) index on this column combination let the optimizer handle it?
  val indexName =
    if (filterByStatus) "pipeline_application_status_starttime_idx" else "pipeline_application_idx"
  val table =
    if (jooq.dialect() == SQLDialect.MYSQL) PIPELINE.tableName.forceIndex(indexName)
    else PIPELINE.tableName

  val idsByConfigId = withPool(poolName) {
    jooq.select(field("config_id"), field("id"))
      .from(table)
      .where(predicate)
      .orderBy(field("config_id"))
      .fetch()
      .intoGroups("config_id", "id")
  }

  // NOTE(review): rows are ordered by config_id only, so which ids count as the "last"
  // ones within a config id depends on the order the database returns them - confirm that
  // this yields the intended (most recent) executions.
  // A non-positive page size keeps nothing, exactly as the previous skip-based logic did.
  val pageSize = criteria.pageSize.coerceAtLeast(0)
  return idsByConfigId.values.flatMap { ids ->
    ids.takeLast(pageSize).map { it as String }
  }
}

/**
 * Loads the execution details for the given pipeline executions of [application]; callers are
 * expected to pass n execution ids at a time.
 *
 * It executes the following query (the index hint is only added when the jOOQ dialect is MySQL):
 *
 *     SELECT id, body, compressed_body, compression_type, `partition`
 *     FROM pipelines force index (`pipeline_application_idx`)
 *     left outer join
 *     pipelines_compressed_executions
 *     using (`id`)
 *     WHERE (
 *       application = "<myapp>" and
 *       id in ('id1', 'id2', 'id3')
 *     );
 *
 * It then gets all the stage information for all the executions returned from the above query
 * by handing the rows to [ExecutionMapper].
 *
 * @param application application the executions must belong to
 * @param pipelineExecutions ids of the executions to load; an empty list yields an empty result
 *   without querying the database
 * @param queryTimeoutSeconds explicit timeout for the query, in seconds
 * @return the mapped executions
 */
override fun retrievePipelineExecutionDetailsForApplication(
  application: String,
  pipelineExecutions: List<String>,
  queryTimeoutSeconds: Int
): Collection<PipelineExecution> {
  // An empty IN list cannot match any row, so skip the database round trip entirely.
  if (pipelineExecutions.isEmpty()) {
    return emptyList()
  }

  // Return the lambda's result explicitly instead of relying on a non-local return from
  // inside the withPool block.
  return withPool(poolName) {
    val executionRows = jooq.select(selectExecutionFields(compressionProperties))
      .from(
        if (jooq.dialect() == SQLDialect.MYSQL) PIPELINE.tableName.forceIndex("pipeline_application_idx")
        else PIPELINE.tableName
      )
      .leftOuterJoin(PIPELINE.tableName.compressedExecTable).using(field("id"))
      .where(
        field("application").eq(application)
          .and(field("id").`in`(*pipelineExecutions.toTypedArray()))
      )
      .queryTimeout(queryTimeoutSeconds) // add an explicit timeout so that the query doesn't run forever
      .fetch()

    log.debug("getting stage information for all the executions found so far")
    ExecutionMapper(mapper, stageReadSize, compressionProperties, pipelineRefEnabled)
      .map(executionRows.intoResultSet(), jooq)
  }
}

override fun retrievePipelinesForPipelineConfigId(
pipelineConfigId: String,
criteria: ExecutionCriteria
Expand Down
10 changes: 10 additions & 0 deletions orca-web/orca-web.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,19 @@ dependencies {
testImplementation("io.strikt:strikt-core")
testImplementation("io.mockk:mockk")
testImplementation("org.apache.groovy:groovy-json")
testImplementation("com.nhaarman:mockito-kotlin")
testImplementation("io.spinnaker.kork:kork-sql-test")
testImplementation("org.testcontainers:mysql")
testImplementation ("com.squareup.retrofit2:retrofit-mock")
}

// Route all main sources, Java included, through the Groovy compiler: the Java source set is
// emptied and both source directories are registered as Groovy source directories.
sourceSets {
main {
java { srcDirs = [] } // no source dirs for the java compiler
groovy { srcDirs = ["src/main/java", "src/main/groovy"] } // compile everything in src/ with groovy
}
}

test {
//The Implementation-Version is set in the MANIFEST.MF for the JAR produced via testing so that
//assertions can be made against the version (see orca-plugins-test, for example).
Expand Down
Loading
Loading