Skip to content

Commit 397e59b

Browse files
rautelaanirudhAnirudh Rautela
and
Anirudh Rautela
authored
Treatment service Poller (caraml-dev#87)
* Added changes for using queue only if poller is not enabled in config * Added changes for poller config in treatment service * Added changes for poller in treatment service * Added changes for poller in treatment service * Added changes for poller in treatment service * Reverted poller changes for management-service * Fixed treatment service config test case * Refactored poller * Fixed poller condition * Refactored poller * Removed unused struct * Integrated PR review comments * Integrated PR review comments * Integrated PR review comments * Added sample poller config * Integrated PR review comments * Integrated PR review comments * Integrated PR review comments * Integrated PR review comments --------- Co-authored-by: Anirudh Rautela <[email protected]>
1 parent d328d9e commit 397e59b

File tree

9 files changed

+112
-13
lines changed

9 files changed

+112
-13
lines changed

treatment-service/appcontext/appcontext.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ type AppContext struct {
2323
SegmenterService services.SegmenterService
2424

2525
AssignedTreatmentLogger *monitoring.AssignedTreatmentLogger
26+
LocalStorage *models.LocalStorage
2627
}
2728

2829
func NewAppContext(cfg *config.Config) (*AppContext, error) {
@@ -129,6 +130,7 @@ func NewAppContext(cfg *config.Config) (*AppContext, error) {
129130
TreatmentService: treatmentSvc,
130131
AssignedTreatmentLogger: logger,
131132
MessageQueueService: messageQueueService,
133+
LocalStorage: localStorage,
132134
}
133135

134136
return appContext, nil

treatment-service/config/config.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@ package config
33
import (
44
"fmt"
55
"strconv"
6+
"time"
67

78
"github.com/caraml-dev/mlp/api/pkg/instrumentation/newrelic"
89
"github.com/caraml-dev/mlp/api/pkg/instrumentation/sentry"
9-
1010
common_config "github.com/caraml-dev/xp/common/config"
1111
common_mq_config "github.com/caraml-dev/xp/common/messagequeue"
1212
"github.com/caraml-dev/xp/treatment-service/models"
@@ -24,16 +24,17 @@ type Config struct {
2424
Port int `json:"port" default:"8080" validate:"required"`
2525
ProjectIds []string `json:"project_ids" default:""`
2626

27-
AssignedTreatmentLogger AssignedTreatmentLoggerConfig `json:"assigned_treatment_logger"`
28-
DebugConfig DebugConfig `json:"debug_config" validate:"required,dive"`
29-
NewRelicConfig newrelic.Config `json:"new_relic_config"`
30-
SentryConfig sentry.Config `json:"sentry_config"`
31-
DeploymentConfig DeploymentConfig `json:"deployment_config" validate:"required,dive"`
32-
MessageQueueConfig common_mq_config.MessageQueueConfig `json:"message_queue_config" validate:"required,dive"`
33-
ManagementService ManagementServiceConfig `json:"management_service" validate:"required,dive"`
34-
MonitoringConfig Monitoring `json:"monitoring_config"`
35-
SwaggerConfig SwaggerConfig `json:"swagger_config" validate:"required,dive"`
36-
SegmenterConfig map[string]interface{} `json:"segmenter_config"`
27+
AssignedTreatmentLogger AssignedTreatmentLoggerConfig `json:"assigned_treatment_logger"`
28+
DebugConfig DebugConfig `json:"debug_config" validate:"required,dive"`
29+
NewRelicConfig newrelic.Config `json:"new_relic_config"`
30+
SentryConfig sentry.Config `json:"sentry_config"`
31+
DeploymentConfig DeploymentConfig `json:"deployment_config" validate:"required,dive"`
32+
MessageQueueConfig common_mq_config.MessageQueueConfig `json:"message_queue_config" validate:"required,dive"`
33+
ManagementService ManagementServiceConfig `json:"management_service" validate:"required,dive"`
34+
MonitoringConfig Monitoring `json:"monitoring_config"`
35+
SwaggerConfig SwaggerConfig `json:"swagger_config" validate:"required,dive"`
36+
SegmenterConfig map[string]interface{} `json:"segmenter_config"`
37+
ManagementServicePollerConfig ManagementServicePollerConfig `json:"management_service_poller_config" validate:"required,dive"`
3738
}
3839

3940
type AssignedTreatmentLoggerConfig struct {
@@ -94,6 +95,11 @@ type ManagementServiceConfig struct {
9495
AuthorizationEnabled bool `json:"authorization_enabled"`
9596
}
9697

98+
type ManagementServicePollerConfig struct {
99+
Enabled bool `default:"false"`
100+
PollInterval time.Duration `default:"30s"`
101+
}
102+
97103
func (c *Config) GetProjectIds() []models.ProjectId {
98104
projectIds := make([]models.ProjectId, 0)
99105
for _, projectIdString := range c.ProjectIds {

treatment-service/config/config_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package config
22

33
import (
44
"testing"
5+
"time"
56

67
"github.com/caraml-dev/mlp/api/pkg/instrumentation/newrelic"
78
"github.com/caraml-dev/mlp/api/pkg/instrumentation/sentry"
@@ -64,6 +65,10 @@ func TestDefaultConfigs(t *testing.T) {
6465
},
6566
SentryConfig: sentry.Config{Enabled: false, Labels: emptyStringMap},
6667
SegmenterConfig: make(map[string]interface{}),
68+
ManagementServicePollerConfig: ManagementServicePollerConfig{
69+
Enabled: false,
70+
PollInterval: 30 * time.Second,
71+
},
6772
}
6873
cfg, err := Load()
6974
require.NoError(t, err)
@@ -127,6 +132,10 @@ func TestLoadMultipleConfigs(t *testing.T) {
127132
},
128133
SentryConfig: sentry.Config{Enabled: true, DSN: "my.amazing.sentry.dsn", Labels: map[string]string{"app": "xp-treatment-service"}},
129134
SegmenterConfig: map[string]interface{}{"s2_ids": map[string]interface{}{"mins2celllevel": 9, "maxs2celllevel": 15}},
135+
ManagementServicePollerConfig: ManagementServicePollerConfig{
136+
Enabled: false,
137+
PollInterval: 30 * time.Second,
138+
},
130139
}
131140

132141
cfg, err := Load(configFiles...)

treatment-service/config/example.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,7 @@ SegmenterConfig:
3030
S2_IDs:
3131
MinS2CellLevel: 10
3232
MaxS2CellLevel: 14
33+
34+
PollerConfig:
35+
Enabled: true
36+
PollInterval: 10s

treatment-service/go.mod

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ require (
1414
github.com/deepmap/oapi-codegen v1.11.0
1515
github.com/getkin/kin-openapi v0.94.0
1616
github.com/go-chi/chi/v5 v5.0.7
17+
github.com/go-playground/validator/v10 v10.11.1
1718
github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3
1819
github.com/golang/geo v0.0.0-20210211234256-740aa86cb551
1920
github.com/google/go-cmp v0.6.0
@@ -103,6 +104,8 @@ require (
103104
github.com/go-openapi/jsonpointer v0.19.6 // indirect
104105
github.com/go-openapi/jsonreference v0.20.2 // indirect
105106
github.com/go-openapi/swag v0.22.3 // indirect
107+
github.com/go-playground/locales v0.14.0 // indirect
108+
github.com/go-playground/universal-translator v0.18.0 // indirect
106109
github.com/go-viper/mapstructure/v2 v2.0.0 // indirect
107110
github.com/goccy/go-json v0.9.11 // indirect
108111
github.com/gofrs/flock v0.8.1 // indirect
@@ -136,6 +139,7 @@ require (
136139
github.com/klauspost/asmfmt v1.3.2 // indirect
137140
github.com/klauspost/compress v1.17.4 // indirect
138141
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
142+
github.com/leodido/go-urn v1.2.1 // indirect
139143
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
140144
github.com/magiconair/properties v1.8.7 // indirect
141145
github.com/mailru/easyjson v0.7.7 // indirect

treatment-service/go.sum

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,13 +333,18 @@ github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh
333333
github.com/go-openapi/swag v0.21.1/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ=
334334
github.com/go-openapi/swag v0.22.3 h1:yMBqmnQ0gyZvEb/+KzuWZOXgllrXT4SADYbvDaXHv/g=
335335
github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14=
336+
github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A=
336337
github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
337338
github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8=
339+
github.com/go-playground/locales v0.14.0 h1:u50s323jtVGugKlcYeyzC0etD1HifMjqmJqb8WugfUU=
338340
github.com/go-playground/locales v0.14.0/go.mod h1:sawfccIbzZTqEDETgFXqTho0QybSa7l++s0DH+LDiLs=
339341
github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA=
342+
github.com/go-playground/universal-translator v0.18.0 h1:82dyy6p4OuJq4/CByFNOn/jYrnRPArHwAcmLoJZxyho=
340343
github.com/go-playground/universal-translator v0.18.0/go.mod h1:UvRDBj+xPUEGrFYl+lu/H90nyDXpg0fqeB/AQUGNTVA=
341344
github.com/go-playground/validator/v10 v10.4.1/go.mod h1:nlOn6nFhuKACm19sB/8EGNn9GlaMV7XkbRSipzJ0Ii4=
342345
github.com/go-playground/validator/v10 v10.11.0/go.mod h1:i+3WkQ1FvaUjjxh1kSvIA4dMGDBiPU55YFDl0WbKdWU=
346+
github.com/go-playground/validator/v10 v10.11.1 h1:prmOlTVv+YjZjmRmNSF3VmspqJIxJWXmqUsHwfTRRkQ=
347+
github.com/go-playground/validator/v10 v10.11.1/go.mod h1:i+3WkQ1FvaUjjxh1kSvIA4dMGDBiPU55YFDl0WbKdWU=
343348
github.com/go-sql-driver/mysql v1.3.0 h1:pgwjLi/dvffoP9aabwkT3AKpXQM93QARkjFhDDqC1UE=
344349
github.com/go-sql-driver/mysql v1.3.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
345350
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
@@ -599,6 +604,7 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
599604
github.com/labstack/echo/v4 v4.7.2/go.mod h1:xkCDAdFCIf8jsFQ5NnbK7oqaF/yU1A1X20Ltm0OvSks=
600605
github.com/labstack/gommon v0.3.1/go.mod h1:uW6kP17uPlLJsD3ijUYn3/M5bAxtlZhMI6m3MFxTMTM=
601606
github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
607+
github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w=
602608
github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY=
603609
github.com/lestrrat-go/backoff/v2 v2.0.8/go.mod h1:rHP/q/r9aT27n24JQLa7JhSQZCKBBOiM/uP402WwN8Y=
604610
github.com/lestrrat-go/blackmagic v1.0.0/go.mod h1:TNgH//0vYSs8VXDCfkZLgIrVTTXQELZffUV0tz3MtdQ=

treatment-service/models/storage.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -486,7 +486,7 @@ func (s *LocalStorage) DumpExperiments(filepath string) error {
486486
return os.WriteFile(filepath, file, 0644)
487487
}
488488

489-
func (s *LocalStorage) init() error {
489+
func (s *LocalStorage) Init() error {
490490
s.Lock()
491491
defer s.Unlock()
492492

@@ -592,7 +592,7 @@ func NewLocalStorage(
592592
}
593593
segmenterCache := make(map[ProjectId]map[string]schema.SegmenterType)
594594
s := LocalStorage{managementClient: xpClient, subscribedProjectIds: projectIds, ProjectSegmenters: segmenterCache}
595-
err = s.init()
595+
err = s.Init()
596596

597597
return &s, err
598598
}

treatment-service/server/poller.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package server
2+
3+
import (
4+
"log"
5+
"time"
6+
7+
"github.com/caraml-dev/xp/treatment-service/config"
8+
"github.com/caraml-dev/xp/treatment-service/models"
9+
)
10+
11+
type Poller struct {
12+
pollerConfig config.ManagementServicePollerConfig
13+
localStorage *models.LocalStorage
14+
stopChannel chan struct{}
15+
}
16+
17+
// NewPoller creates a new Poller instance with the given configuration and local storage.
18+
// pollerConfig: configuration for the poller
19+
// localStorage: local storage to be used by the poller
20+
func NewPoller(pollerConfig config.ManagementServicePollerConfig, localStorage *models.LocalStorage) *Poller {
21+
return &Poller{
22+
pollerConfig: pollerConfig,
23+
localStorage: localStorage,
24+
stopChannel: make(chan struct{}),
25+
}
26+
}
27+
28+
func (p *Poller) Start() {
29+
ticker := time.NewTicker(p.pollerConfig.PollInterval)
30+
go func() {
31+
for {
32+
select {
33+
case <-ticker.C:
34+
err := p.Refresh()
35+
log.Printf("Polling at %v with interval %v", time.Now(), p.pollerConfig.PollInterval)
36+
if err != nil {
37+
log.Printf("Error updating local storage: %v", err)
38+
continue
39+
}
40+
case <-p.stopChannel:
41+
ticker.Stop()
42+
return
43+
}
44+
}
45+
}()
46+
}
47+
48+
func (p *Poller) Stop() {
49+
close(p.stopChannel)
50+
}
51+
52+
func (p *Poller) Refresh() error {
53+
err := p.localStorage.Init()
54+
return err
55+
}

treatment-service/server/server.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ type Server struct {
3333
subscribe bool
3434
// cleanup captures all the actions to be executed on server shut down
3535
cleanup []func()
36+
// poller captures the poller instance
37+
poller *Poller
3638
}
3739

3840
// NewServer creates and configures an APIServer serving all application routes.
@@ -106,6 +108,11 @@ func NewServer(configFiles []string) (*Server, error) {
106108
subscribe = true
107109
}
108110

111+
var poller *Poller
112+
if cfg.ManagementServicePollerConfig.Enabled {
113+
poller = NewPoller(cfg.ManagementServicePollerConfig, appCtx.LocalStorage)
114+
}
115+
109116
srv := http.Server{
110117
Addr: cfg.ListenAddress(),
111118
Handler: mux,
@@ -116,6 +123,7 @@ func NewServer(configFiles []string) (*Server, error) {
116123
appContext: appCtx,
117124
subscribe: subscribe,
118125
cleanup: cleanup,
126+
poller: poller,
119127
}, nil
120128
}
121129

@@ -133,6 +141,11 @@ func (srv *Server) Start() {
133141
}()
134142
log.Printf("Listening on %s\n", srv.Addr)
135143

144+
if srv.poller != nil {
145+
log.Println("Starting poller...")
146+
srv.poller.Start()
147+
}
148+
136149
stop := make(chan os.Signal, 1)
137150
signal.Notify(stop, os.Interrupt)
138151

0 commit comments

Comments
 (0)