Closed as not planned
Description
Issue
I’m developing a Spring-based web application in Kotlin that pushes messages from Redis Pub/Sub to active SSE (Server-Sent Events) connections. However, while running integration tests, I discovered that messages are being processed twice.
My code
Config
/**
 * Redis wiring: the Lettuce connection factory, the Pub/Sub listener container,
 * and the adapter that forwards messages to [NotificationsMessageListener].
 */
@Configuration
@EnableConfigurationProperties(RedisProperties::class)
class RedisConfig(
    val props: RedisProperties,
) {
    /** Builds a standalone Lettuce connection factory from the configured host and port. */
    @Bean
    fun lettuceConnectionFactory(): RedisConnectionFactory {
        val standalone = RedisStandaloneConfiguration(props.host, props.port.toInt())
        return LettuceConnectionFactory(standalone)
    }

    /**
     * Creates the listener container with only its connection factory set.
     * No listener is registered here; [notificationMessageListener] is declared
     * as a parameter but is not referenced in the body.
     */
    @Bean
    fun redisMessageListenerContainer(
        connectionFactory: RedisConnectionFactory,
        notificationMessageListener: MessageListenerAdapter,
    ): RedisMessageListenerContainer {
        val container = RedisMessageListenerContainer()
        container.setConnectionFactory(connectionFactory)
        return container
    }

    /** Wraps a [NotificationsMessageListener] backed by [connectionPool] in a [MessageListenerAdapter]. */
    @Bean
    fun notificationMessageListener(connectionPool: ConnectionPool): MessageListenerAdapter {
        val delegate = NotificationsMessageListener(connectionPool)
        return MessageListenerAdapter(delegate)
    }
}
Controller
/**
 * Exposes `/notify/{userId}` as a `text/event-stream` endpoint.
 */
