diff --git a/core/src/main/kotlin/Config.kt b/core/src/main/kotlin/Config.kt index c0cb1b9..478d5fa 100644 --- a/core/src/main/kotlin/Config.kt +++ b/core/src/main/kotlin/Config.kt @@ -3,6 +3,7 @@ package com.amplitude import com.amplitude.util.booleanEnv import com.amplitude.util.intEnv import com.amplitude.util.json +import com.amplitude.util.logger import com.amplitude.util.longEnv import com.amplitude.util.stringEnv import com.amplitude.util.yaml @@ -87,7 +88,12 @@ data class Configuration( val flagSyncIntervalMillis: Long = Default.FLAG_SYNC_INTERVAL_MILLIS, val cohortSyncIntervalMillis: Long = Default.COHORT_SYNC_INTERVAL_MILLIS, val maxCohortSize: Int = Default.MAX_COHORT_SIZE, + @Deprecated( + message = "Assignment service is deprecated. Use exposure with Exposure service instead.", + replaceWith = ReplaceWith("exposure"), + ) val assignment: AssignmentConfiguration = AssignmentConfiguration(), + val exposure: ExposureConfiguration = ExposureConfiguration(), val redis: RedisConfiguration? = null, val metrics: MetricsConfiguration = MetricsConfiguration(), ) { @@ -117,11 +123,16 @@ data class Configuration( )!!, maxCohortSize = intEnv(EnvKey.MAX_COHORT_SIZE, Default.MAX_COHORT_SIZE)!!, assignment = AssignmentConfiguration.fromEnv(), + exposure = ExposureConfiguration.fromEnv(), redis = RedisConfiguration.fromEnv(), ) } } +@Deprecated( + message = "Assignment service is deprecated. Use ExposureConfiguration with Exposure service instead.", + replaceWith = ReplaceWith("ExposureConfiguration"), +) @Serializable data class AssignmentConfiguration( val filterCapacity: Int = Default.ASSIGNMENT_FILTER_CAPACITY, @@ -130,8 +141,21 @@ data class AssignmentConfiguration( val useBatchMode: Boolean = Default.ASSIGNMENT_USE_BATCH_MODE, ) { companion object { - fun fromEnv() = - AssignmentConfiguration( + private val log by logger() + + fun fromEnv(): AssignmentConfiguration { + // Log deprecation warning if any assignment env vars are set + if (System.getenv(EnvKey.ASSIGNMENT_FILTER_CAPACITY) != null || + System.getenv(EnvKey.ASSIGNMENT_EVENT_UPLOAD_THRESHOLD) != null || + System.getenv(EnvKey.ASSIGNMENT_EVENT_UPLOAD_PERIOD_MILLIS) != null || + System.getenv(EnvKey.ASSIGNMENT_USE_BATCH_MODE) != null + ) { + log.warn( + "DEPRECATION WARNING: Assignment configuration (AMPLITUDE_ASSIGNMENT_*) is deprecated. " + + "Use Exposure configuration (AMPLITUDE_EXPOSURE_*) with X-Amp-Exp-Exposure-Track header instead.", + ) + } + return AssignmentConfiguration( filterCapacity = intEnv( EnvKey.ASSIGNMENT_FILTER_CAPACITY, @@ -153,6 +177,41 @@ data class AssignmentConfiguration( Default.ASSIGNMENT_USE_BATCH_MODE, ), ) + } + } +} + +@Serializable +data class ExposureConfiguration( + val filterCapacity: Int = Default.EXPOSURE_FILTER_CAPACITY, + val eventUploadThreshold: Int = Default.EXPOSURE_EVENT_UPLOAD_THRESHOLD, + val eventUploadPeriodMillis: Int = Default.EXPOSURE_EVENT_UPLOAD_PERIOD_MILLIS, + val useBatchMode: Boolean = Default.EXPOSURE_USE_BATCH_MODE, +) { + companion object { + fun fromEnv() = + ExposureConfiguration( + filterCapacity = + intEnv( + EnvKey.EXPOSURE_FILTER_CAPACITY, + Default.EXPOSURE_FILTER_CAPACITY, + )!!, + eventUploadThreshold = + intEnv( + EnvKey.EXPOSURE_EVENT_UPLOAD_THRESHOLD, + Default.EXPOSURE_EVENT_UPLOAD_THRESHOLD, + )!!, + eventUploadPeriodMillis = + intEnv( + EnvKey.EXPOSURE_EVENT_UPLOAD_PERIOD_MILLIS, + Default.EXPOSURE_EVENT_UPLOAD_PERIOD_MILLIS, + )!!, + useBatchMode = + booleanEnv( + EnvKey.EXPOSURE_USE_BATCH_MODE, + Default.EXPOSURE_USE_BATCH_MODE, + ), + ) } } @@ -242,6 +301,11 @@ object EnvKey { const val ASSIGNMENT_EVENT_UPLOAD_PERIOD_MILLIS = "AMPLITUDE_ASSIGNMENT_EVENT_UPLOAD_PERIOD_MILLIS" const val ASSIGNMENT_USE_BATCH_MODE = "AMPLITUDE_ASSIGNMENT_USE_BATCH_MODE" + const val EXPOSURE_FILTER_CAPACITY = "AMPLITUDE_EXPOSURE_FILTER_CAPACITY" + const val EXPOSURE_EVENT_UPLOAD_THRESHOLD = "AMPLITUDE_EXPOSURE_EVENT_UPLOAD_THRESHOLD" + const val EXPOSURE_EVENT_UPLOAD_PERIOD_MILLIS = "AMPLITUDE_EXPOSURE_EVENT_UPLOAD_PERIOD_MILLIS" + const val EXPOSURE_USE_BATCH_MODE = "AMPLITUDE_EXPOSURE_USE_BATCH_MODE" + const val REDIS_URI = "AMPLITUDE_REDIS_URI" const val REDIS_READ_ONLY_URI = "AMPLITUDE_REDIS_READ_ONLY_URI" const val REDIS_USE_CLUSTER = "AMPLITUDE_REDIS_USE_CLUSTER" @@ -278,6 +342,11 @@ object Default { const val ASSIGNMENT_EVENT_UPLOAD_PERIOD_MILLIS = 10000 const val ASSIGNMENT_USE_BATCH_MODE = true + const val EXPOSURE_FILTER_CAPACITY = 1 shl 20 + const val EXPOSURE_EVENT_UPLOAD_THRESHOLD = 100 + const val EXPOSURE_EVENT_UPLOAD_PERIOD_MILLIS = 10000 + const val EXPOSURE_USE_BATCH_MODE = true + val REDIS_URI: String? = null val REDIS_READ_ONLY_URI: String? = null const val REDIS_USE_CLUSTER = false diff --git a/core/src/main/kotlin/EvaluationProxy.kt b/core/src/main/kotlin/EvaluationProxy.kt index cf4de54..9951022 100644 --- a/core/src/main/kotlin/EvaluationProxy.kt +++ b/core/src/main/kotlin/EvaluationProxy.kt @@ -5,6 +5,7 @@ import com.amplitude.cohort.CohortStorage import com.amplitude.cohort.getCohortStorage import com.amplitude.deployment.DeploymentStorage import com.amplitude.deployment.getDeploymentStorage +import com.amplitude.exposure.AmplitudeExposureTracker import com.amplitude.project.Project import com.amplitude.project.ProjectApi import com.amplitude.project.ProjectApiV1 @@ -101,6 +102,13 @@ class EvaluationProxy internal constructor( suspend fun start() { log.info("Starting evaluation proxy.") + // Check for deprecated assignment configuration in YAML + if (configuration.assignment != AssignmentConfiguration()) { + log.warn( + "DEPRECATION WARNING: 'assignment' configuration is deprecated. " + + "Use 'exposure' configuration with X-Amp-Exp-Exposure-Track header instead.", + ) + } /* * Fetch deployments, setup initial mappings for each project * configuration, and create the project proxy. @@ -263,6 +271,7 @@ class EvaluationProxy internal constructor( deploymentKey: String?, user: Map?, flagKeys: Set? = null, + tracksExposure: Boolean = false, ): EvaluationProxyResponse = Metrics.wrapRequestMetric({ EvaluationProxyEvaluationRequest }, { EvaluationProxyEvaluationRequestError(it) }) { val project = @@ -278,7 +287,7 @@ class EvaluationProxy internal constructor( "Project proxy not found for project.", ) return@wrapRequestMetric Metrics.with({ Evaluation }, { e -> EvaluationFailure(e) }) { - projectProxy.evaluate(deploymentKey, user, flagKeys) + projectProxy.evaluate(deploymentKey, user, flagKeys, tracksExposure) } } @@ -286,6 +295,7 @@ class EvaluationProxy internal constructor( deploymentKey: String?, user: Map?, flagKeys: Set? = null, + tracksExposure: Boolean = false, ): EvaluationProxyResponse = Metrics.wrapRequestMetric({ EvaluationProxyEvaluationRequest }, { EvaluationProxyEvaluationRequestError(it) }) { val project = @@ -301,7 +311,7 @@ class EvaluationProxy internal constructor( "Project proxy not found for project.", ) return@wrapRequestMetric Metrics.with({ Evaluation }, { e -> EvaluationFailure(e) }) { - projectProxy.evaluateV1(deploymentKey, user, flagKeys) + projectProxy.evaluateV1(deploymentKey, user, flagKeys, tracksExposure) } } @@ -366,6 +376,12 @@ class EvaluationProxy internal constructor( configuration.analyticsServerUrl, configuration.assignment, ) + val exposureTracker = + AmplitudeExposureTracker( + project.apiKey, + configuration.analyticsServerUrl, + configuration.exposure, + ) val deploymentStorage = getDeploymentStorage(project.id, configuration.redis) val cohortStorage = getCohortStorage( @@ -377,6 +393,7 @@ class EvaluationProxy internal constructor( project, configuration, assignmentTracker, + exposureTracker, deploymentStorage, cohortStorage, ) diff --git a/core/src/main/kotlin/Metrics.kt b/core/src/main/kotlin/Metrics.kt index 5eceb99..f2e4885 100644 --- a/core/src/main/kotlin/Metrics.kt +++ b/core/src/main/kotlin/Metrics.kt @@ -20,6 +20,14 @@ data object AssignmentEventSend : Metric() data class AssignmentEventSendFailure(val exception: Exception) : FailureMetric() +data object ExposureEvent : Metric() + +data object ExposureEventFilter : Metric() + +data object ExposureEventSend : Metric() + +data class ExposureEventSendFailure(val exception: Exception) : FailureMetric() + data object DeploymentsFetch : Metric() data class DeploymentsFetchFailure(val exception: Exception) : FailureMetric() diff --git a/core/src/main/kotlin/assignment/Assignment.kt b/core/src/main/kotlin/assignment/Assignment.kt index 2f8e148..cff15f9 100644 --- a/core/src/main/kotlin/assignment/Assignment.kt +++ b/core/src/main/kotlin/assignment/Assignment.kt @@ -7,6 +7,10 @@ import com.amplitude.util.userId internal const val DAY_MILLIS: Long = 24 * 60 * 60 * 1000 +@Deprecated( + message = "Assignment service is deprecated. Use Exposure with Exposure service instead.", + replaceWith = ReplaceWith("com.amplitude.exposure.Exposure"), +) internal data class Assignment( val context: EvaluationContext, val results: Map, diff --git a/core/src/main/kotlin/assignment/AssignmentFilter.kt b/core/src/main/kotlin/assignment/AssignmentFilter.kt index 1717cac..2559a53 100644 --- a/core/src/main/kotlin/assignment/AssignmentFilter.kt +++ b/core/src/main/kotlin/assignment/AssignmentFilter.kt @@ -2,10 +2,18 @@ package com.amplitude.assignment import com.amplitude.util.Cache +@Deprecated( + message = "Assignment service is deprecated. Use ExposureFilter with Exposure service instead.", + replaceWith = ReplaceWith("com.amplitude.exposure.ExposureFilter"), +) internal interface AssignmentFilter { suspend fun shouldTrack(assignment: Assignment): Boolean } +@Deprecated( + message = "Assignment service is deprecated. Use InMemoryExposureFilter with Exposure service instead.", + replaceWith = ReplaceWith("com.amplitude.exposure.InMemoryExposureFilter"), +) internal class InMemoryAssignmentFilter(size: Int) : AssignmentFilter { // Cache of canonical assignment to the last sent timestamp. private val cache = Cache(size, DAY_MILLIS) diff --git a/core/src/main/kotlin/assignment/AssignmentTracker.kt b/core/src/main/kotlin/assignment/AssignmentTracker.kt index 3b6f816..152dd8e 100644 --- a/core/src/main/kotlin/assignment/AssignmentTracker.kt +++ b/core/src/main/kotlin/assignment/AssignmentTracker.kt @@ -22,10 +22,18 @@ private object FlagType { const val RELEASE_GROUP = "release-group" } +@Deprecated( + message = "Assignment service is deprecated. Use ExposureTracker with Exposure service instead.", + replaceWith = ReplaceWith("com.amplitude.exposure.ExposureTracker"), +) internal interface AssignmentTracker { suspend fun track(assignment: Assignment) } +@Deprecated( + message = "Assignment service is deprecated. Use AmplitudeExposureTracker with Exposure service instead.", + replaceWith = ReplaceWith("com.amplitude.exposure.AmplitudeExposureTracker"), +) internal class AmplitudeAssignmentTracker( private val amplitude: Amplitude, private val assignmentFilter: AssignmentFilter, diff --git a/core/src/main/kotlin/exposure/Exposure.kt b/core/src/main/kotlin/exposure/Exposure.kt new file mode 100644 index 0000000..b5e1965 --- /dev/null +++ b/core/src/main/kotlin/exposure/Exposure.kt @@ -0,0 +1,23 @@ +package com.amplitude.exposure + +import com.amplitude.experiment.evaluation.EvaluationContext +import com.amplitude.experiment.evaluation.EvaluationVariant +import com.amplitude.util.deviceId +import com.amplitude.util.userId + +internal const val DAY_MILLIS: Long = 24 * 60 * 60 * 1000 + +internal data class Exposure( + val context: EvaluationContext, + val results: Map, + val timestamp: Long = System.currentTimeMillis(), +) + +internal fun Exposure.canonicalize(): String { + val sb = StringBuilder().append(this.context.userId()?.trim(), " ", this.context.deviceId()?.trim(), " ") + for (key in this.results.keys.sorted()) { + val variant = this.results[key] + sb.append(key.trim(), " ", variant?.key?.trim(), " ") + } + return sb.toString() +} diff --git a/core/src/main/kotlin/exposure/ExposureFilter.kt b/core/src/main/kotlin/exposure/ExposureFilter.kt new file mode 100644 index 0000000..f322bd5 --- /dev/null +++ b/core/src/main/kotlin/exposure/ExposureFilter.kt @@ -0,0 +1,21 @@ +package com.amplitude.exposure + +import com.amplitude.util.Cache + +internal interface ExposureFilter { + suspend fun shouldTrack(exposure: Exposure): Boolean +} + +internal class InMemoryExposureFilter(size: Int) : ExposureFilter { + // Cache of canonical exposure to the last sent timestamp. + private val cache = Cache(size, DAY_MILLIS) + + override suspend fun shouldTrack(exposure: Exposure): Boolean { + val canonicalExposure = exposure.canonicalize() + val track = cache.get(canonicalExposure) == null + if (track) { + cache.set(canonicalExposure, Unit) + } + return track + } +} diff --git a/core/src/main/kotlin/exposure/ExposureTracker.kt b/core/src/main/kotlin/exposure/ExposureTracker.kt new file mode 100644 index 0000000..a2fdc63 --- /dev/null +++ b/core/src/main/kotlin/exposure/ExposureTracker.kt @@ -0,0 +1,137 @@ +package com.amplitude.exposure + +import com.amplitude.Amplitude +import com.amplitude.Event +import com.amplitude.ExposureConfiguration +import com.amplitude.ExposureEvent +import com.amplitude.ExposureEventFilter +import com.amplitude.ExposureEventSend +import com.amplitude.ExposureEventSendFailure +import com.amplitude.Metrics +import com.amplitude.util.deviceId +import com.amplitude.util.groups +import com.amplitude.util.logger +import com.amplitude.util.userId +import org.json.JSONObject + +private object FlagType { + const val RELEASE = "release" + const val EXPERIMENT = "experiment" + const val MUTUAL_EXCLUSION_GROUP = "mutual-exclusion-group" + const val HOLDOUT_GROUP = "holdout-group" + const val RELEASE_GROUP = "release-group" +} + +internal interface ExposureTracker { + suspend fun track(exposure: Exposure) +} + +internal class AmplitudeExposureTracker( + private val amplitude: Amplitude, + private val exposureFilter: ExposureFilter, +) : ExposureTracker { + companion object { + val log by logger() + } + + constructor( + apiKey: String, + serverUrl: String, + config: ExposureConfiguration, + ) : this( + amplitude = + Amplitude.getInstance("exposure-$apiKey").apply { + setServerUrl(serverUrl) + setEventUploadThreshold(config.eventUploadThreshold) + setEventUploadPeriodMillis(config.eventUploadPeriodMillis) + useBatchMode(config.useBatchMode) + init(apiKey) + }, + exposureFilter = InMemoryExposureFilter(config.filterCapacity), + ) + + override suspend fun track(exposure: Exposure) { + try { + Metrics.track(ExposureEvent) + if (exposureFilter.shouldTrack(exposure)) { + val events = exposure.toAmplitudeEvents() + if (events.isNotEmpty()) { + Metrics.with({ ExposureEventSend }, { e -> ExposureEventSendFailure(e) }) { + events.forEach { event -> + amplitude.logEvent(event) + } + } + } else { + Metrics.track(ExposureEventFilter) + } + } else { + Metrics.track(ExposureEventFilter) + } + } catch (e: Exception) { + log.error("Failed to track exposure event", e) + } + } +} + +internal fun Exposure.toAmplitudeEvents(): List { + val events = mutableListOf() + val canonicalizedExposure = this.canonicalize() + + for ((flagKey, variant) in this.results) { + // Skip if variant is not deployed + val isDeployed = variant.metadata?.get("deployed") as? Boolean ?: false + if (!isDeployed) { + continue + } + + // Skip default variant exposures + val isDefault = variant.metadata?.get("default") as? Boolean ?: false + if (isDefault) { + continue + } + + val event = + Event( + "[Experiment] Exposure", + this.context.userId(), + this.context.deviceId(), + ) + + val groups = this.context.groups() + if (!groups.isNullOrEmpty()) { + event.groups = JSONObject(groups) + } + + event.eventProperties = + JSONObject().apply { + put("[Experiment] Flag Key", flagKey) + if (variant.key != null) { + put("[Experiment] Variant", variant.key) + } + if (variant.metadata != null) { + put("metadata", JSONObject(variant.metadata)) + } + } + + event.userProperties = + JSONObject().apply { + val set = JSONObject() + val flagType = variant.metadata?.get("flagType") as? String + if (flagType != FlagType.MUTUAL_EXCLUSION_GROUP) { + if (variant.key != null) { + set.put("[Experiment] $flagKey", variant.key) + } + } + put("\$set", set) + } + + // Insert ID includes flagKey to make it unique per flag + val hash = ("$flagKey $canonicalizedExposure").hashCode() + val day = this.timestamp / DAY_MILLIS + event.insertId = "${this.context.userId()} ${this.context.deviceId()} $hash $day" + + events.add(event) + } + + return events +} diff --git a/core/src/main/kotlin/project/ProjectProxy.kt b/core/src/main/kotlin/project/ProjectProxy.kt index a4effcc..e02f666 100644 --- a/core/src/main/kotlin/project/ProjectProxy.kt +++ b/core/src/main/kotlin/project/ProjectProxy.kt @@ -14,6 +14,8 @@ import com.amplitude.deployment.DeploymentStorage import com.amplitude.experiment.evaluation.EvaluationEngineImpl import com.amplitude.experiment.evaluation.EvaluationVariant import com.amplitude.experiment.evaluation.topologicalSort +import com.amplitude.exposure.Exposure +import com.amplitude.exposure.ExposureTracker import com.amplitude.util.json import com.amplitude.util.logger import com.amplitude.util.toEvaluationContext @@ -26,6 +28,7 @@ internal class ProjectProxy( private val project: Project, configuration: Configuration, private val assignmentTracker: AssignmentTracker, + private val exposureTracker: ExposureTracker, private val deploymentStorage: DeploymentStorage, private val cohortStorage: CohortStorage, ) { @@ -120,11 +123,12 @@ internal class ProjectProxy( deploymentKey: String?, user: Map?, flagKeys: Set? = null, + tracksExposure: Boolean = false, ): EvaluationProxyResponse { if (deploymentKey.isNullOrEmpty()) { return EvaluationProxyResponse.error(HttpStatusCode.Unauthorized, "Invalid deployment") } - val result = evaluateInternal(deploymentKey, user, flagKeys) + val result = evaluateInternal(deploymentKey, user, flagKeys, tracksExposure) return EvaluationProxyResponse(HttpStatusCode.OK, json.encodeToString(result)) } @@ -132,12 +136,13 @@ internal class ProjectProxy( deploymentKey: String?, user: Map?, flagKeys: Set? = null, + tracksExposure: Boolean = false, ): EvaluationProxyResponse { if (deploymentKey.isNullOrEmpty()) { return EvaluationProxyResponse(HttpStatusCode.Unauthorized, "Invalid deployment") } val result = - evaluateInternal(deploymentKey, user, flagKeys).filter { entry -> + evaluateInternal(deploymentKey, user, flagKeys, tracksExposure).filter { entry -> val default = entry.value.metadata?.get("default") as? Boolean ?: false val deployed = entry.value.metadata?.get("deployed") as? Boolean ?: true (!default && deployed) @@ -149,6 +154,7 @@ internal class ProjectProxy( deploymentKey: String, user: Map?, flagKeys: Set? = null, + tracksExposure: Boolean = false, ): Map { // Get flag configs for the deployment from storage and topo sort. val storageFlags = deploymentStorage.getAllFlags(deploymentKey) @@ -191,6 +197,11 @@ internal class ProjectProxy( launch { assignmentTracker.track(Assignment(evaluationContext, result)) } + if (tracksExposure) { + launch { + exposureTracker.track(Exposure(evaluationContext, result)) + } + } } } return result diff --git a/core/src/test/kotlin/exposure/ExposureFilterTest.kt b/core/src/test/kotlin/exposure/ExposureFilterTest.kt new file mode 100644 index 0000000..95ee172 --- /dev/null +++ b/core/src/test/kotlin/exposure/ExposureFilterTest.kt @@ -0,0 +1,181 @@ +package exposure + +import com.amplitude.experiment.evaluation.EvaluationVariant +import com.amplitude.exposure.Exposure +import com.amplitude.exposure.InMemoryExposureFilter +import com.amplitude.util.toEvaluationContext +import kotlinx.coroutines.runBlocking +import org.junit.Assert +import org.junit.Test +import test.user + +class ExposureFilterTest { + @Test + fun `test single exposure`() = + runBlocking { + val filter = InMemoryExposureFilter(100) + val exposure = + Exposure( + user(userId = "user").toEvaluationContext(), + mapOf( + "flag-key-1" to EvaluationVariant(key = "on"), + "flag-key-2" to EvaluationVariant(key = "control"), + ), + ) + Assert.assertTrue(filter.shouldTrack(exposure)) + } + + @Test + fun `test duplicate exposures`() = + runBlocking { + val filter = InMemoryExposureFilter(100) + val exposure1 = + Exposure( + user(userId = "user").toEvaluationContext(), + mapOf( + "flag-key-1" to EvaluationVariant(key = "on"), + "flag-key-2" to EvaluationVariant(key = "control"), + ), + ) + filter.shouldTrack(exposure1) + val exposure2 = + Exposure( + user(userId = "user").toEvaluationContext(), + mapOf( + "flag-key-1" to EvaluationVariant(key = "on"), + "flag-key-2" to EvaluationVariant(key = "control"), + ), + ) + Assert.assertFalse(filter.shouldTrack(exposure2)) + } + + @Test + fun `test same user different results`() = + runBlocking { + val filter = InMemoryExposureFilter(100) + val exposure1 = + Exposure( + user(userId = "user").toEvaluationContext(), + mapOf( + "flag-key-1" to EvaluationVariant(key = "on"), + "flag-key-2" to EvaluationVariant(key = "control"), + ), + ) + Assert.assertTrue(filter.shouldTrack(exposure1)) + val exposure2 = + Exposure( + user(userId = "user").toEvaluationContext(), + mapOf( + "flag-key-1" to EvaluationVariant(key = "control"), + "flag-key-2" to EvaluationVariant(key = "on"), + ), + ) + Assert.assertTrue(filter.shouldTrack(exposure2)) + } + + @Test + fun `test same results for different users`() = + runBlocking { + val filter = InMemoryExposureFilter(100) + val exposure1 = + Exposure( + user(userId = "user").toEvaluationContext(), + mapOf( + "flag-key-1" to EvaluationVariant(key = "on"), + "flag-key-2" to EvaluationVariant(key = "control"), + ), + ) + Assert.assertTrue(filter.shouldTrack(exposure1)) + val exposure2 = + Exposure( + user(userId = "different user").toEvaluationContext(), + mapOf( + "flag-key-1" to EvaluationVariant(key = "on"), + "flag-key-2" to EvaluationVariant(key = "control"), + ), + ) + Assert.assertTrue(filter.shouldTrack(exposure2)) + } + + @Test + fun `test empty results`() = + runBlocking { + val filter = InMemoryExposureFilter(100) + val exposure1 = + Exposure( + user(userId = "user").toEvaluationContext(), + mapOf(), + ) + Assert.assertTrue(filter.shouldTrack(exposure1)) + val exposure2 = + Exposure( + user(userId = "user").toEvaluationContext(), + mapOf(), + ) + Assert.assertFalse(filter.shouldTrack(exposure2)) + val exposure3 = + Exposure( + user(userId = "different user").toEvaluationContext(), + mapOf(), + ) + Assert.assertTrue(filter.shouldTrack(exposure3)) + } + + @Test + fun `test duplicate exposures with different result ordering`() = + runBlocking { + val filter = InMemoryExposureFilter(100) + val exposure1 = + Exposure( + user(userId = "user").toEvaluationContext(), + linkedMapOf( + "flag-key-1" to EvaluationVariant(key = "on"), + "flag-key-2" to EvaluationVariant(key = "control"), + ), + ) + Assert.assertTrue(filter.shouldTrack(exposure1)) + val exposure2 = + Exposure( + user(userId = "user").toEvaluationContext(), + linkedMapOf( + "flag-key-2" to EvaluationVariant(key = "control"), + "flag-key-1" to EvaluationVariant(key = "on"), + ), + ) + Assert.assertFalse(filter.shouldTrack(exposure2)) + } + + @Test + fun `test lru replacement`() = + runBlocking { + val filter = InMemoryExposureFilter(2) + val exposure1 = + Exposure( + user(userId = "user").toEvaluationContext(), + mapOf( + "flag-key-1" to EvaluationVariant(key = "on"), + "flag-key-2" to EvaluationVariant(key = "control"), + ), + ) + Assert.assertTrue(filter.shouldTrack(exposure1)) + val exposure2 = + Exposure( + user(userId = "user2").toEvaluationContext(), + mapOf( + "flag-key-1" to EvaluationVariant(key = "on"), + "flag-key-2" to EvaluationVariant(key = "control"), + ), + ) + Assert.assertTrue(filter.shouldTrack(exposure2)) + val exposure3 = + Exposure( + user(userId = "user3").toEvaluationContext(), + mapOf( + "flag-key-1" to EvaluationVariant(key = "on"), + "flag-key-2" to EvaluationVariant(key = "control"), + ), + ) + Assert.assertTrue(filter.shouldTrack(exposure3)) + Assert.assertTrue(filter.shouldTrack(exposure1)) + } +} diff --git a/core/src/test/kotlin/exposure/ExposureTrackerTest.kt b/core/src/test/kotlin/exposure/ExposureTrackerTest.kt new file mode 100644 index 0000000..22a234f --- /dev/null +++ b/core/src/test/kotlin/exposure/ExposureTrackerTest.kt @@ -0,0 +1,201 @@ +package exposure + +import com.amplitude.experiment.evaluation.EvaluationVariant +import com.amplitude.exposure.DAY_MILLIS +import com.amplitude.exposure.Exposure +import com.amplitude.exposure.toAmplitudeEvents +import com.amplitude.util.toEvaluationContext +import kotlinx.coroutines.runBlocking +import org.junit.Assert +import org.junit.Test +import test.user + +class ExposureTrackerTest { + @Test + fun `test exposure to amplitude events - creates per-flag events`() = + runBlocking { + val context = user(userId = "user", deviceId = "device").toEvaluationContext() + val results = + mapOf( + "flag-key-1" to + EvaluationVariant( + key = "on", + metadata = + mapOf( + "deployed" to true, + "segmentName" to "Segment", + "flagVersion" to 13, + ), + ), + "flag-key-2" to + EvaluationVariant( + key = "control", + metadata = + mapOf( + "deployed" to true, + "segmentName" to "All Other Users", + "flagVersion" to 12, + ), + ), + ) + val exposure = Exposure(context, results) + val events = exposure.toAmplitudeEvents() + + // Should create one event per flag + Assert.assertEquals(2, events.size) + + for (event in events) { + Assert.assertEquals("[Experiment] Exposure", event.eventType) + Assert.assertEquals("user", event.userId) + Assert.assertEquals("device", event.deviceId) + + val flagKey = event.eventProperties?.getString("[Experiment] Flag Key") + Assert.assertNotNull(flagKey) + Assert.assertEquals(results[flagKey]?.key, event.eventProperties?.getString("[Experiment] Variant")) + } + } + + @Test + fun `test exposure skips default variants`() = + runBlocking { + val context = user(userId = "user", deviceId = "device").toEvaluationContext() + val results = + mapOf( + "flag-key-1" to + EvaluationVariant( + key = "on", + metadata = mapOf("deployed" to true), + ), + "flag-key-2" to + EvaluationVariant( + key = "off", + metadata = mapOf("deployed" to true, "default" to true), + ), + ) + val exposure = Exposure(context, results) + val events = exposure.toAmplitudeEvents() + + // Should only create event for non-default flag + Assert.assertEquals(1, events.size) + Assert.assertEquals("flag-key-1", events[0].eventProperties?.getString("[Experiment] Flag Key")) + } + + @Test + fun `test exposure insert id includes flag key`() = + runBlocking { + val context = user(userId = "user", deviceId = "device").toEvaluationContext() + val results = + mapOf( + "flag-key-1" to + EvaluationVariant( + key = "on", + metadata = mapOf("deployed" to true), + ), + ) + val exposure = Exposure(context, results) + val events = exposure.toAmplitudeEvents() + + // Insert ID should include flag key in the hash + val canonicalization = "user device flag-key-1 on " + val hash = ("flag-key-1 $canonicalization").hashCode() + val day = exposure.timestamp / DAY_MILLIS + val expected = "user device $hash $day" + Assert.assertEquals(expected, events[0].insertId) + } + + @Test + fun `test exposure with no eligible variants returns empty list`() = + runBlocking { + val context = user(userId = "user", deviceId = "device").toEvaluationContext() + val results = + mapOf( + "flag-key-1" to + EvaluationVariant( + key = "off", + metadata = mapOf("deployed" to true, "default" to true), + ), + "flag-key-2" to + EvaluationVariant( + key = "control", + metadata = mapOf("deployed" to true, "default" to true), + ), + ) + val exposure = Exposure(context, results) + val events = exposure.toAmplitudeEvents() + + // Both flags should be skipped (both are default variants) + Assert.assertEquals(0, events.size) + } + + @Test + fun `test exposure skips non-deployed variants`() = + runBlocking { + val context = user(userId = "user", deviceId = "device").toEvaluationContext() + val results = + mapOf( + "flag-key-1" to + EvaluationVariant( + key = "on", + metadata = mapOf("deployed" to true), + ), + "flag-key-2" to + EvaluationVariant( + key = "control", + metadata = mapOf("deployed" to false), + ), + "flag-key-3" to + EvaluationVariant( + key = "treatment", + // No deployed metadata - should be skipped + ), + ) + val exposure = Exposure(context, results) + val events = exposure.toAmplitudeEvents() + + // Should only create event for deployed flag + Assert.assertEquals(1, events.size) + Assert.assertEquals("flag-key-1", events[0].eventProperties?.getString("[Experiment] Flag Key")) + } + + @Test + fun `test exposure skips mutual exclusion group user properties`() = + runBlocking { + val context = user(userId = "user", deviceId = "device").toEvaluationContext() + val results = + mapOf( + "flag-key-1" to + EvaluationVariant( + key = "on", + metadata = mapOf("deployed" to true, "flagType" to "mutual-exclusion-group"), + ), + ) + val exposure = Exposure(context, results) + val events = exposure.toAmplitudeEvents() + + Assert.assertEquals(1, events.size) + // User properties $set should be empty for mutual exclusion groups + val setProps = events[0].userProperties?.getJSONObject("\$set") + Assert.assertEquals(0, setProps?.length()) + } + + @Test + fun `test exposure sets user properties for non-mutual-exclusion-group`() = + runBlocking { + val context = user(userId = "user", deviceId = "device").toEvaluationContext() + val results = + mapOf( + "flag-key-1" to + EvaluationVariant( + key = "on", + metadata = mapOf("deployed" to true, "flagType" to "experiment"), + ), + ) + val exposure = Exposure(context, results) + val events = exposure.toAmplitudeEvents() + + Assert.assertEquals(1, events.size) + // User properties $set should contain the flag assignment + val setProps = events[0].userProperties?.getJSONObject("\$set") + Assert.assertEquals("on", setProps?.getString("[Experiment] flag-key-1")) + } +} diff --git a/core/src/test/kotlin/project/ProjectProxyTest.kt b/core/src/test/kotlin/project/ProjectProxyTest.kt index 90e97d0..ea3d1f1 100644 --- a/core/src/test/kotlin/project/ProjectProxyTest.kt +++ b/core/src/test/kotlin/project/ProjectProxyTest.kt @@ -6,6 +6,7 @@ import com.amplitude.cohort.GetCohortResponse import com.amplitude.cohort.InMemoryCohortStorage import com.amplitude.cohort.toCohortDescription import com.amplitude.deployment.InMemoryDeploymentStorage +import com.amplitude.exposure.ExposureTracker import com.amplitude.project.ProjectProxy import com.amplitude.util.json import io.ktor.http.HttpStatusCode @@ -31,6 +32,7 @@ class ProjectProxyTest { fun `test get flag configs, null deployment, unauthorized`(): Unit = runBlocking { val assignmentTracker = mockk() + val exposureTracker = mockk() val deploymentStorage = InMemoryDeploymentStorage() val cohortStorage = InMemoryCohortStorage() val projectProxy = @@ -38,6 +40,7 @@ class ProjectProxyTest { project, configuration, assignmentTracker, + exposureTracker, deploymentStorage, cohortStorage, ) @@ -51,6 +54,7 @@ class ProjectProxyTest { val deployment = deployment("deployment") val flag = flag("flag") val assignmentTracker = mockk() + val exposureTracker = mockk() val deploymentStorage = InMemoryDeploymentStorage().apply { putFlag(deployment.key, flag) @@ -61,6 +65,7 @@ class ProjectProxyTest { project, configuration, assignmentTracker, + exposureTracker, deploymentStorage, cohortStorage, ) @@ -73,6 +78,7 @@ class ProjectProxyTest { fun `test get cohort, null cohort id, not found`(): Unit = runBlocking { val assignmentTracker = mockk() + val exposureTracker = mockk() val deploymentStorage = InMemoryDeploymentStorage() val cohortStorage = InMemoryCohortStorage() val projectProxy = @@ -80,6 +86,7 @@ class ProjectProxyTest { project, configuration, assignmentTracker, + exposureTracker, deploymentStorage, cohortStorage, ) @@ -92,6 +99,7 @@ class ProjectProxyTest { runBlocking { val cohort = cohort("a") val assignmentTracker = mockk() + val exposureTracker = mockk() val deploymentStorage = InMemoryDeploymentStorage() val cohortStorage = InMemoryCohortStorage().apply { @@ -104,6 +112,7 @@ class ProjectProxyTest { project, configuration, assignmentTracker, + exposureTracker, deploymentStorage, cohortStorage, ) @@ -121,6 +130,7 @@ class ProjectProxyTest { runBlocking { val cohort = cohort("a", 100) val assignmentTracker = mockk() + val exposureTracker = mockk() val deploymentStorage = InMemoryDeploymentStorage() val cohortStorage = InMemoryCohortStorage().apply { @@ -133,6 +143,7 @@ class ProjectProxyTest { project, configuration, assignmentTracker, + exposureTracker, deploymentStorage, cohortStorage, ) @@ -150,6 +161,7 @@ class ProjectProxyTest { runBlocking { val cohort = cohort("a", 100) val assignmentTracker = mockk() + val exposureTracker = mockk() val deploymentStorage = InMemoryDeploymentStorage() val cohortStorage = InMemoryCohortStorage().apply { @@ -162,6 +174,7 @@ class ProjectProxyTest { project, configuration, assignmentTracker, + exposureTracker, deploymentStorage, cohortStorage, ) @@ -176,6 +189,7 @@ class ProjectProxyTest { val flag = flag("flag", setOf("a")) val cohort = cohort("a", 100, size = 100) val assignmentTracker = mockk() + val exposureTracker = mockk() val deploymentStorage = InMemoryDeploymentStorage().apply { putFlag(deployment.key, flag) @@ -191,6 +205,7 @@ class ProjectProxyTest { project, configuration, assignmentTracker, + exposureTracker, deploymentStorage, cohortStorage, ) @@ -208,6 +223,7 @@ class ProjectProxyTest { runBlocking { val cohort = cohort("a", 100, size = 100) val assignmentTracker = mockk() + val exposureTracker = mockk() val deploymentStorage = InMemoryDeploymentStorage() val cohortStorage = InMemoryCohortStorage().apply { @@ -220,6 +236,7 @@ class ProjectProxyTest { project, configuration, assignmentTracker, + exposureTracker, deploymentStorage, cohortStorage, ) @@ -231,6 +248,7 @@ class ProjectProxyTest { fun `test get cohort memberships for group, null deployment, unauthorized`(): Unit = runBlocking { val assignmentTracker = mockk() + val exposureTracker = mockk() val deploymentStorage = InMemoryDeploymentStorage() val cohortStorage = InMemoryCohortStorage() val projectProxy = @@ -238,6 +256,7 @@ class ProjectProxyTest { project, configuration, assignmentTracker, + exposureTracker, deploymentStorage, cohortStorage, ) @@ -250,6 +269,7 @@ class ProjectProxyTest { runBlocking { val deployment = deployment("deployment") val assignmentTracker = mockk() + val exposureTracker = mockk() val deploymentStorage = InMemoryDeploymentStorage() val cohortStorage = InMemoryCohortStorage() val projectProxy = @@ -257,6 +277,7 @@ class ProjectProxyTest { project, configuration, assignmentTracker, + exposureTracker, deploymentStorage, cohortStorage, ) @@ -269,6 +290,7 @@ class ProjectProxyTest { runBlocking { val deployment = deployment("deployment") val assignmentTracker = mockk() + val exposureTracker = mockk() val deploymentStorage = InMemoryDeploymentStorage() val cohortStorage = InMemoryCohortStorage() val projectProxy = @@ -276,6 +298,7 @@ class ProjectProxyTest { project, configuration, assignmentTracker, + exposureTracker, deploymentStorage, cohortStorage, ) @@ -290,6 +313,7 @@ class ProjectProxyTest { val cohort = cohort("a") val flag = flag("flag", setOf("a")) val assignmentTracker = mockk() + val exposureTracker = mockk() val deploymentStorage = InMemoryDeploymentStorage().apply { putFlag(deployment.key, flag) @@ -305,6 +329,7 @@ class ProjectProxyTest { project, configuration, assignmentTracker, + exposureTracker, deploymentStorage, cohortStorage, ) @@ -317,6 +342,7 @@ class ProjectProxyTest { fun `test evaluate, null deployment, unauthorized`(): Unit = runBlocking { val assignmentTracker = mockk() + val exposureTracker = mockk() val deploymentStorage = InMemoryDeploymentStorage() val cohortStorage = InMemoryCohortStorage() val projectProxy = @@ -324,6 +350,7 @@ class ProjectProxyTest { project, configuration, assignmentTracker, + exposureTracker, deploymentStorage, cohortStorage, ) @@ -337,6 +364,7 @@ class ProjectProxyTest { runBlocking { val deployment = deployment("deployment") val assignmentTracker = mockk() + val exposureTracker = mockk() val deploymentStorage = InMemoryDeploymentStorage() val cohortStorage = InMemoryCohortStorage() val projectProxy = @@ -344,6 +372,7 @@ class ProjectProxyTest { project, configuration, assignmentTracker, + exposureTracker, deploymentStorage, cohortStorage, ) @@ -360,6 +389,7 @@ class ProjectProxyTest { val flag = flag("flag", setOf("a")) val deployment = deployment("deployment") val assignmentTracker = mockk() + val exposureTracker = mockk() val deploymentStorage = InMemoryDeploymentStorage().apply { putFlag(deployment.key, flag) @@ -375,6 +405,7 @@ class ProjectProxyTest { project, configuration, assignmentTracker, + exposureTracker, deploymentStorage, cohortStorage, ) diff --git a/service/src/main/kotlin/Server.kt b/service/src/main/kotlin/Server.kt index 7fb0772..200a9c0 100644 --- a/service/src/main/kotlin/Server.kt +++ b/service/src/main/kotlin/Server.kt @@ -213,7 +213,8 @@ suspend fun ApplicationCall.evaluate( val deploymentKey = request.headers.getDeploymentKey() val user = request.userProvider() val flagKeys = request.getFlagKeys() - val result = evaluationProxy.evaluate(deploymentKey, user, flagKeys) + val tracksExposure = request.getTracksExposure() + val result = evaluationProxy.evaluate(deploymentKey, user, flagKeys, tracksExposure) respond(result.status, result.body) } @@ -225,7 +226,8 @@ suspend fun ApplicationCall.evaluateV1( val deploymentKey = request.headers.getDeploymentKey() val user = request.userProvider() val flagKeys = request.getFlagKeys() - val result = evaluationProxy.evaluateV1(deploymentKey, user, flagKeys) + val tracksExposure = request.getTracksExposure() + val result = evaluationProxy.evaluateV1(deploymentKey, user, flagKeys, tracksExposure) respond(result.status, result.body) } @@ -267,6 +269,22 @@ internal fun Headers.getApiAndSecretKey(): Pair { } } +/** + * Get the tracksExposure option from the request header. + * When true, exposure events will be sent to Amplitude. + * + * Header: X-Amp-Exp-Exposure-Track with values "track" or "no-track" + * Defaults to false (no-track) if header is not set or has an unknown value. + */ +private fun ApplicationRequest.getTracksExposure(): Boolean { + val headerValue = this.headers["X-Amp-Exp-Exposure-Track"] + return when (headerValue) { + "track" -> true + "no-track" -> false + else -> false // Default to no-track if header not set or unknown value + } +} + /** * Get the flag keys from the request. Either contained in header or query params. * Flag keys are used to filter the results to only required flags. diff --git a/service/src/main/kotlin/plugins/Monitoring.kt b/service/src/main/kotlin/plugins/Monitoring.kt index d0eb8dc..c94e43b 100644 --- a/service/src/main/kotlin/plugins/Monitoring.kt +++ b/service/src/main/kotlin/plugins/Monitoring.kt @@ -20,6 +20,10 @@ import com.amplitude.EvaluationProxyGetMembershipsRequest import com.amplitude.EvaluationProxyGetMembershipsRequestError import com.amplitude.EvaluationProxyRequest import com.amplitude.EvaluationProxyRequestError +import com.amplitude.ExposureEvent +import com.amplitude.ExposureEventFilter +import com.amplitude.ExposureEventSend +import com.amplitude.ExposureEventSendFailure import com.amplitude.FailureMetric import com.amplitude.FlagsFetch import com.amplitude.FlagsFetchFailure @@ -86,6 +90,18 @@ class PrometheusMetrics( is AssignmentEventSendFailure -> { prometheus.counter("amplitude_proxy_assignment_send_failure_total").increment() } + is ExposureEvent -> { + prometheus.counter("amplitude_proxy_exposure_total").increment() + } + is ExposureEventFilter -> { + prometheus.counter("amplitude_proxy_exposure_filter_total").increment() + } + is ExposureEventSend -> { + prometheus.counter("amplitude_proxy_exposure_send_total").increment() + } + is ExposureEventSendFailure -> { + prometheus.counter("amplitude_proxy_exposure_send_failure_total").increment() + } is DeploymentsFetch -> { prometheus.counter("amplitude_proxy_deployments_fetch_total").increment() }