diff --git a/keiko-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlQueueConfiguration.kt b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlQueueConfiguration.kt index 40636be8aa..39849735da 100644 --- a/keiko-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlQueueConfiguration.kt +++ b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlQueueConfiguration.kt @@ -46,6 +46,7 @@ class SqlQueueConfiguration { lockTtlSeconds = properties.lockTtlSeconds, mapper = mapper, serializationMigrator = serializationMigrator, + resetAttemptsOnAck = properties.resetAttemptsOnAck, ackTimeout = properties.ackTimeout, deadMessageHandlers = listOf(deadMessageHandler), publisher = publisher, diff --git a/keiko-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlQueueProperties.kt b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlQueueProperties.kt index 1117f23cab..51e931b36f 100644 --- a/keiko-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlQueueProperties.kt +++ b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlQueueProperties.kt @@ -39,6 +39,13 @@ class SqlQueueProperties { */ var ackTimeout: Duration = Duration.ofMinutes(2) + /** + * When set to true, the number of ack attempts in the message metadata will be reset to 0 + * when the message is acked. This is to avoid having this count grow over time for long-lived + * messages that risk being dropped when the max attempts is reached. + */ + var resetAttemptsOnAck: Boolean = true + /** * The length of time in seconds that a message with a locked set on the queue table has to * be moved to the unacked table, signifying that it is actively being processed. Messages diff --git a/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt index b65e8b16f2..6d66eaa199 100644 --- a/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt +++ b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt @@ -71,6 +71,7 @@ class SqlQueue( private val lockTtlSeconds: Int, private val mapper: ObjectMapper, private val serializationMigrator: Optional, + private val resetAttemptsOnAck: Boolean = true, override val ackTimeout: Duration = Duration.ofMinutes(5), override val deadMessageHandlers: List, override val canPollMany: Boolean = true, @@ -354,7 +355,7 @@ class SqlQueue( atTime(ackTimeout) }, maxAttempts = message.getAttribute()?.maxAttempts ?: 0, - ackCallback = this::ackMessage.partially1(fingerprint) + ackCallback = this::ackMessage.partially1(fingerprint) // partial application, still needs an up-to-date version of the message ) ) } catch (e: Exception) { @@ -441,7 +442,7 @@ class SqlQueue( .filterNot { toRelease.contains(it.queueId) } .forEach { fire(MessageProcessing(it.message, it.scheduledTime, clock.instant())) - callback(it.message, it.ackCallback) + callback(it.message, { it.ackCallback(it.message) }) } } @@ -700,7 +701,8 @@ class SqlQueue( if (ackAttemptsAttribute.ackAttempts >= Queue.maxRetries || (maxAttempts > 0 && attempts > maxAttempts) ) { - log.warn("Message $fingerprint with payload $message exceeded max ack retries") + log.warn("Message $fingerprint with payload $message exceeded max ack retries," + + " moving to DLQ attempts=$attempts acks=$acks maxAttempts=$maxAttempts") dlq = true } } catch (e: Exception) { @@ -842,7 +844,10 @@ class SqlQueue( } } - private fun ackMessage(fingerprint: String) { + private fun ackMessage(fingerprint: String, message: Message) { + if (log.isDebugEnabled) { + log.debug("Acking message $fingerprint") + } withPool(poolName) { withRetry(WRITE) { jooq.deleteFrom(unackedTable) @@ -850,17 +855,51 @@ class SqlQueue( .execute() } - withRetry(WRITE) { - jooq.update(messagesTable) - .set(updatedAtField, clock.millis()) - .where(fingerprintField.eq(fingerprint)) - .execute() + if (resetAttemptsOnAck) { + resetAcksAndMarkUpdated(fingerprint, message) + } else { + markMessageUpdatedOptBody(fingerprint, null) // only set updatedAt } } fire(MessageAcknowledged) } + /** + * Mark the message as updated by setting its updateAt field. + * Optionally update the message body if provided. + */ + private fun markMessageUpdatedOptBody(fingerprint: String, message: Message?) = + withRetry(WRITE) { + val step = jooq.update(messagesTable) + .set(updatedAtField, clock.millis()) + if (message != null) { + step.set(bodyField, mapper.writeValueAsString(message)) + } + step.where(fingerprintField.eq(fingerprint)) + .execute() + } + + /** + * Mark the message as updated, and reset ackAttempts to 0 if needed. + * This is to avoid messages gradually growing their ackAttempts value over time + * due to sporadic failures until they are dropped. Since we're acking the message, + * we also reset this counter here if it was non-zero. + */ + private fun resetAcksAndMarkUpdated(fingerprint: String, message: Message) { + val ackAttemptsAttribute = (message.getAttribute() ?: AckAttemptsAttribute()) + val updateMessage = ackAttemptsAttribute.ackAttempts > 0 + if (updateMessage) { + log.info( + "Resetting ackAttempts for message {}, was {}", // at INFO since this is somewhat unusual + fingerprint, ackAttemptsAttribute.ackAttempts + ) + ackAttemptsAttribute.ackAttempts = 0 + message.setAttribute(ackAttemptsAttribute) + } + markMessageUpdatedOptBody(fingerprint, message) + } + private fun deleteAll(fingerprint: String) { withRetry(WRITE) { jooq.deleteFrom(queueTable) @@ -963,6 +1002,6 @@ class SqlQueue( val message: Message, val expiry: Long, val maxAttempts: Int, - val ackCallback: () -> Unit + val ackCallback: (Message) -> Unit ) } diff --git a/keiko-sql/src/test/kotlin/com/netflix/spinnaker/q/sql/SqlAckQueueTest.kt b/keiko-sql/src/test/kotlin/com/netflix/spinnaker/q/sql/SqlAckQueueTest.kt new file mode 100644 index 0000000000..e7bb78f824 --- /dev/null +++ b/keiko-sql/src/test/kotlin/com/netflix/spinnaker/q/sql/SqlAckQueueTest.kt @@ -0,0 +1,117 @@ +/* + * Copyright 2023 Apple, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netflix.spinnaker.q.sql + +import com.netflix.spinnaker.kork.sql.test.SqlTestUtil +import com.netflix.spinnaker.q.AckAttemptsAttribute +import com.netflix.spinnaker.q.DeadMessageCallback +import com.netflix.spinnaker.q.TestMessage +import com.netflix.spinnaker.q.metrics.EventPublisher +import com.netflix.spinnaker.time.MutableClock +import com.nhaarman.mockito_kotlin.mock +import com.nhaarman.mockito_kotlin.reset +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.junit.platform.runner.JUnitPlatform +import org.junit.runner.RunWith +import java.util.* + + +@RunWith(JUnitPlatform::class) +class SqlAckQueueTest : Spek({ + describe("both values of resetAttemptsOnAck") { + // map of resetAttemptsOnAck to expected number of ackAttempts still on the message after ack + val flagToAckAttempts = mapOf( + true to 0, + false to 1 + ) + + // check both values of resetAttemptsOnAck + flagToAckAttempts.forEach { resetFlag, expectedAckAttempts -> + val testDescription = "SqlQueue with resetAttemptsOnAck = $resetFlag" + + given(testDescription) { + var queue: SqlQueue? = null + val clock = MutableClock() + val deadMessageHandler: DeadMessageCallback = mock() + val publisher: EventPublisher = mock() + + val testDb = SqlTestUtil.initTcMysqlDatabase() + val jooq = testDb.context + + fun resetMocks() = reset(deadMessageHandler, publisher) + + fun stopQueue() { + SqlTestUtil.cleanupDb(jooq) + } + + fun startQueue() { + queue = createQueueWithResetAck(clock, deadMessageHandler, publisher, resetFlag) // from SqlQueueTest + } + + describe("message is dropped once then retried successfully") { + beforeGroup(::startQueue) + afterGroup(::stopQueue) + afterGroup(::resetMocks) + + given("a test message") { + val message = TestMessage("a") + + on("pushing a message that gets dropped") { + with(queue!!) { + push(message) + poll { _, _ -> } // do not ack the message after de-queuing it + clock.incrementBy(ackTimeout) + + retry() // make it available again + clock.incrementBy(ackTimeout) + } + } + + it("has an ackAttempt count of 1 upon retrying") { + with(queue!!) { + poll { msg, ack -> + val ackAttemptsAttribute = (msg.getAttribute() ?: AckAttemptsAttribute()) + assert(ackAttemptsAttribute.ackAttempts == 1) + ack() // *now* we acknowledge the message + push(msg) // and re-enqueue it (as with a RunTask returning RUNNING) + } + } + } + + val validatesAckAttrAfterAckdRetry = if (expectedAckAttempts == 0) { + "has reset its ackAttempts upon being ack'd" + } else { + "still has ackAttempts=1 even after being ack'd" + } + it(validatesAckAttrAfterAckdRetry) { + with(queue!!) { + poll { msg, ack -> + val ackAttemptsAttribute = (msg.getAttribute() ?: AckAttemptsAttribute()) + assert(ackAttemptsAttribute.ackAttempts == expectedAckAttempts) + } + } + } + } + } + } + } + } +}) diff --git a/keiko-sql/src/test/kotlin/com/netflix/spinnaker/q/sql/SqlQueueTest.kt b/keiko-sql/src/test/kotlin/com/netflix/spinnaker/q/sql/SqlQueueTest.kt index 2d5f3235c0..75b0f82ad8 100644 --- a/keiko-sql/src/test/kotlin/com/netflix/spinnaker/q/sql/SqlQueueTest.kt +++ b/keiko-sql/src/test/kotlin/com/netflix/spinnaker/q/sql/SqlQueueTest.kt @@ -37,7 +37,13 @@ private val createQueueNoPublisher = { clock: Clock, private fun createQueue(clock: Clock, deadLetterCallback: DeadMessageCallback, - publisher: EventPublisher?): SqlQueue { + publisher: EventPublisher?): SqlQueue = + createQueueWithResetAck(clock, deadLetterCallback, publisher, true) + +internal fun createQueueWithResetAck(clock: Clock, + deadLetterCallback: DeadMessageCallback, + publisher: EventPublisher?, + resetAttemptsOnAck: Boolean): SqlQueue { return SqlQueue( queueName = "test", schemaVersion = 1, @@ -56,6 +62,7 @@ private fun createQueue(clock: Clock, ) }, serializationMigrator = Optional.empty(), + resetAttemptsOnAck = resetAttemptsOnAck, ackTimeout = Duration.ofSeconds(60), deadMessageHandlers = listOf(deadLetterCallback), publisher = publisher ?: ( diff --git a/orca-queue-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlOrcaQueueConfiguration.kt b/orca-queue-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlOrcaQueueConfiguration.kt index 4d57f0bb2f..57c9ea18d2 100644 --- a/orca-queue-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlOrcaQueueConfiguration.kt +++ b/orca-queue-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlOrcaQueueConfiguration.kt @@ -88,6 +88,7 @@ class SqlOrcaQueueConfiguration : SqlQueueConfiguration() { lockTtlSeconds = properties.lockTtlSeconds, mapper = mapper, serializationMigrator = serializationMigrator, + resetAttemptsOnAck = properties.resetAttemptsOnAck, ackTimeout = properties.ackTimeout, deadMessageHandlers = listOf(deadMessageHandler), publisher = publisher, diff --git a/orca-queue-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlQueueShovelConfiguration.kt b/orca-queue-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlQueueShovelConfiguration.kt index f09ca7d653..67c8722266 100644 --- a/orca-queue-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlQueueShovelConfiguration.kt +++ b/orca-queue-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlQueueShovelConfiguration.kt @@ -63,6 +63,7 @@ class SqlQueueShovelConfiguration { lockTtlSeconds = properties.lockTtlSeconds, mapper = mapper, serializationMigrator = serializationMigrator, + resetAttemptsOnAck = properties.resetAttemptsOnAck, ackTimeout = properties.ackTimeout, deadMessageHandlers = listOf(deadMessageHandler), publisher = publisher, diff --git a/orca-queue-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlRedisQueueShovelConfiguration.kt b/orca-queue-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlRedisQueueShovelConfiguration.kt index f7f9e2d31a..d4e34f8067 100644 --- a/orca-queue-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlRedisQueueShovelConfiguration.kt +++ b/orca-queue-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlRedisQueueShovelConfiguration.kt @@ -155,6 +155,7 @@ class SqlRedisQueueShovelConfiguration { lockTtlSeconds = sqlQueueProperties.lockTtlSeconds, mapper = mapper, serializationMigrator = serializationMigrator, + resetAttemptsOnAck = sqlQueueProperties.resetAttemptsOnAck, deadMessageHandlers = emptyList(), publisher = publisher, sqlRetryProperties = sqlQueueProperties.retries diff --git a/orca-queue-sql/src/test/kotlin/com/netflix/spinnaker/orca/q/sql/SqlQueueIntegrationTest.kt b/orca-queue-sql/src/test/kotlin/com/netflix/spinnaker/orca/q/sql/SqlQueueIntegrationTest.kt index 462694dcd0..3ac91175b6 100644 --- a/orca-queue-sql/src/test/kotlin/com/netflix/spinnaker/orca/q/sql/SqlQueueIntegrationTest.kt +++ b/orca-queue-sql/src/test/kotlin/com/netflix/spinnaker/orca/q/sql/SqlQueueIntegrationTest.kt @@ -106,6 +106,7 @@ class SqlTestConfig { 1, mapper, Optional.empty(), + true, Duration.ofSeconds(1), emptyList(), true,