Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 71 additions & 2 deletions core/src/main/kotlin/Config.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.amplitude
import com.amplitude.util.booleanEnv
import com.amplitude.util.intEnv
import com.amplitude.util.json
import com.amplitude.util.logger
import com.amplitude.util.longEnv
import com.amplitude.util.stringEnv
import com.amplitude.util.yaml
Expand Down Expand Up @@ -87,7 +88,12 @@ data class Configuration(
val flagSyncIntervalMillis: Long = Default.FLAG_SYNC_INTERVAL_MILLIS,
val cohortSyncIntervalMillis: Long = Default.COHORT_SYNC_INTERVAL_MILLIS,
val maxCohortSize: Int = Default.MAX_COHORT_SIZE,
@Deprecated(
message = "Assignment service is deprecated. Use exposure with Exposure service instead.",
replaceWith = ReplaceWith("exposure"),
)
val assignment: AssignmentConfiguration = AssignmentConfiguration(),
val exposure: ExposureConfiguration = ExposureConfiguration(),
val redis: RedisConfiguration? = null,
val metrics: MetricsConfiguration = MetricsConfiguration(),
) {
Expand Down Expand Up @@ -117,11 +123,16 @@ data class Configuration(
)!!,
maxCohortSize = intEnv(EnvKey.MAX_COHORT_SIZE, Default.MAX_COHORT_SIZE)!!,
assignment = AssignmentConfiguration.fromEnv(),
exposure = ExposureConfiguration.fromEnv(),
redis = RedisConfiguration.fromEnv(),
)
}
}

