Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion core/src/main/kotlin/cohort/CohortStorage.kt
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,8 @@ internal class RedisCohortStorage(
}

override suspend fun deleteCohort(description: CohortDescription) {
val blobKey = RedisKey.CohortBlob(prefix, projectId, description.id, description.lastModified)
redis.del(blobKey)
cohortBlobCache.remove(description.id)
redis.hdel(RedisKey.CohortDescriptions(prefix, projectId), description.id)
val cohortMembersKey =
Expand Down Expand Up @@ -380,6 +382,7 @@ internal class RedisCohortStorage(
description.id,
"removed_${System.currentTimeMillis()}",
)
val existingBlobKey = RedisKey.CohortBlob(prefix, projectId, description.id, prev.lastModified)

try {
// Server-side set operations - no memory transfer to client!
Expand All @@ -406,10 +409,11 @@ internal class RedisCohortStorage(
}
}
} finally {
// Clean up temporary keys
// Clean up temporary and existing keys
redis.del(addedKey)
redis.del(removedKey)
redis.expire(existingCohortKey, ttl)
redis.expire(existingBlobKey, ttl)
}
} else {
// No previous cohort: all members are additions
Expand Down
9 changes: 9 additions & 0 deletions core/src/test/kotlin/cohort/CohortStorageTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertNotNull
import kotlin.test.assertNull
import kotlin.test.assertTrue
import kotlin.time.Duration

class CohortStorageTest {
Expand Down Expand Up @@ -173,6 +174,14 @@ class CohortStorageTest {
val b64 = redis.get(blobKey2)
assertNotNull(b64)
}
// old blob and members keys should be marked for expiration on update
run {
val oldBlobKey = RedisKey.CohortBlob("amplitude ", "12345", cohort.id, 1)
assertTrue(redis.expirations.containsKey(oldBlobKey.value))
// existing cohort members key (lastModified=1) should also be set to expire
val oldMembersKey = RedisKey.CohortMembers("amplitude ", "12345", cohort.id, "User", 1)
assertTrue(redis.expirations.containsKey(oldMembersKey.value))
}
// check cohort membership exists
redis.sscan(RedisKey.UserCohortMemberships("amplitude ", "12345", "User", "1"), 1000)?.let {
assertEquals(setOf(cohort.id), it)
Expand Down
3 changes: 2 additions & 1 deletion core/src/test/kotlin/test/InMemoryRedis.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ internal class InMemoryRedis : Redis {
private val kv = mutableMapOf<String, String>()
private val sets = mutableMapOf<String, MutableSet<String>>()
private val hashes = mutableMapOf<String, MutableMap<String, String>>()
internal val expirations = mutableMapOf<String, Duration>()

override suspend fun get(key: RedisKey): String? {
return kv[key.value]
Expand Down Expand Up @@ -129,7 +130,7 @@ internal class InMemoryRedis : Redis {
key: RedisKey,
ttl: Duration,
) {
// Do nothing.
expirations[key.value] = ttl
}

override suspend fun saddPipeline(
Expand Down