diff --git a/README.md b/README.md index 588bda2..33cc282 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,8 @@ projects: configuration: redis: uri: "YOUR REDIS URI" # e.g. "redis://localhost:6379" + useCluster: false # Set to true for Redis Cluster + readFrom: "ANY" # Read routing: "ANY" (default) or "REPLICA_PREFERRED" ``` The developer docs contain [additional information about configuration](https://docs.developers.amplitude.com/experiment/infra/evaluation-proxy#configuration). diff --git a/core/src/main/kotlin/Config.kt b/core/src/main/kotlin/Config.kt index 76c53a2..c0cb1b9 100644 --- a/core/src/main/kotlin/Config.kt +++ b/core/src/main/kotlin/Config.kt @@ -161,6 +161,7 @@ data class RedisConfiguration( val uri: String? = null, val readOnlyUri: String? = uri, val useCluster: Boolean = false, + val readFrom: String = Default.REDIS_READ_FROM, val prefix: String = Default.REDIS_PREFIX, val scanLimit: Long = Default.REDIS_SCAN_LIMIT, val connectionTimeoutMillis: Long = Default.REDIS_CONNECTION_TIMEOUT_MILLIS, @@ -174,6 +175,7 @@ data class RedisConfiguration( return if (redisUri != null) { val redisReadOnlyUri = stringEnv(EnvKey.REDIS_READ_ONLY_URI, Default.REDIS_READ_ONLY_URI) ?: redisUri + val redisReadFrom = stringEnv(EnvKey.REDIS_READ_FROM, Default.REDIS_READ_FROM)!! val redisPrefix = stringEnv(EnvKey.REDIS_PREFIX, Default.REDIS_PREFIX)!! val redisScanLimit = longEnv(EnvKey.REDIS_SCAN_LIMIT, Default.REDIS_SCAN_LIMIT)!! val connectionTimeoutMillis = longEnv(EnvKey.REDIS_CONNECTION_TIMEOUT_MILLIS, Default.REDIS_CONNECTION_TIMEOUT_MILLIS)!! @@ -184,6 +186,7 @@ data class RedisConfiguration( uri = redisUri, readOnlyUri = redisReadOnlyUri, useCluster = useCluster, + readFrom = redisReadFrom, prefix = redisPrefix, scanLimit = redisScanLimit, connectionTimeoutMillis = connectionTimeoutMillis, @@ -242,6 +245,7 @@ object EnvKey { const val REDIS_URI = "AMPLITUDE_REDIS_URI" const val REDIS_READ_ONLY_URI = "AMPLITUDE_REDIS_READ_ONLY_URI" const val REDIS_USE_CLUSTER = "AMPLITUDE_REDIS_USE_CLUSTER" + const val REDIS_READ_FROM = "AMPLITUDE_REDIS_READ_FROM" const val REDIS_PREFIX = "AMPLITUDE_REDIS_PREFIX" const val REDIS_SCAN_LIMIT = "AMPLITUDE_REDIS_SCAN_LIMIT" const val REDIS_CONNECTION_TIMEOUT_MILLIS = "AMPLITUDE_REDIS_CONNECTION_TIMEOUT_MILLIS" @@ -277,6 +281,7 @@ object Default { val REDIS_URI: String? = null val REDIS_READ_ONLY_URI: String? = null const val REDIS_USE_CLUSTER = false + const val REDIS_READ_FROM = "ANY" const val REDIS_PREFIX = "amplitude" const val REDIS_SCAN_LIMIT = 10000L const val REDIS_CONNECTION_TIMEOUT_MILLIS = 10000L diff --git a/core/src/main/kotlin/util/redis/RedisConnections.kt b/core/src/main/kotlin/util/redis/RedisConnections.kt index 62bca00..76b8791 100644 --- a/core/src/main/kotlin/util/redis/RedisConnections.kt +++ b/core/src/main/kotlin/util/redis/RedisConnections.kt @@ -1,6 +1,8 @@ package com.amplitude.util.redis import com.amplitude.RedisConfiguration +import com.amplitude.util.logger +import io.lettuce.core.ReadFrom /** * Container for Redis connections - primary for writes, readOnly for high-volume reads @@ -10,6 +12,29 @@ internal data class RedisConnections( val readOnly: Redis, ) +internal object RedisConnectionsLogger { + val log by logger() +} + +/** + * Parse readFrom configuration string to Lettuce ReadFrom enum. + * Supports: ANY, REPLICA_PREFERRED + * Invalid values default to ANY with a warning. + */ +internal fun parseReadFrom(readFromStr: String): ReadFrom { + return when (readFromStr.uppercase()) { + "ANY" -> ReadFrom.ANY + "REPLICA_PREFERRED" -> ReadFrom.REPLICA_PREFERRED + else -> { + RedisConnectionsLogger.log.warn( + "Invalid readFrom value: '$readFromStr'. " + + "Supported values: ANY, REPLICA_PREFERRED. Defaulting to ANY.", + ) + ReadFrom.ANY + } + } +} + /** * Creates Redis connections based on configuration. * Returns null if no Redis is configured (should use in-memory storage). @@ -19,6 +44,8 @@ internal fun createRedisConnections(redisConfiguration: RedisConfiguration?): Re redisConfiguration == null -> null redisConfiguration.useCluster && !redisConfiguration.uri.isNullOrBlank() -> { + val readFromStrategy = parseReadFrom(redisConfiguration.readFrom) + val redis = RedisClusterConnection( redisConfiguration.uri, @@ -32,7 +59,7 @@ internal fun createRedisConnections(redisConfiguration: RedisConfiguration?): Re redisConfiguration.readOnlyUri ?: redisConfiguration.uri, redisConfiguration.connectionTimeoutMillis, redisConfiguration.commandTimeoutMillis, - io.lettuce.core.ReadFrom.REPLICA_PREFERRED, + readFromStrategy, ) RedisConnections(redis, readOnlyRedis) } diff --git a/core/src/test/kotlin/util/redis/RedisConnectionsTest.kt b/core/src/test/kotlin/util/redis/RedisConnectionsTest.kt new file mode 100644 index 0000000..3b99ffe --- /dev/null +++ b/core/src/test/kotlin/util/redis/RedisConnectionsTest.kt @@ -0,0 +1,50 @@ +package util.redis + +import com.amplitude.util.redis.parseReadFrom +import io.lettuce.core.ReadFrom +import org.junit.Assert.assertEquals +import org.junit.Test + +class RedisConnectionsTest { + @Test + fun `parseReadFrom handles ANY`() { + val result = parseReadFrom("ANY") + assertEquals(ReadFrom.ANY, result) + } + + @Test + fun `parseReadFrom handles any case insensitive`() { + val result1 = parseReadFrom("any") + val result2 = parseReadFrom("Any") + val result3 = parseReadFrom("aNy") + assertEquals(ReadFrom.ANY, result1) + assertEquals(ReadFrom.ANY, result2) + assertEquals(ReadFrom.ANY, result3) + } + + @Test + fun `parseReadFrom handles REPLICA_PREFERRED`() { + val result = parseReadFrom("REPLICA_PREFERRED") + assertEquals(ReadFrom.REPLICA_PREFERRED, result) + } + + @Test + fun `parseReadFrom handles replica_preferred case insensitive`() { + val result1 = parseReadFrom("replica_preferred") + val result2 = parseReadFrom("Replica_Preferred") + assertEquals(ReadFrom.REPLICA_PREFERRED, result1) + assertEquals(ReadFrom.REPLICA_PREFERRED, result2) + } + + @Test + fun `parseReadFrom defaults to ANY for invalid values`() { + val result1 = parseReadFrom("INVALID") + val result2 = parseReadFrom("MASTER") + val result3 = parseReadFrom("") + val result4 = parseReadFrom("replica") + assertEquals(ReadFrom.ANY, result1) + assertEquals(ReadFrom.ANY, result2) + assertEquals(ReadFrom.ANY, result3) + assertEquals(ReadFrom.ANY, result4) + } +}