@RestController
@RequestMapping("/notify/{userId}")
class SubscriptionsController(
    private val subscriptionManager: SubscriptionManager,
) {
    /**
     * Returns a flow whose underlying channel is registered with the
     * [SubscriptionManager] for [userId] when collection starts and is
     * unregistered again when the flow is closed or cancelled.
     */
    @GetMapping(produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
    override suspend fun notificationsStream(
        @PathVariable userId: String,
    ): Flow<String> {
        val stream = channelFlow<String> {
            val sink = channel
            subscriptionManager.subscribe(userId, sink)
            // Suspends until the collector goes away, then releases the subscription.
            awaitClose { subscriptionManager.unsubscribe(userId, sink) }
        }
        return stream
    }
}
Subscription Manager
/**
 * Keeps the per-user connection pool and the Redis listener registration in step.
 *
 * Every change to the pool is paired with a matching call on the
 * [RedisMessageListenerContainer]. Both steps run under one lock so that a
 * concurrent [subscribe] and [unsubscribe] for the same user cannot interleave
 * (e.g. the last connection being removed and the listener unregistered while
 * another connection for that user has just been added).
 */
@Component
class SubscriptionManager(
    private val connectionPool: ConnectionPool,
    private val redisMessageListenerContainer: RedisMessageListenerContainer,
    private val notificationMessageListener: MessageListenerAdapter,
) {
    // Serializes "update pool + (un)register listener" so the two steps act as one.
    // NOTE(review): this also prevents concurrent first-time registrations from
    // reaching the container at the same moment, which is the situation in which
    // duplicate deliveries were observed; confirm against the integration test.
    private val registrationLock = Any()

    /**
     * Adds [connection] to the pool for [userId] and registers the notification
     * listener on the Redis channel named after the user.
     *
     * @param userId user identifier; also used as the Redis channel name
     * @param connection channel that will receive the user's messages
     */
    override fun subscribe(
        userId: String,
        connection: SendChannel<String>,
    ) {
        synchronized(registrationLock) {
            connectionPool.add(userId, connection)
            redisMessageListenerContainer.addMessageListener(notificationMessageListener, ChannelTopic(userId))
        }
    }

    /**
     * Removes [connection] from the pool for [userId]. When no connection is left
     * for that user, the listener is removed from the user's Redis channel.
     *
     * @param userId user identifier; also used as the Redis channel name
     * @param connection channel that should stop receiving messages
     */
    override fun unsubscribe(
        userId: String,
        connection: SendChannel<String>,
    ) {
        synchronized(registrationLock) {
            if (connectionPool.remove(userId, connection).isEmpty()) {
                redisMessageListenerContainer.removeMessageListener(notificationMessageListener, ChannelTopic(userId))
            }
        }
    }
}
Connection pool
/**
 * Registry of open connections keyed by user id.
 *
 * Each entry holds a read-only set; every update replaces the stored set with a
 * new one inside `compute` / `computeIfPresent` on the backing [ConcurrentHashMap].
 */
@Component
class ConnectionPool {
    private val connections = ConcurrentHashMap<String, Set<SendChannel<String>>>()

    /** Adds [connection] to the set stored for [userId] and returns the updated set. */
    fun add(
        userId: String,
        connection: SendChannel<String>,
    ) = connections.compute(userId) { _, current ->
        if (current == null) setOf(connection) else current + connection
    }

    /** Returns the connections stored for [userId], or an empty set when there are none. */
    fun find(userId: String): Set<SendChannel<String>> = connections[userId].orEmpty()

    /**
     * Removes [connection] from the set stored for [userId] and returns what is left.
     * The map entry is dropped when the last connection goes away, in which case
     * (as for an unknown user) an empty set is returned.
     */
    fun remove(
        userId: String,
        connection: SendChannel<String>,
    ) = connections.computeIfPresent(userId) { _, current ->
        removeConnection(current, connection)
    }.orEmpty()

    /** Returns [connections] without [connection], or `null` when nothing remains. */
    private fun removeConnection(
        connections: Set<SendChannel<String>>,
        connection: SendChannel<String>,
    ): Set<SendChannel<String>>? {
        val remaining = connections - connection
        return remaining.takeIf { it.isNotEmpty() }
    }
}
Notification listener
/**
 * Delegate wrapped by the message listener adapter: fans a received message out
 * to every connection the [ConnectionPool] holds for the target user.
 */
class NotificationsMessageListener(
    private val connectionPool: ConnectionPool,
) {
    /**
     * Prints the current thread name, [userId] and [message], then sends
     * [message] to each connection found for [userId]. The calling thread is
     * blocked until all sends have completed.
     *
     * @param message payload to forward
     * @param userId key used to look up the target connections
     */
    fun handleMessage(
        message: String,
        userId: String,
    ) {
        runBlocking {
            println("${Thread.currentThread().name} $userId $message")
            val targets = connectionPool.find(userId)
            for (target in targets) {
                target.send(message)
            }
        }
    }
}
Integration test:
/**
 * Integration test: opens two SSE streams for the same user and expects each
 * of them to receive every published message exactly once, in order.
 */
@ActiveProfiles("test")
@SpringBootTest(
    webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
)
class SubscriptionsControllerIT(
    @Autowired private val template: RedisTemplate<String, String>,
    @Autowired private val webTestClient: WebTestClient,
) : StringSpec({
    override fun extensions() = listOf(SpringExtension)

    "should return 200 with same events for both streams" {
        // given
        val userId = UUID.randomUUID().toString()
        val messages = listOf("test1", "test2", "test3")

        // Opens one SSE stream for the user and collects as many events as were published.
        fun openStream() =
            async(Dispatchers.IO) {
                val body =
                    webTestClient
                        .get()
                        .uri("/notify/$userId")
                        .exchange()
                        .expectStatus()
                        .isOk
                        .expectHeader()
                        .contentTypeCompatibleWith(MediaType.TEXT_EVENT_STREAM_VALUE)
                        .returnResult(String::class.java)
                        .responseBody
                body
                    .take(messages.size.toLong())
                    .collectList()
                    .block() ?: emptyList()
            }

        // Publisher: waits one second, then publishes the messages 100 ms apart.
        launch(Dispatchers.IO) {
            delay(1.seconds)
            for (message in messages) {
                template.convertAndSend(userId, message)
                delay(100.milliseconds)
            }
        }

        val firstStream = openStream()
        val secondStream = openStream()

        // then
        eventually(5.seconds) {
            firstStream.await() shouldBe messages
            secondStream.await() shouldBe messages
        }
    }
})
Console output:
12:53:30.994 [webflux-http-nio-2] DEBUG c.v.p.n.a.http.filters.LoggingWebFilter - Incoming request GET:/notify
12:53:30.994 [webflux-http-nio-3] DEBUG c.v.p.n.a.http.filters.LoggingWebFilter - Incoming request GET:/v1/notify
redisMessageListenerContainer-1 8628515b-8799-4321-9612-25121a1eceb2 test1
redisMessageListenerContainer-2 8628515b-8799-4321-9612-25121a1eceb2 test1
redisMessageListenerContainer-3 8628515b-8799-4321-9612-25121a1eceb2 test2
redisMessageListenerContainer-4 8628515b-8799-4321-9612-25121a1eceb2 test2
12:53:32.022 [webflux-http-nio-3] DEBUG c.v.p.n.a.http.filters.LoggingWebFilter - Outgoing response GET:/notify, 200 OK
12:53:32.022 [webflux-http-nio-2] DEBUG c.v.p.n.a.http.filters.LoggingWebFilter - Outgoing response GET:/notify, 200 OK
Workaround
When I modify the bean configuration to add a dummy listener at startup and initialize message listening eagerly, the issue is resolved.
/**
 * Workaround variant of the container bean: in addition to setting the
 * connection factory, it registers the listener on a placeholder topic at
 * bean creation time.
 */
@Bean
fun redisMessageListenerContainer(
    connectionFactory: RedisConnectionFactory,
    notificationMessageListener: MessageListenerAdapter,
): RedisMessageListenerContainer {
    val container = RedisMessageListenerContainer()
    container.setConnectionFactory(connectionFactory)
    // added: dummy registration so a listener exists from startup
    container.addMessageListener(notificationMessageListener, ChannelTopic("dummy-start-listen"))
    return container
}
Console output:
12:54:36.208 [webflux-http-nio-2] DEBUG c.v.p.n.a.http.filters.LoggingWebFilter - Incoming request GET:/notify
12:54:36.208 [webflux-http-nio-3] DEBUG c.v.p.n.a.http.filters.LoggingWebFilter - Incoming request GET:/notify
redisMessageListenerContainer-1 411147eb-e2cd-4c62-b651-ba750faea0ff test1
redisMessageListenerContainer-2 411147eb-e2cd-4c62-b651-ba750faea0ff test2
redisMessageListenerContainer-3 411147eb-e2cd-4c62-b651-ba750faea0ff test3
12:54:37.371 [webflux-http-nio-2] DEBUG c.v.p.n.a.http.filters.LoggingWebFilter - Outgoing response GET:/notify, 200 OK
12:54:37.370 [webflux-http-nio-3] DEBUG c.v.p.n.a.http.filters.LoggingWebFilter - Outgoing response GET:/notify, 200 OK