diff --git a/core/src/main/kotlin/cohort/CohortStorage.kt b/core/src/main/kotlin/cohort/CohortStorage.kt index 7e9b826..190c026 100644 --- a/core/src/main/kotlin/cohort/CohortStorage.kt +++ b/core/src/main/kotlin/cohort/CohortStorage.kt @@ -307,6 +307,8 @@ internal class RedisCohortStorage( } override suspend fun deleteCohort(description: CohortDescription) { + val blobKey = RedisKey.CohortBlob(prefix, projectId, description.id, description.lastModified) + redis.del(blobKey) cohortBlobCache.remove(description.id) redis.hdel(RedisKey.CohortDescriptions(prefix, projectId), description.id) val cohortMembersKey = @@ -380,6 +382,7 @@ internal class RedisCohortStorage( description.id, "removed_${System.currentTimeMillis()}", ) + val existingBlobKey = RedisKey.CohortBlob(prefix, projectId, description.id, prev.lastModified) try { // Server-side set operations - no memory transfer to client! @@ -406,10 +409,11 @@ internal class RedisCohortStorage( } } } finally { - // Clean up temporary keys + // Clean up temporary and existing keys redis.del(addedKey) redis.del(removedKey) redis.expire(existingCohortKey, ttl) + redis.expire(existingBlobKey, ttl) } } else { // No previous cohort: all members are additions diff --git a/core/src/test/kotlin/cohort/CohortStorageTest.kt b/core/src/test/kotlin/cohort/CohortStorageTest.kt index 3b473fa..a729540 100644 --- a/core/src/test/kotlin/cohort/CohortStorageTest.kt +++ b/core/src/test/kotlin/cohort/CohortStorageTest.kt @@ -20,6 +20,7 @@ import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertNotNull import kotlin.test.assertNull +import kotlin.test.assertTrue import kotlin.time.Duration class CohortStorageTest { @@ -173,6 +174,14 @@ class CohortStorageTest { val b64 = redis.get(blobKey2) assertNotNull(b64) } + // old blob and members keys should be marked for expiration on update + run { + val oldBlobKey = RedisKey.CohortBlob("amplitude ", "12345", cohort.id, 1) + assertTrue(redis.expirations.containsKey(oldBlobKey.value)) + // existing cohort members key (lastModified=1) should also be set to expire + val oldMembersKey = RedisKey.CohortMembers("amplitude ", "12345", cohort.id, "User", 1) + assertTrue(redis.expirations.containsKey(oldMembersKey.value)) + } // check cohort membership exists redis.sscan(RedisKey.UserCohortMemberships("amplitude ", "12345", "User", "1"), 1000)?.let { assertEquals(setOf(cohort.id), it) diff --git a/core/src/test/kotlin/test/InMemoryRedis.kt b/core/src/test/kotlin/test/InMemoryRedis.kt index cbbb713..fba6685 100644 --- a/core/src/test/kotlin/test/InMemoryRedis.kt +++ b/core/src/test/kotlin/test/InMemoryRedis.kt @@ -8,6 +8,7 @@ internal class InMemoryRedis : Redis { private val kv = mutableMapOf() private val sets = mutableMapOf>() private val hashes = mutableMapOf>() + internal val expirations = mutableMapOf() override suspend fun get(key: RedisKey): String? { return kv[key.value] @@ -129,7 +130,7 @@ internal class InMemoryRedis : Redis { key: RedisKey, ttl: Duration, ) { - // Do nothing. + expirations[key.value] = ttl } override suspend fun saddPipeline(