Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -256,16 +256,36 @@ synchronization:
## Agent publishes to channels: "optimizely-sync-{sdk_key}"
## For external Redis clients: Subscribe "optimizely-sync-{sdk_key}" or PSubscribe "optimizely-sync-*"
## Note: Channel configuration parsing is a known bug - planned for future release

## Redis Streams configuration (when using Redis Streams for notifications)
## batch_size: number of messages to batch before sending (default: 10)
# batch_size: 10
## flush_interval: maximum time to wait before sending a partial batch (default: 5s)
# flush_interval: 5s
## max_retries: maximum number of retry attempts for failed operations (default: 3)
# max_retries: 3
## retry_delay: initial delay between retry attempts (default: 100ms)
# retry_delay: 100ms
## max_retry_delay: maximum delay between retry attempts with exponential backoff (default: 5s)
# max_retry_delay: 5s
## connection_timeout: timeout for Redis connections (default: 10s)
# connection_timeout: 10s
## if notification synchronization is enabled, then the active notification event-stream API
## will get the notifications from available replicas
notification:
enable: false
## Use "redis" for fire-and-forget pub/sub (existing behavior)
## Use "redis-streams" for persistent message delivery with retries and acknowledgment
default: "redis"
# default: "redis-streams" # Uncomment to enable Redis Streams
## if datafile synchronization is enabled, then for each webhook API call
## the datafile will be sent to all available replicas to achieve better eventual consistency
datafile:
enable: false
## Use "redis" for fire-and-forget pub/sub (existing behavior)
## Use "redis-streams" for persistent message delivery with retries and acknowledgment
default: "redis"
# default: "redis-streams" # Uncomment to enable Redis Streams

##
## cmab: Contextual Multi-Armed Bandit configuration
Expand Down
92 changes: 91 additions & 1 deletion pkg/syncer/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package syncer
import (
"context"
"errors"
"time"

"github.com/optimizely/agent/config"
"github.com/optimizely/agent/pkg/syncer/pubsub"
Expand All @@ -28,8 +29,10 @@ import (
const (
// PubSubDefaultChan will be used as default pubsub channel name
PubSubDefaultChan = "optimizely-sync"
// PubSubRedis is the name of pubsub type of Redis
// PubSubRedis is the name of pubsub type of Redis (fire-and-forget)
PubSubRedis = "redis"
// PubSubRedisStreams is the name of pubsub type of Redis Streams (persistent)
PubSubRedisStreams = "redis-streams"
)

type SycnFeatureFlag string
Expand All @@ -48,12 +51,16 @@ func newPubSub(conf config.SyncConfig, featureFlag SycnFeatureFlag) (PubSub, err
if featureFlag == SyncFeatureFlagNotificaiton {
if conf.Notification.Default == PubSubRedis {
return getPubSubRedis(conf)
} else if conf.Notification.Default == PubSubRedisStreams {
return getPubSubRedisStreams(conf)
} else {
return nil, errors.New("pubsub type not supported")
}
} else if featureFlag == SycnFeatureFlagDatafile {
if conf.Datafile.Default == PubSubRedis {
return getPubSubRedis(conf)
} else if conf.Datafile.Default == PubSubRedisStreams {
return getPubSubRedisStreams(conf)
} else {
return nil, errors.New("pubsub type not supported")
}
Expand Down Expand Up @@ -99,9 +106,92 @@ func getPubSubRedis(conf config.SyncConfig) (PubSub, error) {
return nil, errors.New("pubsub redis database not valid, database must be int")
}

// Return original Redis pub/sub implementation (fire-and-forget)
return &pubsub.Redis{
Host: host,
Password: password,
Database: database,
}, nil
}

func getPubSubRedisStreams(conf config.SyncConfig) (PubSub, error) {
pubsubConf, found := conf.Pubsub[PubSubRedis]
if !found {
return nil, errors.New("pubsub redis config not found")
}

redisConf, ok := pubsubConf.(map[string]interface{})
if !ok {
return nil, errors.New("pubsub redis config not valid")
}

hostVal, found := redisConf["host"]
if !found {
return nil, errors.New("pubsub redis host not found")
}
host, ok := hostVal.(string)
if !ok {
return nil, errors.New("pubsub redis host not valid, host must be string")
}

passwordVal, found := redisConf["password"]
if !found {
return nil, errors.New("pubsub redis password not found")
}
password, ok := passwordVal.(string)
if !ok {
return nil, errors.New("pubsub redis password not valid, password must be string")
}

databaseVal, found := redisConf["database"]
if !found {
return nil, errors.New("pubsub redis database not found")
}
database, ok := databaseVal.(int)
if !ok {
return nil, errors.New("pubsub redis database not valid, database must be int")
}

// Parse optional Redis Streams configuration parameters
batchSize := getIntFromConfig(redisConf, "batch_size", 10)
flushInterval := getDurationFromConfig(redisConf, "flush_interval", 5*time.Second)
maxRetries := getIntFromConfig(redisConf, "max_retries", 3)
retryDelay := getDurationFromConfig(redisConf, "retry_delay", 100*time.Millisecond)
maxRetryDelay := getDurationFromConfig(redisConf, "max_retry_delay", 5*time.Second)
connTimeout := getDurationFromConfig(redisConf, "connection_timeout", 10*time.Second)

// Return Redis Streams implementation with configuration
return &pubsub.RedisStreams{
Host: host,
Password: password,
Database: database,
BatchSize: batchSize,
FlushInterval: flushInterval,
MaxRetries: maxRetries,
RetryDelay: retryDelay,
MaxRetryDelay: maxRetryDelay,
ConnTimeout: connTimeout,
}, nil
}

// getIntFromConfig safely extracts an integer value from config map with default fallback
func getIntFromConfig(config map[string]interface{}, key string, defaultValue int) int {
if val, found := config[key]; found {
if intVal, ok := val.(int); ok {
return intVal
}
}
return defaultValue
}

// getDurationFromConfig safely extracts a duration value from config map with default fallback
func getDurationFromConfig(config map[string]interface{}, key string, defaultValue time.Duration) time.Duration {
if val, found := config[key]; found {
if strVal, ok := val.(string); ok {
if duration, err := time.ParseDuration(strVal); err == nil {
return duration
}
}
}
return defaultValue
}
Loading
Loading