Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ projects:
configuration:
redis:
uri: "YOUR REDIS URI" # e.g. "redis://localhost:6379"
useCluster: false # Set to true for Redis Cluster
readFrom: "ANY" # Read routing: "ANY" (default) or "REPLICA_PREFERRED"
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we have two options ANY and REPLICA_PREFERRED can we just make this a boolean that defaults false. E.g. readFromReplica where false=ANY and true=REPLICA_PREFERRED?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be extended to support more values if need arises in the future.

```

The developer docs contain [additional information about configuration](https://docs.developers.amplitude.com/experiment/infra/evaluation-proxy#configuration).
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/kotlin/Config.kt
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ data class RedisConfiguration(
val uri: String? = null,
val readOnlyUri: String? = uri,
val useCluster: Boolean = false,
val readFrom: String = Default.REDIS_READ_FROM,
val prefix: String = Default.REDIS_PREFIX,
val scanLimit: Long = Default.REDIS_SCAN_LIMIT,
val connectionTimeoutMillis: Long = Default.REDIS_CONNECTION_TIMEOUT_MILLIS,
Expand All @@ -174,6 +175,7 @@ data class RedisConfiguration(

return if (redisUri != null) {
val redisReadOnlyUri = stringEnv(EnvKey.REDIS_READ_ONLY_URI, Default.REDIS_READ_ONLY_URI) ?: redisUri
val redisReadFrom = stringEnv(EnvKey.REDIS_READ_FROM, Default.REDIS_READ_FROM)!!
val redisPrefix = stringEnv(EnvKey.REDIS_PREFIX, Default.REDIS_PREFIX)!!
val redisScanLimit = longEnv(EnvKey.REDIS_SCAN_LIMIT, Default.REDIS_SCAN_LIMIT)!!
val connectionTimeoutMillis = longEnv(EnvKey.REDIS_CONNECTION_TIMEOUT_MILLIS, Default.REDIS_CONNECTION_TIMEOUT_MILLIS)!!
Expand All @@ -184,6 +186,7 @@ data class RedisConfiguration(
uri = redisUri,
readOnlyUri = redisReadOnlyUri,
useCluster = useCluster,
readFrom = redisReadFrom,
prefix = redisPrefix,
scanLimit = redisScanLimit,
connectionTimeoutMillis = connectionTimeoutMillis,
Expand Down Expand Up @@ -242,6 +245,7 @@ object EnvKey {
const val REDIS_URI = "AMPLITUDE_REDIS_URI"
const val REDIS_READ_ONLY_URI = "AMPLITUDE_REDIS_READ_ONLY_URI"
const val REDIS_USE_CLUSTER = "AMPLITUDE_REDIS_USE_CLUSTER"
const val REDIS_READ_FROM = "AMPLITUDE_REDIS_READ_FROM"
const val REDIS_PREFIX = "AMPLITUDE_REDIS_PREFIX"
const val REDIS_SCAN_LIMIT = "AMPLITUDE_REDIS_SCAN_LIMIT"
const val REDIS_CONNECTION_TIMEOUT_MILLIS = "AMPLITUDE_REDIS_CONNECTION_TIMEOUT_MILLIS"
Expand Down Expand Up @@ -277,6 +281,7 @@ object Default {
val REDIS_URI: String? = null
val REDIS_READ_ONLY_URI: String? = null
const val REDIS_USE_CLUSTER = false
const val REDIS_READ_FROM = "ANY"
const val REDIS_PREFIX = "amplitude"
const val REDIS_SCAN_LIMIT = 10000L
const val REDIS_CONNECTION_TIMEOUT_MILLIS = 10000L
Expand Down
29 changes: 28 additions & 1 deletion core/src/main/kotlin/util/redis/RedisConnections.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.amplitude.util.redis

import com.amplitude.RedisConfiguration
import com.amplitude.util.logger
import io.lettuce.core.ReadFrom

/**
* Container for Redis connections - primary for writes, readOnly for high-volume reads
Expand All @@ -10,6 +12,29 @@ internal data class RedisConnections(
val readOnly: Redis,
)

internal object RedisConnectionsLogger {
val log by logger()
}

/**
* Parse readFrom configuration string to Lettuce ReadFrom enum.
* Supports: ANY, REPLICA_PREFERRED
* Invalid values default to ANY with a warning.
*/
internal fun parseReadFrom(readFromStr: String): ReadFrom {
return when (readFromStr.uppercase()) {
"ANY" -> ReadFrom.ANY
"REPLICA_PREFERRED" -> ReadFrom.REPLICA_PREFERRED
else -> {
RedisConnectionsLogger.log.warn(
"Invalid readFrom value: '$readFromStr'. " +
"Supported values: ANY, REPLICA_PREFERRED. Defaulting to ANY.",
)
ReadFrom.ANY
}
}
}

/**
* Creates Redis connections based on configuration.
* Returns null if no Redis is configured (should use in-memory storage).
Expand All @@ -19,6 +44,8 @@ internal fun createRedisConnections(redisConfiguration: RedisConfiguration?): Re
redisConfiguration == null -> null

redisConfiguration.useCluster && !redisConfiguration.uri.isNullOrBlank() -> {
val readFromStrategy = parseReadFrom(redisConfiguration.readFrom)

val redis =
RedisClusterConnection(
redisConfiguration.uri,
Expand All @@ -32,7 +59,7 @@ internal fun createRedisConnections(redisConfiguration: RedisConfiguration?): Re
redisConfiguration.readOnlyUri ?: redisConfiguration.uri,
redisConfiguration.connectionTimeoutMillis,
redisConfiguration.commandTimeoutMillis,
io.lettuce.core.ReadFrom.REPLICA_PREFERRED,
readFromStrategy,
)
RedisConnections(redis, readOnlyRedis)
}
Expand Down
50 changes: 50 additions & 0 deletions core/src/test/kotlin/util/redis/RedisConnectionsTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package util.redis

import com.amplitude.util.redis.parseReadFrom
import io.lettuce.core.ReadFrom
import org.junit.Assert.assertEquals
import org.junit.Test

class RedisConnectionsTest {
@Test
fun `parseReadFrom handles ANY`() {
val result = parseReadFrom("ANY")
assertEquals(ReadFrom.ANY, result)
}

@Test
fun `parseReadFrom handles any case insensitive`() {
val result1 = parseReadFrom("any")
val result2 = parseReadFrom("Any")
val result3 = parseReadFrom("aNy")
assertEquals(ReadFrom.ANY, result1)
assertEquals(ReadFrom.ANY, result2)
assertEquals(ReadFrom.ANY, result3)
}

@Test
fun `parseReadFrom handles REPLICA_PREFERRED`() {
val result = parseReadFrom("REPLICA_PREFERRED")
assertEquals(ReadFrom.REPLICA_PREFERRED, result)
}

@Test
fun `parseReadFrom handles replica_preferred case insensitive`() {
val result1 = parseReadFrom("replica_preferred")
val result2 = parseReadFrom("Replica_Preferred")
assertEquals(ReadFrom.REPLICA_PREFERRED, result1)
assertEquals(ReadFrom.REPLICA_PREFERRED, result2)
}

@Test
fun `parseReadFrom defaults to ANY for invalid values`() {
val result1 = parseReadFrom("INVALID")
val result2 = parseReadFrom("MASTER")
val result3 = parseReadFrom("")
val result4 = parseReadFrom("replica")
assertEquals(ReadFrom.ANY, result1)
assertEquals(ReadFrom.ANY, result2)
assertEquals(ReadFrom.ANY, result3)
assertEquals(ReadFrom.ANY, result4)
}
}