diff --git a/orca-queue-sql/src/test/kotlin/com/netflix/spinnaker/orca/q/sql/SqlQueueIntegrationTest.kt b/orca-queue-sql/src/test/kotlin/com/netflix/spinnaker/orca/q/sql/SqlQueueIntegrationTest.kt index 4b8c26ad45..96e84a66a7 100644 --- a/orca-queue-sql/src/test/kotlin/com/netflix/spinnaker/orca/q/sql/SqlQueueIntegrationTest.kt +++ b/orca-queue-sql/src/test/kotlin/com/netflix/spinnaker/orca/q/sql/SqlQueueIntegrationTest.kt @@ -55,9 +55,11 @@ import java.util.Optional import org.jooq.DSLContext import org.junit.jupiter.api.extension.ExtendWith import org.springframework.boot.test.context.SpringBootTest +import org.springframework.boot.test.mock.mockito.MockBean import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.test.context.junit.jupiter.SpringExtension +import javax.sql.DataSource @Configuration class SqlTestConfig { @@ -121,7 +123,8 @@ class SqlTestConfig { registry: Registry, properties: SqlProperties, orcaSqlProperties: OrcaSqlProperties, - compressionProperties: ExecutionCompressionProperties + compressionProperties: ExecutionCompressionProperties, + dataSource: DataSource ) = SqlExecutionRepository( orcaSqlProperties.partitionName, dsl, @@ -131,7 +134,8 @@ class SqlTestConfig { orcaSqlProperties.stageReadSize, interlink = null, compressionProperties = compressionProperties, - pipelineRefEnabled = false + pipelineRefEnabled = false, + dataSource = dataSource ) @Bean @@ -192,4 +196,7 @@ class SqlTestConfig { "spring.application.name=orcaTest" ] ) -class SqlQueueIntegrationTest : QueueIntegrationTest() +class SqlQueueIntegrationTest : QueueIntegrationTest() { + @MockBean + var dataSource: DataSource? = null +} diff --git a/orca-sql/orca-sql.gradle b/orca-sql/orca-sql.gradle index 6938176bbe..2f0ca54757 100644 --- a/orca-sql/orca-sql.gradle +++ b/orca-sql/orca-sql.gradle @@ -50,12 +50,14 @@ dependencies { testImplementation("org.springframework.boot:spring-boot-starter-test") testImplementation("dev.minutest:minutest") testImplementation("com.nhaarman:mockito-kotlin") + testImplementation("org.springframework.boot:spring-boot-starter-test") testImplementation("org.testcontainers:mysql") testImplementation("org.testcontainers:postgresql") testRuntimeOnly("com.mysql:mysql-connector-j") testRuntimeOnly("org.postgresql:postgresql") testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine") + testRuntimeOnly(project(":keiko-sql")) // so SpringLiquibaseProxy has changelog-keiko.yml } test { diff --git a/orca-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlConfiguration.kt b/orca-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlConfiguration.kt index 21bc326748..941c71ea32 100644 --- a/orca-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlConfiguration.kt +++ b/orca-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlConfiguration.kt @@ -38,6 +38,9 @@ import com.netflix.spinnaker.orca.sql.SqlHealthcheckActivator import com.netflix.spinnaker.orca.sql.pipeline.persistence.ExecutionStatisticsRepository import com.netflix.spinnaker.orca.sql.pipeline.persistence.SqlExecutionRepository import com.netflix.spinnaker.orca.sql.telemetry.SqlActiveExecutionsMonitor +import java.time.Clock +import java.util.Optional +import javax.sql.DataSource import liquibase.integration.spring.SpringLiquibase import net.javacrumbs.shedlock.core.LockProvider import net.javacrumbs.shedlock.provider.jdbctemplate.JdbcTemplateLockProvider @@ -49,10 +52,11 @@ import org.springframework.beans.factory.annotation.Value import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.context.properties.EnableConfigurationProperties -import org.springframework.context.annotation.* -import java.time.Clock -import java.util.* -import javax.sql.DataSource +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.ComponentScan +import org.springframework.context.annotation.Configuration +import org.springframework.context.annotation.Import +import org.springframework.context.annotation.Primary @Configuration @ConditionalOnProperty("sql.enabled") @@ -78,7 +82,8 @@ class SqlConfiguration { interlink: Optional, executionRepositoryListeners: Collection, compressionProperties: ExecutionCompressionProperties, - pipelineRefProperties: PipelineRefProperties + pipelineRefProperties: PipelineRefProperties, + dataSource: DataSource ) = SqlExecutionRepository( orcaSqlProperties.partitionName, @@ -90,7 +95,8 @@ class SqlConfiguration { interlink = interlink.orElse(null), executionRepositoryListeners = executionRepositoryListeners, compressionProperties = compressionProperties, - pipelineRefEnabled = pipelineRefProperties.enabled + pipelineRefEnabled = pipelineRefProperties.enabled, + dataSource = dataSource ).let { InstrumentedProxy.proxy(registry, it, "sql.executions", mapOf(Pair("repository", "primary"))) as ExecutionRepository } @@ -105,7 +111,8 @@ class SqlConfiguration { orcaSqlProperties: OrcaSqlProperties, @Value("\${execution-repository.sql.secondary.pool-name}") poolName: String, compressionProperties: ExecutionCompressionProperties, - pipelineRefProperties: PipelineRefProperties + pipelineRefProperties: PipelineRefProperties, + dataSource: DataSource ) = SqlExecutionRepository( orcaSqlProperties.partitionName, @@ -116,7 +123,8 @@ class SqlConfiguration { orcaSqlProperties.stageReadSize, poolName, compressionProperties = compressionProperties, - pipelineRefEnabled = pipelineRefProperties.enabled + pipelineRefEnabled = pipelineRefProperties.enabled, + dataSource = dataSource ).let { InstrumentedProxy.proxy(registry, it, "sql.executions", mapOf(Pair("repository", "secondary"))) as ExecutionRepository } diff --git a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt index 4edfd3513c..b402b5f4a6 100644 --- a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt +++ b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt @@ -76,12 +76,14 @@ import org.jooq.impl.DSL.table import org.jooq.impl.DSL.timestampSub import org.jooq.impl.DSL.value import org.slf4j.LoggerFactory +import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource import rx.Observable import java.io.ByteArrayOutputStream import java.lang.System.currentTimeMillis import java.nio.charset.StandardCharsets import java.security.SecureRandom import java.util.stream.Collectors.toList +import javax.sql.DataSource import kotlin.collections.Collection import kotlin.collections.Iterable import kotlin.collections.Iterator @@ -120,10 +122,12 @@ class SqlExecutionRepository( private val batchReadSize: Int = 10, private val stageReadSize: Int = 200, private val poolName: String = "default", + internal var readPoolName: String = "read", /* internal for testing */ private val interlink: Interlink? = null, private val executionRepositoryListeners: Collection = emptyList(), private val compressionProperties: ExecutionCompressionProperties, - private val pipelineRefEnabled: Boolean + private val pipelineRefEnabled: Boolean, + private val dataSource: DataSource ) : ExecutionRepository, ExecutionStatisticsRepository { companion object { val ulid = SpinULID(SecureRandom()) @@ -133,7 +137,13 @@ class SqlExecutionRepository( private val log = LoggerFactory.getLogger(javaClass) init { - log.info("Creating SqlExecutionRepository with partition=$partitionName and pool=$poolName") + // If there's no read pool configured, fall back to the default pool + if ((dataSource !is AbstractRoutingDataSource) + || (dataSource.resolvedDataSources[readPoolName] == null)) { + readPoolName = poolName + } + + log.info("Creating SqlExecutionRepository with partition=$partitionName, pool=$poolName, readPool=$readPoolName") try { withPool(poolName) { @@ -182,10 +192,8 @@ class SqlExecutionRepository( validateHandledPartitionOrThrow(execution) withPool(poolName) { - jooq.transactional { - it.delete(execution.type.stagesTableName) - .where(stageId.toWhereCondition()).execute() - } + jooq.delete(execution.type.stagesTableName) + .where(stageId.toWhereCondition()).execute() } } @@ -277,7 +285,7 @@ class SqlExecutionRepository( } override fun isCanceled(type: ExecutionType, id: String): Boolean { - withPool(poolName) { + withPool(readPoolName) { return jooq.fetchExists( jooq.selectFrom(type.tableName) .where(id.toWhereCondition()) @@ -412,7 +420,7 @@ class SqlExecutionRepository( } private fun retrieve(type: ExecutionType, criteria: ExecutionCriteria, partition: String?): Observable { - withPool(poolName) { + withPool(readPoolName) { val select = jooq.selectExecutions( type, fields = selectExecutionFields(compressionProperties) + field("status"), @@ -441,7 +449,7 @@ class SqlExecutionRepository( } override fun retrievePipelinesForApplication(application: String): Observable = - withPool(poolName) { + withPool(readPoolName) { Observable.from( fetchExecutions { pageSize, cursor -> selectExecutions(PIPELINE, pageSize, cursor) { @@ -613,7 +621,7 @@ class SqlExecutionRepository( // When not filtering by status, provide an index hint to ensure use of `pipeline_config_id_idx` which // fully satisfies the where clause and order by. Without, some lookups by config_id matching thousands // of executions triggered costly full table scans. - withPool(poolName) { + withPool(readPoolName) { val select = if (criteria.statuses.isEmpty() || criteria.statuses.size == ExecutionStatus.values().size) { jooq.selectExecutions( PIPELINE, @@ -657,7 +665,7 @@ class SqlExecutionRepository( criteria: ExecutionCriteria, sorter: ExecutionComparator? ): MutableList { - withPool(poolName) { + withPool(readPoolName) { return jooq.selectExecutions( ORCHESTRATION, conditions = { @@ -709,17 +717,23 @@ class SqlExecutionRepository( ) .fetchExecution() - if (execution != null) { - if (!execution.status.isComplete) { - return execution - } - jooq.transactional { - it.deleteFrom(table("correlation_ids")).where(field("id").eq(correlationId)).execute() - } + if (execution == null) { + throw ExecutionNotFoundException("No Orchestration found for correlation ID $correlationId") } - throw ExecutionNotFoundException("No Orchestration found for correlation ID $correlationId") + if (!execution.status.isComplete) { + return execution + } } + + // If we get here, there's an execution with the given correlation id, but + // it's complete, so clean up the correlation_ids table. + withPool(poolName) { + jooq.deleteFrom(table("correlation_ids")).where(field("id").eq(correlationId)).execute() + } + + // Treat a completed execution similar to not finding one at all. + throw ExecutionNotFoundException("Complete Orchestration found for correlation ID $correlationId") } override fun retrievePipelineForCorrelationId(correlationId: String): PipelineExecution { @@ -737,17 +751,22 @@ class SqlExecutionRepository( ) .fetchExecution() - if (execution != null) { - if (!execution.status.isComplete) { - return execution - } - jooq.transactional { - it.deleteFrom(table("correlation_ids")).where(field("id").eq(correlationId)).execute() - } + if (execution == null) { + throw ExecutionNotFoundException("No Pipeline found for correlation ID $correlationId") } - throw ExecutionNotFoundException("No Pipeline found for correlation ID $correlationId") + if (!execution.status.isComplete) { + return execution + } } + + // If we get here, there's an execution with the given correlation id, but + // it's complete, so clean up the correlation_ids table. + withPool(poolName) { + jooq.deleteFrom(table("correlation_ids")).where(field("id").eq(correlationId)).execute() + } + + throw ExecutionNotFoundException("Complete Pipeline found for correlation ID $correlationId") } override fun retrieveBufferedExecutions(): MutableList = @@ -762,7 +781,7 @@ class SqlExecutionRepository( } override fun retrieveAllApplicationNames(type: ExecutionType?): List { - withPool(poolName) { + withPool(readPoolName) { return if (type == null) { jooq.select(field("application")) .from(PIPELINE.tableName) @@ -785,7 +804,7 @@ class SqlExecutionRepository( } override fun retrieveAllApplicationNames(type: ExecutionType?, minExecutions: Int): List { - withPool(poolName) { + withPool(readPoolName) { return if (type == null) { jooq.select(field("application")) .from(PIPELINE.tableName) @@ -811,7 +830,7 @@ class SqlExecutionRepository( } override fun countActiveExecutions(): ActiveExecutionsReport { - withPool(poolName) { + withPool(readPoolName) { val partitionPredicate = if (partitionName != null) field(name("partition")).eq(partitionName) else value(1).eq(value(1)) val orchestrationsQuery = jooq.selectCount() @@ -840,7 +859,7 @@ class SqlExecutionRepository( buildTimeEndBoundary: Long, executionCriteria: ExecutionCriteria ): List { - withPool(poolName) { + withPool(readPoolName) { val select = jooq.select(selectExecutionFields(compressionProperties)) .from(PIPELINE.tableName) .join( @@ -920,7 +939,7 @@ class SqlExecutionRepository( } override fun hasExecution(type: ExecutionType, id: String): Boolean { - withPool(poolName) { + withPool(readPoolName) { return jooq.selectCount() .from(type.tableName) .where(id.toWhereCondition()) @@ -929,7 +948,7 @@ class SqlExecutionRepository( } override fun retrieveAllExecutionIds(type: ExecutionType): MutableList { - withPool(poolName) { + withPool(readPoolName) { return jooq.select(field("id")).from(type.tableName).fetch("id", String::class.java) } } @@ -949,7 +968,7 @@ class SqlExecutionRepository( ): Pair { if (isULID(id)) return Pair(id, null) - withPool(poolName) { + withPool(readPoolName) { val ts = (timestamp ?: System.currentTimeMillis()) val row = ctx.select(field("id")) .from(table) @@ -1288,14 +1307,10 @@ class SqlExecutionRepository( private fun selectExecution( ctx: DSLContext, type: ExecutionType, - id: String, - forUpdate: Boolean = false + id: String ): PipelineExecution? { withPool(poolName) { val select = ctx.selectExecution(type, compressionProperties).where(id.toWhereCondition()) - if (forUpdate) { - select.forUpdate() - } return select.fetchExecution() } } @@ -1306,7 +1321,7 @@ class SqlExecutionRepository( cursor: String?, where: ((SelectJoinStep) -> SelectConditionStep)? = null ): Collection { - withPool(poolName) { + withPool(readPoolName) { val select = jooq.selectExecutions( type, conditions = { diff --git a/orca-sql/src/test/groovy/com/netflix/spinnaker/orca/sql/cleanup/OldPipelineCleanupPollingNotificationAgentSpec.groovy b/orca-sql/src/test/groovy/com/netflix/spinnaker/orca/sql/cleanup/OldPipelineCleanupPollingNotificationAgentSpec.groovy index 4041c9d955..2d7d90a51d 100644 --- a/orca-sql/src/test/groovy/com/netflix/spinnaker/orca/sql/cleanup/OldPipelineCleanupPollingNotificationAgentSpec.groovy +++ b/orca-sql/src/test/groovy/com/netflix/spinnaker/orca/sql/cleanup/OldPipelineCleanupPollingNotificationAgentSpec.groovy @@ -22,6 +22,7 @@ import com.netflix.spinnaker.config.OrcaSqlProperties import com.netflix.spinnaker.kork.sql.config.RetryProperties import com.netflix.spinnaker.kork.sql.test.SqlTestUtil +import javax.sql.DataSource import java.time.Clock import java.time.Instant import com.fasterxml.jackson.databind.ObjectMapper @@ -80,7 +81,19 @@ abstract class OldPipelineCleanupPollingNotificationAgentSpec extends Specificat def setupSpec() { currentDatabase = getDatabase() - executionRepository = new SqlExecutionRepository("test", currentDatabase.context, mapper, new RetryProperties(), 10, 100, "poolName", null, [], new ExecutionCompressionProperties(), false) + executionRepository = new SqlExecutionRepository("test", + currentDatabase.context, + mapper, + new RetryProperties(), + 10, + 100, + "poolName", + "readPoolName", + null, + [], + new ExecutionCompressionProperties(), + false, + Mock(DataSource)) } def cleanup() { diff --git a/orca-sql/src/test/groovy/com/netflix/spinnaker/orca/sql/cleanup/TopApplicationExecutionCleanupPollingNotificationAgentSpec.groovy b/orca-sql/src/test/groovy/com/netflix/spinnaker/orca/sql/cleanup/TopApplicationExecutionCleanupPollingNotificationAgentSpec.groovy index 39235c5720..841af0f84e 100644 --- a/orca-sql/src/test/groovy/com/netflix/spinnaker/orca/sql/cleanup/TopApplicationExecutionCleanupPollingNotificationAgentSpec.groovy +++ b/orca-sql/src/test/groovy/com/netflix/spinnaker/orca/sql/cleanup/TopApplicationExecutionCleanupPollingNotificationAgentSpec.groovy @@ -31,11 +31,11 @@ import com.netflix.spinnaker.orca.pipeline.model.PipelineExecutionImpl import com.netflix.spinnaker.orca.pipeline.model.StageExecutionImpl import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository import com.netflix.spinnaker.orca.sql.pipeline.persistence.SqlExecutionRepository -import org.jooq.SQLDialect import spock.lang.AutoCleanup import spock.lang.Shared import spock.lang.Specification +import javax.sql.DataSource import java.time.Instant import static com.netflix.spinnaker.kork.sql.test.SqlTestUtil.* @@ -82,7 +82,7 @@ abstract class TopApplicationExecutionCleanupPollingNotificationAgentSpec extend def setupSpec() { currentDatabase = getDatabase() - executionRepository = new SqlExecutionRepository("test", currentDatabase.context, mapper, new RetryProperties(), 10, 100, "poolName", null, [], new ExecutionCompressionProperties(), false) + executionRepository = new SqlExecutionRepository("test", currentDatabase.context, mapper, new RetryProperties(), 10, 100, "poolName", "readPoolName", null, [], new ExecutionCompressionProperties(), false, Mock(DataSource)) } def cleanup() { diff --git a/orca-sql/src/test/groovy/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlPipelineExecutionRepositorySpec.groovy b/orca-sql/src/test/groovy/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlPipelineExecutionRepositorySpec.groovy index 8f8176a607..5c256bb864 100644 --- a/orca-sql/src/test/groovy/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlPipelineExecutionRepositorySpec.groovy +++ b/orca-sql/src/test/groovy/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlPipelineExecutionRepositorySpec.groovy @@ -43,6 +43,8 @@ import spock.lang.Shared import spock.lang.Subject import spock.lang.Unroll +import javax.sql.DataSource + import static com.netflix.spinnaker.kork.sql.test.SqlTestUtil.cleanupDb import static com.netflix.spinnaker.kork.sql.test.SqlTestUtil.initDualTcMysqlDatabases import static com.netflix.spinnaker.kork.sql.test.SqlTestUtil.initDualTcPostgresDatabases @@ -98,7 +100,19 @@ abstract class SqlPipelineExecutionRepositorySpec extends PipelineExecutionRepos ExecutionRepository createExecutionRepository(String partition, Interlink interlink = null, boolean compression = false) { return InstrumentedProxy.proxy( new DefaultRegistry(), - new SqlExecutionRepository(partition, currentDatabase.context, mapper, new RetryProperties(), 10, 100, "poolName", interlink, [], new ExecutionCompressionProperties(enabled: compression), false), + new SqlExecutionRepository(partition, + currentDatabase.context, + mapper, + new RetryProperties(), + 10, + 100, + "poolName", + "readPoolName", + interlink, + [], + new ExecutionCompressionProperties(enabled: compression), + false, + Mock(DataSource)), "namespace") } diff --git a/orca-sql/src/test/java/com/netflix/spinnaker/orca/testconfig/SqlConfigurationTest.java b/orca-sql/src/test/java/com/netflix/spinnaker/orca/testconfig/SqlConfigurationTest.java new file mode 100644 index 0000000000..4ac7e733e2 --- /dev/null +++ b/orca-sql/src/test/java/com/netflix/spinnaker/orca/testconfig/SqlConfigurationTest.java @@ -0,0 +1,99 @@ +/* + * Copyright 2023 Salesforce, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License") + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netflix.spinnaker.orca.testconfig; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.netflix.spinnaker.config.SqlConfiguration; +import com.netflix.spinnaker.kork.sql.test.SqlTestUtil; +import javax.sql.DataSource; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.springframework.boot.context.annotation.UserConfigurations; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource; + +class SqlConfigurationTest { + + private static SqlTestUtil.TestDatabase database = SqlTestUtil.initTcMysqlDatabase(); + + // Without .withAllowBeanDefinitionOverriding(true), this fails with an error: + // + // org.springframework.beans.factory.support.BeanDefinitionOverrideException: + // Invalid bean definition with name 'liquibase' defined in + // com.netflix.spinnaker.config.SqlConfiguration: Cannot register bean + // definition [Root bean: class [null]; scope=; abstract=false; lazyInit=null; + // autowireMode=3; dependencyCheck=0; autowireCandidate=true; primary=false; + // factoryBeanName=sqlConfiguration; factoryMethodName=liquibase; + // initMethodName=null; destroyMethodName=(inferred); defined in + // com.netflix.spinnaker.config.SqlConfiguration] for bean 'liquibase': There + // is already [Root bean: class [null]; scope=; abstract=false; lazyInit=null; + // autowireMode=3; dependencyCheck=0; autowireCandidate=true; primary=false; + // factoryBeanName=com.netflix.spinnaker.kork.sql.config.DefaultSqlConfiguration; + // factoryMethodName=liquibase; initMethodName=null; + // destroyMethodName=(inferred); defined in + // com.netflix.spinnaker.kork.sql.config.DefaultSqlConfiguration] bound. + // + // Even though DefaultSqlConfiguration has + // + // @Bean + // @ConditionalOnMissingBean(SpringLiquibase::class) + // fun liquibase(properties: SqlProperties, @Value("\${sql.read-only:false}") sqlReadOnly: + // Boolean): SpringLiquibase = + // SpringLiquibaseProxy(properties.migration, sqlReadOnly) + // + // which makes sense if DefaultSqlConfiguration gets processed first... + private final ApplicationContextRunner runner = + new ApplicationContextRunner() + .withPropertyValues( + "sql.enabled=true", + "sql.connectionPools.default.default=true", + "sql.connectionPools.default.jdbcUrl=" + SqlTestUtil.tcJdbcUrl, + "sql.migration.jdbcUrl=" + SqlTestUtil.tcJdbcUrl) + .withAllowBeanDefinitionOverriding(true) + .withConfiguration(UserConfigurations.of(SqlConfiguration.class)); + + @BeforeEach + void init(TestInfo testInfo) { + System.out.println("--------------- Test " + testInfo.getDisplayName()); + } + + @Test + void testDataSourceWithDefaultConnectionPool() { + runner.run( + ctx -> { + DataSource dataSource = ctx.getBean(DataSource.class); + assertThat(dataSource).isNotNull(); + // with only one connection pool configured, we get a "plain" data source with no routing. + assertThat(dataSource instanceof AbstractRoutingDataSource).isFalse(); + }); + } + + @Test + void testDataSourceWithReadConnectionPool() { + runner + .withPropertyValues("sql.connectionPools.read.jdbcUrl=" + SqlTestUtil.tcJdbcUrl) + .run( + ctx -> { + DataSource dataSource = ctx.getBean(DataSource.class); + assertThat(dataSource).isNotNull(); + // with multiple connection pools configured, we get a routing data source + assertThat(dataSource instanceof AbstractRoutingDataSource).isTrue(); + }); + } +} diff --git a/orca-sql/src/test/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepositoryTest.kt b/orca-sql/src/test/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepositoryTest.kt index 0b85ea5cb6..1d0632282a 100644 --- a/orca-sql/src/test/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepositoryTest.kt +++ b/orca-sql/src/test/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepositoryTest.kt @@ -15,6 +15,7 @@ */ package com.netflix.spinnaker.orca.sql.pipeline.persistence +import com.fasterxml.jackson.databind.ObjectMapper import com.netflix.spinnaker.config.CompressionMode import com.netflix.spinnaker.config.CompressionType import com.netflix.spinnaker.config.ExecutionCompressionProperties @@ -32,14 +33,21 @@ import com.netflix.spinnaker.orca.pipeline.model.StageExecutionImpl import com.netflix.spinnaker.orca.pipeline.model.support.TriggerDeserializer import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionNotFoundException import com.netflix.spinnaker.orca.sql.PipelineRefTriggerDeserializerSupplier +import com.nhaarman.mockito_kotlin.atLeastOnce +import com.nhaarman.mockito_kotlin.doReturn +import com.nhaarman.mockito_kotlin.mock +import com.nhaarman.mockito_kotlin.verify import dev.minutest.junit.JUnit5Minutests import dev.minutest.rootContext import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatThrownBy +import org.jooq.DSLContext import org.jooq.impl.DSL.field import org.junit.jupiter.api.Assumptions.assumeTrue import org.testcontainers.DockerClientFactory import java.lang.System.currentTimeMillis +import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource +import javax.sql.DataSource class SqlExecutionRepositoryTest : JUnit5Minutests { @@ -567,6 +575,64 @@ class SqlExecutionRepositoryTest : JUnit5Minutests { assertThat(actualPipelineExecution).isEqualTo(pipelineExecution) } } + context("read pool usage") { + val mockedDslContext = mock() + val mockedObjectMapper = mock() + val mockedAbstractRoutingDataSource = mock() + + test("fallback when read pool is not configured") { + + val poolName = "poolName" + val readPoolName = "myReadPoolName" + + doReturn(mapOf("someOtherPool" to mock())).`when`(mockedAbstractRoutingDataSource).resolvedDataSources + + val sqlExecutionRepository = SqlExecutionRepository("test", + mockedDslContext, + mockedObjectMapper, + testRetryProprties, + 10, + 100, + poolName, + readPoolName, + null, + emptyList(), + executionCompressionPropertiesEnabled, + false, + mockedAbstractRoutingDataSource + ) + + verify(mockedAbstractRoutingDataSource, atLeastOnce()).resolvedDataSources + assertThat(sqlExecutionRepository.readPoolName).isEqualTo(poolName) + } + + test("use read pool when configured") { + + val poolName = "poolName" + val readPoolName = "myReadPoolName" + + doReturn(mapOf(readPoolName to mock())).`when`(mockedAbstractRoutingDataSource).resolvedDataSources + + val sqlExecutionRepository = SqlExecutionRepository("test", + mockedDslContext, + mockedObjectMapper, + testRetryProprties, + 10, + 100, + poolName, + readPoolName, + null, + emptyList(), + executionCompressionPropertiesEnabled, + false, + mockedAbstractRoutingDataSource + ) + + verify(mockedAbstractRoutingDataSource, atLeastOnce()).resolvedDataSources + assertThat(sqlExecutionRepository.readPoolName).isEqualTo(readPoolName) + } + + } } private inner class Fixture { @@ -589,6 +655,7 @@ class SqlExecutionRepositoryTest : JUnit5Minutests { bodyCompressionThreshold = 9 compressionType = CompressionType.ZLIB } + val mockDataSource = mock() val sqlExecutionRepository = SqlExecutionRepository( @@ -599,10 +666,12 @@ class SqlExecutionRepositoryTest : JUnit5Minutests { 10, 100, "poolName", + "myReadPoolName", null, emptyList(), executionCompressionPropertiesEnabled, - false + false, + mockDataSource ) val executionCompressionPropertiesDisabled = ExecutionCompressionProperties().apply { @@ -618,10 +687,12 @@ class SqlExecutionRepositoryTest : JUnit5Minutests { 10, 100, "poolName", + "myReadPoolName", null, emptyList(), executionCompressionPropertiesDisabled, - false + false, + mockDataSource ) val executionCompressionPropertiesReadOnly = ExecutionCompressionProperties().apply { @@ -640,10 +711,12 @@ class SqlExecutionRepositoryTest : JUnit5Minutests { 10, 100, "poolName", + "myReadPoolName", null, emptyList(), executionCompressionPropertiesReadOnly, - false + false, + mockDataSource ) val sqlExecutionRepositoryWithPipelineRefOnly = @@ -655,10 +728,12 @@ class SqlExecutionRepositoryTest : JUnit5Minutests { 10, 100, "poolName", + "myReadPoolName", null, emptyList(), executionCompressionPropertiesDisabled, - true + true, + mockDataSource ) val sqlExecutionRepositoryWithPipelineRefAndCompression = @@ -670,10 +745,12 @@ class SqlExecutionRepositoryTest : JUnit5Minutests { 10, 100, "poolName", + "myReadPoolName", null, emptyList(), executionCompressionPropertiesEnabled, - true + true, + mockDataSource ) fun addCustomDeserializerWithFeatureFlagEnabled() { diff --git a/orca-web/src/test/kotlin/com/netflix/spinnaker/orca/controllers/TaskControllerTest.kt b/orca-web/src/test/kotlin/com/netflix/spinnaker/orca/controllers/TaskControllerTest.kt index 24c9f0f764..6a5a2b3e23 100644 --- a/orca-web/src/test/kotlin/com/netflix/spinnaker/orca/controllers/TaskControllerTest.kt +++ b/orca-web/src/test/kotlin/com/netflix/spinnaker/orca/controllers/TaskControllerTest.kt @@ -61,7 +61,8 @@ class TaskControllerTest : JUnit5Minutests { mapper = OrcaObjectMapper.getInstance(), retryProperties = RetryProperties(), compressionProperties = ExecutionCompressionProperties(), - pipelineRefEnabled = false + pipelineRefEnabled = false, + dataSource = mock() ) private val taskControllerConfigurationProperties: TaskControllerConfigurationProperties = TaskControllerConfigurationProperties()