Skip to content

Commit e9cc024

Browse files
Support redis cluster readfrom strategy (#28)
1 parent e9c08d9 commit e9cc024

4 files changed

Lines changed: 85 additions & 1 deletion

File tree

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ projects:
2525
configuration:
2626
redis:
2727
uri: "YOUR REDIS URI" # e.g. "redis://localhost:6379"
28+
useCluster: false # Set to true for Redis Cluster
29+
readFrom: "ANY" # Read routing: "ANY" (default) or "REPLICA_PREFERRED"
2830
```
2931
3032
The developer docs contain [additional information about configuration](https://docs.developers.amplitude.com/experiment/infra/evaluation-proxy#configuration).

core/src/main/kotlin/Config.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ data class RedisConfiguration(
161161
val uri: String? = null,
162162
val readOnlyUri: String? = uri,
163163
val useCluster: Boolean = false,
164+
val readFrom: String = Default.REDIS_READ_FROM,
164165
val prefix: String = Default.REDIS_PREFIX,
165166
val scanLimit: Long = Default.REDIS_SCAN_LIMIT,
166167
val connectionTimeoutMillis: Long = Default.REDIS_CONNECTION_TIMEOUT_MILLIS,
@@ -174,6 +175,7 @@ data class RedisConfiguration(
174175

175176
return if (redisUri != null) {
176177
val redisReadOnlyUri = stringEnv(EnvKey.REDIS_READ_ONLY_URI, Default.REDIS_READ_ONLY_URI) ?: redisUri
178+
val redisReadFrom = stringEnv(EnvKey.REDIS_READ_FROM, Default.REDIS_READ_FROM)!!
177179
val redisPrefix = stringEnv(EnvKey.REDIS_PREFIX, Default.REDIS_PREFIX)!!
178180
val redisScanLimit = longEnv(EnvKey.REDIS_SCAN_LIMIT, Default.REDIS_SCAN_LIMIT)!!
179181
val connectionTimeoutMillis = longEnv(EnvKey.REDIS_CONNECTION_TIMEOUT_MILLIS, Default.REDIS_CONNECTION_TIMEOUT_MILLIS)!!
@@ -184,6 +186,7 @@ data class RedisConfiguration(
184186
uri = redisUri,
185187
readOnlyUri = redisReadOnlyUri,
186188
useCluster = useCluster,
189+
readFrom = redisReadFrom,
187190
prefix = redisPrefix,
188191
scanLimit = redisScanLimit,
189192
connectionTimeoutMillis = connectionTimeoutMillis,
@@ -242,6 +245,7 @@ object EnvKey {
242245
const val REDIS_URI = "AMPLITUDE_REDIS_URI"
243246
const val REDIS_READ_ONLY_URI = "AMPLITUDE_REDIS_READ_ONLY_URI"
244247
const val REDIS_USE_CLUSTER = "AMPLITUDE_REDIS_USE_CLUSTER"
248+
const val REDIS_READ_FROM = "AMPLITUDE_REDIS_READ_FROM"
245249
const val REDIS_PREFIX = "AMPLITUDE_REDIS_PREFIX"
246250
const val REDIS_SCAN_LIMIT = "AMPLITUDE_REDIS_SCAN_LIMIT"
247251
const val REDIS_CONNECTION_TIMEOUT_MILLIS = "AMPLITUDE_REDIS_CONNECTION_TIMEOUT_MILLIS"
@@ -277,6 +281,7 @@ object Default {
277281
val REDIS_URI: String? = null
278282
val REDIS_READ_ONLY_URI: String? = null
279283
const val REDIS_USE_CLUSTER = false
284+
const val REDIS_READ_FROM = "ANY"
280285
const val REDIS_PREFIX = "amplitude"
281286
const val REDIS_SCAN_LIMIT = 10000L
282287
const val REDIS_CONNECTION_TIMEOUT_MILLIS = 10000L

core/src/main/kotlin/util/redis/RedisConnections.kt

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package com.amplitude.util.redis
22

33
import com.amplitude.RedisConfiguration
4+
import com.amplitude.util.logger
5+
import io.lettuce.core.ReadFrom
46

57
/**
68
* Container for Redis connections - primary for writes, readOnly for high-volume reads
@@ -10,6 +12,29 @@ internal data class RedisConnections(
1012
val readOnly: Redis,
1113
)
1214

15+
internal object RedisConnectionsLogger {
16+
val log by logger()
17+
}
18+
19+
/**
20+
* Parse readFrom configuration string to Lettuce ReadFrom enum.
21+
* Supports: ANY, REPLICA_PREFERRED
22+
* Invalid values default to ANY with a warning.
23+
*/
24+
internal fun parseReadFrom(readFromStr: String): ReadFrom {
25+
return when (readFromStr.uppercase()) {
26+
"ANY" -> ReadFrom.ANY
27+
"REPLICA_PREFERRED" -> ReadFrom.REPLICA_PREFERRED
28+
else -> {
29+
RedisConnectionsLogger.log.warn(
30+
"Invalid readFrom value: '$readFromStr'. " +
31+
"Supported values: ANY, REPLICA_PREFERRED. Defaulting to ANY.",
32+
)
33+
ReadFrom.ANY
34+
}
35+
}
36+
}
37+
1338
/**
1439
* Creates Redis connections based on configuration.
1540
* Returns null if no Redis is configured (should use in-memory storage).
@@ -19,6 +44,8 @@ internal fun createRedisConnections(redisConfiguration: RedisConfiguration?): Re
1944
redisConfiguration == null -> null
2045

2146
redisConfiguration.useCluster && !redisConfiguration.uri.isNullOrBlank() -> {
47+
val readFromStrategy = parseReadFrom(redisConfiguration.readFrom)
48+
2249
val redis =
2350
RedisClusterConnection(
2451
redisConfiguration.uri,
@@ -32,7 +59,7 @@ internal fun createRedisConnections(redisConfiguration: RedisConfiguration?): Re
3259
redisConfiguration.readOnlyUri ?: redisConfiguration.uri,
3360
redisConfiguration.connectionTimeoutMillis,
3461
redisConfiguration.commandTimeoutMillis,
35-
io.lettuce.core.ReadFrom.REPLICA_PREFERRED,
62+
readFromStrategy,
3663
)
3764
RedisConnections(redis, readOnlyRedis)
3865
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package util.redis
2+
3+
import com.amplitude.util.redis.parseReadFrom
4+
import io.lettuce.core.ReadFrom
5+
import org.junit.Assert.assertEquals
6+
import org.junit.Test
7+
8+
class RedisConnectionsTest {
9+
@Test
10+
fun `parseReadFrom handles ANY`() {
11+
val result = parseReadFrom("ANY")
12+
assertEquals(ReadFrom.ANY, result)
13+
}
14+
15+
@Test
16+
fun `parseReadFrom handles any case insensitive`() {
17+
val result1 = parseReadFrom("any")
18+
val result2 = parseReadFrom("Any")
19+
val result3 = parseReadFrom("aNy")
20+
assertEquals(ReadFrom.ANY, result1)
21+
assertEquals(ReadFrom.ANY, result2)
22+
assertEquals(ReadFrom.ANY, result3)
23+
}
24+
25+
@Test
26+
fun `parseReadFrom handles REPLICA_PREFERRED`() {
27+
val result = parseReadFrom("REPLICA_PREFERRED")
28+
assertEquals(ReadFrom.REPLICA_PREFERRED, result)
29+
}
30+
31+
@Test
32+
fun `parseReadFrom handles replica_preferred case insensitive`() {
33+
val result1 = parseReadFrom("replica_preferred")
34+
val result2 = parseReadFrom("Replica_Preferred")
35+
assertEquals(ReadFrom.REPLICA_PREFERRED, result1)
36+
assertEquals(ReadFrom.REPLICA_PREFERRED, result2)
37+
}
38+
39+
@Test
40+
fun `parseReadFrom defaults to ANY for invalid values`() {
41+
val result1 = parseReadFrom("INVALID")
42+
val result2 = parseReadFrom("MASTER")
43+
val result3 = parseReadFrom("")
44+
val result4 = parseReadFrom("replica")
45+
assertEquals(ReadFrom.ANY, result1)
46+
assertEquals(ReadFrom.ANY, result2)
47+
assertEquals(ReadFrom.ANY, result3)
48+
assertEquals(ReadFrom.ANY, result4)
49+
}
50+
}

0 commit comments

Comments
 (0)