@Deprecated(
message = "Assignment service is deprecated. Use ExposureConfiguration with Exposure service instead.",
replaceWith = ReplaceWith("ExposureConfiguration"),
)
@Serializable
data class AssignmentConfiguration(
val filterCapacity: Int = Default.ASSIGNMENT_FILTER_CAPACITY,
Expand All @@ -130,8 +141,21 @@ data class AssignmentConfiguration(
val useBatchMode: Boolean = Default.ASSIGNMENT_USE_BATCH_MODE,
) {
companion object {
fun fromEnv() =
AssignmentConfiguration(
private val log by logger()

fun fromEnv(): AssignmentConfiguration {
// Log deprecation warning if any assignment env vars are set
if (System.getenv(EnvKey.ASSIGNMENT_FILTER_CAPACITY) != null ||
System.getenv(EnvKey.ASSIGNMENT_EVENT_UPLOAD_THRESHOLD) != null ||
System.getenv(EnvKey.ASSIGNMENT_EVENT_UPLOAD_PERIOD_MILLIS) != null ||
System.getenv(EnvKey.ASSIGNMENT_USE_BATCH_MODE) != null
) {
log.warn(
"DEPRECATION WARNING: Assignment configuration (AMPLITUDE_ASSIGNMENT_*) is deprecated. " +
"Use Exposure configuration (AMPLITUDE_EXPOSURE_*) with X-Amp-Exp-Exposure-Track header instead.",
)
}
return AssignmentConfiguration(
filterCapacity =
intEnv(
EnvKey.ASSIGNMENT_FILTER_CAPACITY,
Expand All @@ -153,6 +177,41 @@ data class AssignmentConfiguration(
Default.ASSIGNMENT_USE_BATCH_MODE,
),
)
}
}
}

@Serializable
data class ExposureConfiguration(
val filterCapacity: Int = Default.EXPOSURE_FILTER_CAPACITY,
val eventUploadThreshold: Int = Default.EXPOSURE_EVENT_UPLOAD_THRESHOLD,
val eventUploadPeriodMillis: Int = Default.EXPOSURE_EVENT_UPLOAD_PERIOD_MILLIS,
val useBatchMode: Boolean = Default.EXPOSURE_USE_BATCH_MODE,
) {
companion object {
fun fromEnv() =
ExposureConfiguration(
filterCapacity =
intEnv(
EnvKey.EXPOSURE_FILTER_CAPACITY,
Default.EXPOSURE_FILTER_CAPACITY,
)!!,
eventUploadThreshold =
intEnv(
EnvKey.EXPOSURE_EVENT_UPLOAD_THRESHOLD,
Default.EXPOSURE_EVENT_UPLOAD_THRESHOLD,
)!!,
eventUploadPeriodMillis =
intEnv(
EnvKey.EXPOSURE_EVENT_UPLOAD_PERIOD_MILLIS,
Default.EXPOSURE_EVENT_UPLOAD_PERIOD_MILLIS,
)!!,
useBatchMode =
booleanEnv(
EnvKey.EXPOSURE_USE_BATCH_MODE,
Default.EXPOSURE_USE_BATCH_MODE,
),
)
}
}

Expand Down Expand Up @@ -242,6 +301,11 @@ object EnvKey {
const val ASSIGNMENT_EVENT_UPLOAD_PERIOD_MILLIS = "AMPLITUDE_ASSIGNMENT_EVENT_UPLOAD_PERIOD_MILLIS"
const val ASSIGNMENT_USE_BATCH_MODE = "AMPLITUDE_ASSIGNMENT_USE_BATCH_MODE"

const val EXPOSURE_FILTER_CAPACITY = "AMPLITUDE_EXPOSURE_FILTER_CAPACITY"
const val EXPOSURE_EVENT_UPLOAD_THRESHOLD = "AMPLITUDE_EXPOSURE_EVENT_UPLOAD_THRESHOLD"
const val EXPOSURE_EVENT_UPLOAD_PERIOD_MILLIS = "AMPLITUDE_EXPOSURE_EVENT_UPLOAD_PERIOD_MILLIS"
const val EXPOSURE_USE_BATCH_MODE = "AMPLITUDE_EXPOSURE_USE_BATCH_MODE"

const val REDIS_URI = "AMPLITUDE_REDIS_URI"
const val REDIS_READ_ONLY_URI = "AMPLITUDE_REDIS_READ_ONLY_URI"
const val REDIS_USE_CLUSTER = "AMPLITUDE_REDIS_USE_CLUSTER"
Expand Down Expand Up @@ -278,6 +342,11 @@ object Default {
const val ASSIGNMENT_EVENT_UPLOAD_PERIOD_MILLIS = 10000
const val ASSIGNMENT_USE_BATCH_MODE = true

const val EXPOSURE_FILTER_CAPACITY = 1 shl 20
const val EXPOSURE_EVENT_UPLOAD_THRESHOLD = 100
const val EXPOSURE_EVENT_UPLOAD_PERIOD_MILLIS = 10000
const val EXPOSURE_USE_BATCH_MODE = true

val REDIS_URI: String? = null
val REDIS_READ_ONLY_URI: String? = null
const val REDIS_USE_CLUSTER = false
Expand Down
21 changes: 19 additions & 2 deletions core/src/main/kotlin/EvaluationProxy.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.amplitude.cohort.CohortStorage
import com.amplitude.cohort.getCohortStorage
import com.amplitude.deployment.DeploymentStorage
import com.amplitude.deployment.getDeploymentStorage
import com.amplitude.exposure.AmplitudeExposureTracker
import com.amplitude.project.Project
import com.amplitude.project.ProjectApi
import com.amplitude.project.ProjectApiV1
Expand Down Expand Up @@ -101,6 +102,13 @@ class EvaluationProxy internal constructor(

suspend fun start() {
log.info("Starting evaluation proxy.")
// Check for deprecated assignment configuration in YAML
if (configuration.assignment != AssignmentConfiguration()) {
log.warn(
"DEPRECATION WARNING: 'assignment' configuration is deprecated. " +
"Use 'exposure' configuration with X-Amp-Exp-Exposure-Track header instead.",
)
}
/*
* Fetch deployments, setup initial mappings for each project
* configuration, and create the project proxy.
Expand Down Expand Up @@ -263,6 +271,7 @@ class EvaluationProxy internal constructor(
deploymentKey: String?,
user: Map<String, Any?>?,
flagKeys: Set<String>? = null,
tracksExposure: Boolean = false,
): EvaluationProxyResponse =
Metrics.wrapRequestMetric({ EvaluationProxyEvaluationRequest }, { EvaluationProxyEvaluationRequestError(it) }) {
val project =
Expand All @@ -278,14 +287,15 @@ class EvaluationProxy internal constructor(
"Project proxy not found for project.",
)
return@wrapRequestMetric Metrics.with({ Evaluation }, { e -> EvaluationFailure(e) }) {
projectProxy.evaluate(deploymentKey, user, flagKeys)
projectProxy.evaluate(deploymentKey, user, flagKeys, tracksExposure)
}
}

suspend fun evaluateV1(
deploymentKey: String?,
user: Map<String, Any?>?,
flagKeys: Set<String>? = null,
tracksExposure: Boolean = false,
): EvaluationProxyResponse =
Metrics.wrapRequestMetric({ EvaluationProxyEvaluationRequest }, { EvaluationProxyEvaluationRequestError(it) }) {
val project =
Expand All @@ -301,7 +311,7 @@ class EvaluationProxy internal constructor(
"Project proxy not found for project.",
)
return@wrapRequestMetric Metrics.with({ Evaluation }, { e -> EvaluationFailure(e) }) {
projectProxy.evaluateV1(deploymentKey, user, flagKeys)
projectProxy.evaluateV1(deploymentKey, user, flagKeys, tracksExposure)
}
}

Expand Down Expand Up @@ -366,6 +376,12 @@ class EvaluationProxy internal constructor(
configuration.analyticsServerUrl,
configuration.assignment,
)
val exposureTracker =
AmplitudeExposureTracker(
project.apiKey,
configuration.analyticsServerUrl,
configuration.exposure,
)
val deploymentStorage = getDeploymentStorage(project.id, configuration.redis)
val cohortStorage =
getCohortStorage(
Expand All @@ -377,6 +393,7 @@ class EvaluationProxy internal constructor(
project,
configuration,
assignmentTracker,
exposureTracker,
deploymentStorage,
cohortStorage,
)
Expand Down
8 changes: 8 additions & 0 deletions core/src/main/kotlin/Metrics.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ data object AssignmentEventSend : Metric()

data class AssignmentEventSendFailure(val exception: Exception) : FailureMetric()

data object ExposureEvent : Metric()

data object ExposureEventFilter : Metric()

data object ExposureEventSend : Metric()

data class ExposureEventSendFailure(val exception: Exception) : FailureMetric()

data object DeploymentsFetch : Metric()

data class DeploymentsFetchFailure(val exception: Exception) : FailureMetric()
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/kotlin/assignment/Assignment.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import com.amplitude.util.userId

internal const val DAY_MILLIS: Long = 24 * 60 * 60 * 1000

@Deprecated(
message = "Assignment service is deprecated. Use Exposure with Exposure service instead.",
replaceWith = ReplaceWith("com.amplitude.exposure.Exposure"),
)
internal data class Assignment(
val context: EvaluationContext,
val results: Map<String, EvaluationVariant>,
Expand Down
8 changes: 8 additions & 0 deletions core/src/main/kotlin/assignment/AssignmentFilter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,18 @@ package com.amplitude.assignment

import com.amplitude.util.Cache

@Deprecated(
message = "Assignment service is deprecated. Use ExposureFilter with Exposure service instead.",
replaceWith = ReplaceWith("com.amplitude.exposure.ExposureFilter"),
)
internal interface AssignmentFilter {
suspend fun shouldTrack(assignment: Assignment): Boolean
}

@Deprecated(
message = "Assignment service is deprecated. Use InMemoryExposureFilter with Exposure service instead.",
replaceWith = ReplaceWith("com.amplitude.exposure.InMemoryExposureFilter"),
)
internal class InMemoryAssignmentFilter(size: Int) : AssignmentFilter {
// Cache of canonical assignment to the last sent timestamp.
private val cache = Cache<String, Unit>(size, DAY_MILLIS)
Expand Down
8 changes: 8 additions & 0 deletions core/src/main/kotlin/assignment/AssignmentTracker.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,18 @@ private object FlagType {
const val RELEASE_GROUP = "release-group"
}

@Deprecated(
message = "Assignment service is deprecated. Use ExposureTracker with Exposure service instead.",
replaceWith = ReplaceWith("com.amplitude.exposure.ExposureTracker"),
)
internal interface AssignmentTracker {
suspend fun track(assignment: Assignment)
}

@Deprecated(
message = "Assignment service is deprecated. Use AmplitudeExposureTracker with Exposure service instead.",
replaceWith = ReplaceWith("com.amplitude.exposure.AmplitudeExposureTracker"),
)
internal class AmplitudeAssignmentTracker(
private val amplitude: Amplitude,
private val assignmentFilter: AssignmentFilter,
Expand Down
23 changes: 23 additions & 0 deletions core/src/main/kotlin/exposure/Exposure.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.amplitude.exposure

import com.amplitude.experiment.evaluation.EvaluationContext
import com.amplitude.experiment.evaluation.EvaluationVariant
import com.amplitude.util.deviceId
import com.amplitude.util.userId

internal const val DAY_MILLIS: Long = 24 * 60 * 60 * 1000

internal data class Exposure(
val context: EvaluationContext,
val results: Map<String, EvaluationVariant>,
val timestamp: Long = System.currentTimeMillis(),
)

internal fun Exposure.canonicalize(): String {
val sb = StringBuilder().append(this.context.userId()?.trim(), " ", this.context.deviceId()?.trim(), " ")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if we should just canonicalize userId and deviceId while having full context being part of Exposure object.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you elaborate?

for (key in this.results.keys.sorted()) {
val variant = this.results[key]
sb.append(key.trim(), " ", variant?.key?.trim(), " ")
}
return sb.toString()
}
21 changes: 21 additions & 0 deletions core/src/main/kotlin/exposure/ExposureFilter.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.amplitude.exposure

import com.amplitude.util.Cache

internal interface ExposureFilter {
suspend fun shouldTrack(exposure: Exposure): Boolean
}

internal class InMemoryExposureFilter(size: Int) : ExposureFilter {
// Cache of canonical exposure to the last sent timestamp.
private val cache = Cache<String, Unit>(size, DAY_MILLIS)

override suspend fun shouldTrack(exposure: Exposure): Boolean {
val canonicalExposure = exposure.canonicalize()
val track = cache.get(canonicalExposure) == null
if (track) {
cache.set(canonicalExposure, Unit)
}
return track
}
}
Loading