feat(sql): use a connection pool named "read" for some read operations in SqlExecutionRepository (#4803)

* test(sql): demonstrate that SqlConfiguration makes different types of DataSource beans available
depending on configuration

* feat(sql): use a connection pool named "read" for read operations in SqlExecutionRepository if configured to do so.

WIP: so far this is only configuration, nothing actually uses the read pool

* refactor(sql): reorganize correlation id cleanup logic in SqlExecutionRepository

to pave the way to use the read pool for selecting by correlation id

* perf(sql): no need for a transaction to delete individual rows

- correlation ids for completed pipelines/orchestrations
- a stage

* perf(sql): use the read pool for read-only database queries in SqlExecutionRepository

* refactor(sql): remove forUpdate argument from SqlExecutionRepository.selectExecution

since it's not used.

---------

Co-authored-by: David Byron <[email protected]>
kirangodishala and dbyron-sf authored Jan 3, 2025
1 parent d739295 commit 386da27
Showing 10 changed files with 297 additions and 61 deletions.
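
The central change, per the commit message above, is that SqlExecutionRepository now accepts a DataSource and only sends read-only queries to a pool named "read" when such a pool actually exists. A minimal sketch of that selection rule follows; the helper function and its defaults are illustrative, not code from this commit, though the constants mirror the repository's poolName/readPoolName defaults:

import javax.sql.DataSource
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource

// Returns the pool that read-only queries should use: the "read" pool when the
// injected DataSource is a routing DataSource that resolves it, otherwise the
// ordinary (write) pool -- the same fallback SqlExecutionRepository's init block applies.
fun effectiveReadPool(
  dataSource: DataSource,
  poolName: String = "default",
  readPoolName: String = "read"
): String =
  if (dataSource is AbstractRoutingDataSource &&
    dataSource.resolvedDataSources[readPoolName] != null
  ) {
    readPoolName
  } else {
    poolName
  }

With a plain pooled or mocked DataSource this simply returns poolName, which is why the integration test below only needs a @MockBean DataSource.
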
@@ -55,9 +55,11 @@
import org.jooq.DSLContext
import org.junit.jupiter.api.extension.ExtendWith
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.boot.test.mock.mockito.MockBean
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.test.context.junit.jupiter.SpringExtension
import javax.sql.DataSource

@Configuration
class SqlTestConfig {
@@ -121,7 +123,8 @@
registry: Registry,
properties: SqlProperties,
orcaSqlProperties: OrcaSqlProperties,
compressionProperties: ExecutionCompressionProperties
compressionProperties: ExecutionCompressionProperties,
dataSource: DataSource
) = SqlExecutionRepository(
orcaSqlProperties.partitionName,
dsl,
@@ -131,7 +134,8 @@
orcaSqlProperties.stageReadSize,
interlink = null,
compressionProperties = compressionProperties,
pipelineRefEnabled = false
pipelineRefEnabled = false,
dataSource = dataSource
)

@Bean
@@ -192,4 +196,7 @@
"spring.application.name=orcaTest"
]
)
class SqlQueueIntegrationTest : QueueIntegrationTest()
class SqlQueueIntegrationTest : QueueIntegrationTest() {
@MockBean
var dataSource: DataSource? = null
}
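
The @MockBean DataSource above satisfies the repository's new constructor parameter without standing up a real read pool; a Mockito mock is not an AbstractRoutingDataSource, so the repository falls back to its default pool, and the test's queries still run through the jOOQ DSLContext. An equivalent, purely illustrative alternative (class and bean names assumed, not part of this commit) would be a stub bean in a test configuration:

import javax.sql.DataSource
import org.mockito.Mockito
import org.springframework.boot.test.context.TestConfiguration
import org.springframework.context.annotation.Bean

@TestConfiguration
class StubDataSourceConfig {
  // A do-nothing DataSource is enough: the repository only inspects its type
  // and resolved pools to decide which named pool to use for reads.
  @Bean
  fun dataSource(): DataSource = Mockito.mock(DataSource::class.java)
}
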
2 changes: 2 additions & 0 deletions orca-sql/orca-sql.gradle
@@ -50,12 +50,14 @@
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("dev.minutest:minutest")
testImplementation("com.nhaarman:mockito-kotlin")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("org.testcontainers:mysql")
testImplementation("org.testcontainers:postgresql")

testRuntimeOnly("com.mysql:mysql-connector-j")
testRuntimeOnly("org.postgresql:postgresql")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine")
testRuntimeOnly(project(":keiko-sql")) // so SpringLiquibaseProxy has changelog-keiko.yml
}

test {
@@ -38,6 +38,9 @@
import com.netflix.spinnaker.orca.sql.pipeline.persistence.ExecutionStatisticsRepository
import com.netflix.spinnaker.orca.sql.pipeline.persistence.SqlExecutionRepository
import com.netflix.spinnaker.orca.sql.telemetry.SqlActiveExecutionsMonitor
import java.time.Clock
import java.util.Optional
import javax.sql.DataSource
import liquibase.integration.spring.SpringLiquibase
import net.javacrumbs.shedlock.core.LockProvider
import net.javacrumbs.shedlock.provider.jdbctemplate.JdbcTemplateLockProvider
@@ -49,10 +52,11 @@
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.*
import java.time.Clock
import java.util.*
import javax.sql.DataSource
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.ComponentScan
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Import
import org.springframework.context.annotation.Primary

@Configuration
@ConditionalOnProperty("sql.enabled")
@@ -78,7 +82,8 @@
interlink: Optional<Interlink>,
executionRepositoryListeners: Collection<ExecutionRepositoryListener>,
compressionProperties: ExecutionCompressionProperties,
pipelineRefProperties: PipelineRefProperties
pipelineRefProperties: PipelineRefProperties,
dataSource: DataSource
) =
SqlExecutionRepository(
orcaSqlProperties.partitionName,
@@ -90,7 +95,8 @@
interlink = interlink.orElse(null),
executionRepositoryListeners = executionRepositoryListeners,
compressionProperties = compressionProperties,
pipelineRefEnabled = pipelineRefProperties.enabled
pipelineRefEnabled = pipelineRefProperties.enabled,
dataSource = dataSource
).let {
InstrumentedProxy.proxy(registry, it, "sql.executions", mapOf(Pair("repository", "primary"))) as ExecutionRepository
}
@@ -105,7 +111,8 @@
orcaSqlProperties: OrcaSqlProperties,
@Value("\${execution-repository.sql.secondary.pool-name}") poolName: String,
compressionProperties: ExecutionCompressionProperties,
pipelineRefProperties: PipelineRefProperties
pipelineRefProperties: PipelineRefProperties,
dataSource: DataSource
) =
SqlExecutionRepository(
orcaSqlProperties.partitionName,
@@ -116,7 +123,8 @@
orcaSqlProperties.stageReadSize,
poolName,
compressionProperties = compressionProperties,
pipelineRefEnabled = pipelineRefProperties.enabled
pipelineRefEnabled = pipelineRefProperties.enabled,
dataSource = dataSource
).let {
InstrumentedProxy.proxy(registry, it, "sql.executions", mapOf(Pair("repository", "secondary"))) as ExecutionRepository
}
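
For the "read" pool to take effect, the DataSource bean injected into these repository beans has to be a routing DataSource whose resolved targets include a pool named "read". The commit's first bullet notes that SqlConfiguration already exposes different DataSource bean types depending on configuration; the sketch below is only a hypothetical illustration of the shape the repository expects, with placeholder JDBC URLs, a stand-in ThreadLocal instead of the real per-query pool selection, and class names that are assumptions rather than Spinnaker code:

import javax.sql.DataSource
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.jdbc.datasource.DriverManagerDataSource
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource

// Stand-in for whatever per-query pool selection the real withPool helper performs.
object CurrentPool {
  val name: ThreadLocal<String> = ThreadLocal.withInitial { "default" }
}

@Configuration
class ReadWriteDataSourceConfig {

  @Bean
  fun dataSource(): DataSource {
    val routing = object : AbstractRoutingDataSource() {
      // Each connection is handed out from the pool named by the current key.
      override fun determineCurrentLookupKey(): Any = CurrentPool.name.get()
    }
    routing.setTargetDataSources(
      mapOf<Any, Any>(
        "default" to DriverManagerDataSource("jdbc:mysql://primary:3306/orca"),
        "read" to DriverManagerDataSource("jdbc:mysql://replica:3306/orca")
      )
    )
    routing.setDefaultTargetDataSource(DriverManagerDataSource("jdbc:mysql://primary:3306/orca"))
    routing.afterPropertiesSet() // populates resolvedDataSources, which the repository inspects
    return routing
  }
}
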
@@ -76,12 +76,14 @@
import org.jooq.impl.DSL.timestampSub
import org.jooq.impl.DSL.value
import org.slf4j.LoggerFactory
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource
import rx.Observable
import java.io.ByteArrayOutputStream
import java.lang.System.currentTimeMillis
import java.nio.charset.StandardCharsets
import java.security.SecureRandom
import java.util.stream.Collectors.toList
import javax.sql.DataSource
import kotlin.collections.Collection
import kotlin.collections.Iterable
import kotlin.collections.Iterator
@@ -120,10 +122,12 @@
private val batchReadSize: Int = 10,
private val stageReadSize: Int = 200,
private val poolName: String = "default",
internal var readPoolName: String = "read", /* internal for testing */
private val interlink: Interlink? = null,
private val executionRepositoryListeners: Collection<ExecutionRepositoryListener> = emptyList(),
private val compressionProperties: ExecutionCompressionProperties,
private val pipelineRefEnabled: Boolean
private val pipelineRefEnabled: Boolean,
private val dataSource: DataSource
) : ExecutionRepository, ExecutionStatisticsRepository {
companion object {
val ulid = SpinULID(SecureRandom())
@@ -133,7 +137,13 @@
private val log = LoggerFactory.getLogger(javaClass)

init {
log.info("Creating SqlExecutionRepository with partition=$partitionName and pool=$poolName")
// If there's no read pool configured, fall back to the default pool
if ((dataSource !is AbstractRoutingDataSource)
|| (dataSource.resolvedDataSources[readPoolName] == null)) {
readPoolName = poolName
}

log.info("Creating SqlExecutionRepository with partition=$partitionName, pool=$poolName, readPool=$readPoolName")

try {
withPool(poolName) {
@@ -182,10 +192,8 @@
validateHandledPartitionOrThrow(execution)

withPool(poolName) {
jooq.transactional {
it.delete(execution.type.stagesTableName)
.where(stageId.toWhereCondition()).execute()
}
jooq.delete(execution.type.stagesTableName)
.where(stageId.toWhereCondition()).execute()
}
}

@@ -277,7 +285,7 @@
}

override fun isCanceled(type: ExecutionType, id: String): Boolean {
withPool(poolName) {
withPool(readPoolName) {
return jooq.fetchExists(
jooq.selectFrom(type.tableName)
.where(id.toWhereCondition())
@@ -412,7 +420,7 @@
}

private fun retrieve(type: ExecutionType, criteria: ExecutionCriteria, partition: String?): Observable<PipelineExecution> {
withPool(poolName) {
withPool(readPoolName) {
val select = jooq.selectExecutions(
type,
fields = selectExecutionFields(compressionProperties) + field("status"),
@@ -441,7 +449,7 @@
}

override fun retrievePipelinesForApplication(application: String): Observable<PipelineExecution> =
withPool(poolName) {
withPool(readPoolName) {
Observable.from(
fetchExecutions { pageSize, cursor ->
selectExecutions(PIPELINE, pageSize, cursor) {
@@ -581,7 +589,7 @@
// When not filtering by status, provide an index hint to ensure use of `pipeline_config_id_idx` which
// fully satisfies the where clause and order by. Without, some lookups by config_id matching thousands
// of executions triggered costly full table scans.
withPool(poolName) {
withPool(readPoolName) {
val select = if (criteria.statuses.isEmpty() || criteria.statuses.size == ExecutionStatus.values().size) {
jooq.selectExecutions(
PIPELINE,
@@ -625,7 +633,7 @@
criteria: ExecutionCriteria,
sorter: ExecutionComparator?
): MutableList<PipelineExecution> {
withPool(poolName) {
withPool(readPoolName) {
return jooq.selectExecutions(
ORCHESTRATION,
conditions = {
@@ -677,17 +685,23 @@
)
.fetchExecution()

if (execution != null) {
if (!execution.status.isComplete) {
return execution
}
jooq.transactional {
it.deleteFrom(table("correlation_ids")).where(field("id").eq(correlationId)).execute()
}
if (execution == null) {
throw ExecutionNotFoundException("No Orchestration found for correlation ID $correlationId")
}

throw ExecutionNotFoundException("No Orchestration found for correlation ID $correlationId")
if (!execution.status.isComplete) {
return execution
}
}

// If we get here, there's an execution with the given correlation id, but
// it's complete, so clean up the correlation_ids table.
withPool(poolName) {
jooq.deleteFrom(table("correlation_ids")).where(field("id").eq(correlationId)).execute()
}

// Treat a completed execution similar to not finding one at all.
throw ExecutionNotFoundException("Complete Orchestration found for correlation ID $correlationId")
}

override fun retrievePipelineForCorrelationId(correlationId: String): PipelineExecution {
@@ -705,17 +719,22 @@
)
.fetchExecution()

if (execution != null) {
if (!execution.status.isComplete) {
return execution
}
jooq.transactional {
it.deleteFrom(table("correlation_ids")).where(field("id").eq(correlationId)).execute()
}
if (execution == null) {
throw ExecutionNotFoundException("No Pipeline found for correlation ID $correlationId")
}

throw ExecutionNotFoundException("No Pipeline found for correlation ID $correlationId")
if (!execution.status.isComplete) {
return execution
}
}

// If we get here, there's an execution with the given correlation id, but
// it's complete, so clean up the correlation_ids table.
withPool(poolName) {
jooq.deleteFrom(table("correlation_ids")).where(field("id").eq(correlationId)).execute()
}

throw ExecutionNotFoundException("Complete Pipeline found for correlation ID $correlationId")
}

override fun retrieveBufferedExecutions(): MutableList<PipelineExecution> =
@@ -730,7 +749,7 @@
}

override fun retrieveAllApplicationNames(type: ExecutionType?): List<String> {
withPool(poolName) {
withPool(readPoolName) {
return if (type == null) {
jooq.select(field("application"))
.from(PIPELINE.tableName)
@@ -753,7 +772,7 @@
}

override fun retrieveAllApplicationNames(type: ExecutionType?, minExecutions: Int): List<String> {
withPool(poolName) {
withPool(readPoolName) {
return if (type == null) {
jooq.select(field("application"))
.from(PIPELINE.tableName)
@@ -779,7 +798,7 @@
}

override fun countActiveExecutions(): ActiveExecutionsReport {
withPool(poolName) {
withPool(readPoolName) {
val partitionPredicate = if (partitionName != null) field(name("partition")).eq(partitionName) else value(1).eq(value(1))

val orchestrationsQuery = jooq.selectCount()
@@ -808,7 +827,7 @@
buildTimeEndBoundary: Long,
executionCriteria: ExecutionCriteria
): List<PipelineExecution> {
withPool(poolName) {
withPool(readPoolName) {
val select = jooq.select(selectExecutionFields(compressionProperties))
.from(PIPELINE.tableName)
.join(
@@ -888,7 +907,7 @@
}

override fun hasExecution(type: ExecutionType, id: String): Boolean {
withPool(poolName) {
withPool(readPoolName) {
return jooq.selectCount()
.from(type.tableName)
.where(id.toWhereCondition())
@@ -897,7 +916,7 @@
}

override fun retrieveAllExecutionIds(type: ExecutionType): MutableList<String> {
withPool(poolName) {
withPool(readPoolName) {
return jooq.select(field("id")).from(type.tableName).fetch("id", String::class.java)
}
}
@@ -917,7 +936,7 @@
): Pair<String, String?> {
if (isULID(id)) return Pair(id, null)

withPool(poolName) {
withPool(readPoolName) {
val ts = (timestamp ?: System.currentTimeMillis())
val row = ctx.select(field("id"))
.from(table)
@@ -1256,14 +1275,10 @@
private fun selectExecution(
ctx: DSLContext,
type: ExecutionType,
id: String,
forUpdate: Boolean = false
id: String
): PipelineExecution? {
withPool(poolName) {
val select = ctx.selectExecution(type, compressionProperties).where(id.toWhereCondition())
if (forUpdate) {
select.forUpdate()
}
return select.fetchExecution()
}
}
@@ -1274,7 +1289,7 @@
cursor: String?,
where: ((SelectJoinStep<Record>) -> SelectConditionStep<Record>)? = null
): Collection<PipelineExecution> {
withPool(poolName) {
withPool(readPoolName) {
val select = jooq.selectExecutions(
type,
conditions = {
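
One more note on the "no need for a transaction" bullet above: a single DELETE is already atomic, so the old jooq.transactional { ... } wrapper around the stage and correlation-id deletes bought no extra safety. A reduced sketch of the after-state, with an illustrative function name and parameters (the table/field calls match the jOOQ usage in the diff):

import org.jooq.DSLContext
import org.jooq.impl.DSL.field
import org.jooq.impl.DSL.table

// Deletes the correlation-id row for a completed execution in one auto-committed
// statement, instead of opening an explicit transaction for a single row.
fun cleanUpCorrelationId(dsl: DSLContext, correlationId: String) {
  dsl.deleteFrom(table("correlation_ids"))
    .where(field("id").eq(correlationId))
    .execute()
}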