Skip to content

Commit ed271b2

Browse files
committed
[push] migrate to external cache
1 parent dae0798 commit ed271b2

File tree

7 files changed

+158
-97
lines changed

7 files changed

+158
-97
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ require (
1818
github.com/nyaruka/phonenumbers v1.4.0
1919
github.com/prometheus/client_golang v1.19.1
2020
github.com/redis/go-redis/v9 v9.9.0
21+
github.com/samber/lo v1.52.0
2122
github.com/swaggo/swag v1.16.6
2223
go.uber.org/fx v1.24.0
2324
go.uber.org/zap v1.27.0

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,8 @@ github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
262262
github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
263263
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
264264
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
265+
github.com/samber/lo v1.52.0 h1:Rvi+3BFHES3A8meP33VPAxiBZX/Aws5RxrschYGjomw=
266+
github.com/samber/lo v1.52.0/go.mod h1:4+MXEGsJzbKGaUEQFKBq2xtfuznW9oz/WrgyzMzRoM0=
265267
github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
266268
github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
267269
github.com/sethvargo/go-retry v0.2.4 h1:T+jHEQy/zKJf5s95UkguisicE0zuF9y7+/vgz08Ocec=

internal/sms-gateway/app.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ var Module = fx.Module(
4343
openapi.Module(),
4444
handlers.Module,
4545
auth.Module,
46-
push.Module,
46+
push.Module(),
4747
db.Module,
4848
cache.Module(),
4949
pubsub.Module(),

internal/sms-gateway/modules/push/module.go

Lines changed: 38 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,49 +2,51 @@ package push
22

33
import (
44
"context"
5-
"errors"
5+
"fmt"
66

77
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/fcm"
88
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/upstream"
99
"go.uber.org/fx"
1010
"go.uber.org/zap"
1111
)
1212

13-
var Module = fx.Module(
14-
"push",
15-
fx.Decorate(func(log *zap.Logger) *zap.Logger {
16-
return log.Named("push")
17-
}),
18-
fx.Provide(newMetrics, fx.Private),
19-
fx.Provide(
20-
func(cfg Config, lc fx.Lifecycle) (c client, err error) {
21-
switch cfg.Mode {
22-
case ModeFCM:
23-
c, err = fcm.New(cfg.ClientOptions)
24-
case ModeUpstream:
25-
c, err = upstream.New(cfg.ClientOptions)
26-
default:
27-
return nil, errors.New("invalid push mode")
28-
}
13+
func Module() fx.Option {
14+
return fx.Module(
15+
"push",
16+
fx.Decorate(func(log *zap.Logger) *zap.Logger {
17+
return log.Named("push")
18+
}),
19+
fx.Provide(newMetrics, fx.Private),
20+
fx.Provide(
21+
func(cfg Config, lc fx.Lifecycle) (c client, err error) {
22+
switch cfg.Mode {
23+
case ModeFCM:
24+
c, err = fcm.New(cfg.ClientOptions)
25+
case ModeUpstream:
26+
c, err = upstream.New(cfg.ClientOptions)
27+
default:
28+
return nil, fmt.Errorf("invalid push mode: %q", cfg.Mode)
29+
}
2930

30-
if err != nil {
31-
return nil, err
32-
}
31+
if err != nil {
32+
return nil, err
33+
}
3334

34-
lc.Append(fx.Hook{
35-
OnStart: func(ctx context.Context) error {
36-
return c.Open(ctx)
37-
},
38-
OnStop: func(ctx context.Context) error {
39-
return c.Close(ctx)
40-
},
41-
})
35+
lc.Append(fx.Hook{
36+
OnStart: func(ctx context.Context) error {
37+
return c.Open(ctx)
38+
},
39+
OnStop: func(ctx context.Context) error {
40+
return c.Close(ctx)
41+
},
42+
})
4243

43-
return c, nil
44-
},
45-
fx.Private,
46-
),
47-
fx.Provide(
48-
New,
49-
),
50-
)
44+
return c, nil
45+
},
46+
fx.Private,
47+
),
48+
fx.Provide(
49+
New,
50+
),
51+
)
52+
}

internal/sms-gateway/modules/push/service.go

Lines changed: 97 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,19 @@ import (
55
"fmt"
66
"time"
77

8+
"github.com/android-sms-gateway/server/internal/sms-gateway/cache"
89
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/types"
9-
"github.com/capcom6/go-helpers/cache"
10-
"github.com/capcom6/go-helpers/maps"
10+
cacheImpl "github.com/android-sms-gateway/server/pkg/cache"
11+
"github.com/samber/lo"
1112

12-
"go.uber.org/fx"
1313
"go.uber.org/zap"
1414
)
1515

16+
const (
17+
cachePrefixEvents = "events:"
18+
cachePrefixBlacklist = "blacklist:"
19+
)
20+
1621
type Config struct {
1722
Mode Mode
1823

@@ -22,53 +27,51 @@ type Config struct {
2227
Timeout time.Duration
2328
}
2429

25-
type Params struct {
26-
fx.In
27-
28-
Config Config
29-
30-
Client client
31-
Metrics *metrics
32-
33-
Logger *zap.Logger
34-
}
35-
3630
type Service struct {
3731
config Config
3832

39-
client client
40-
metrics *metrics
41-
42-
cache *cache.Cache[eventWrapper]
43-
blacklist *cache.Cache[struct{}]
33+
client client
34+
events cache.Cache
35+
blacklist cache.Cache
4436

45-
logger *zap.Logger
37+
metrics *metrics
38+
logger *zap.Logger
4639
}
4740

48-
func New(params Params) *Service {
49-
if params.Config.Timeout == 0 {
50-
params.Config.Timeout = time.Second
41+
func New(
42+
config Config,
43+
client client,
44+
cacheFactory cache.Factory,
45+
metrics *metrics,
46+
logger *zap.Logger,
47+
) (*Service, error) {
48+
events, err := cacheFactory.New(cachePrefixEvents)
49+
if err != nil {
50+
return nil, fmt.Errorf("can't create events cache: %w", err)
5151
}
52-
if params.Config.Debounce < 5*time.Second {
53-
params.Config.Debounce = 5 * time.Second
52+
53+
blacklist, err := cacheFactory.New(cachePrefixBlacklist)
54+
if err != nil {
55+
return nil, fmt.Errorf("can't create blacklist cache: %w", err)
5456
}
5557

56-
return &Service{
57-
config: params.Config,
58+
config.Timeout = max(config.Timeout, time.Second)
59+
config.Debounce = max(config.Debounce, 5*time.Second)
5860

59-
client: params.Client,
60-
metrics: params.Metrics,
61+
return &Service{
62+
config: config,
6163

62-
cache: cache.New[eventWrapper](cache.Config{}),
63-
blacklist: cache.New[struct{}](cache.Config{
64-
TTL: blacklistTimeout,
65-
}),
64+
client: client,
65+
events: events,
66+
blacklist: blacklist,
6667

67-
logger: params.Logger,
68-
}
68+
metrics: metrics,
69+
logger: logger,
70+
}, nil
6971
}
7072

71-
// Run runs the service with the provided context if a debounce is set.
73+
// Run starts a ticker that triggers the sendAll function every debounce interval.
74+
// It runs indefinitely until the provided context is canceled.
7275
func (s *Service) Run(ctx context.Context) {
7376
ticker := time.NewTicker(s.config.Debounce)
7477
defer ticker.Stop()
@@ -85,19 +88,28 @@ func (s *Service) Run(ctx context.Context) {
8588

8689
// Enqueue adds the data to the cache and immediately sends all messages if the debounce is 0.
8790
func (s *Service) Enqueue(token string, event types.Event) error {
88-
if _, err := s.blacklist.Get(token); err == nil {
91+
ctx, cancel := context.WithTimeout(context.Background(), s.config.Timeout)
92+
defer cancel()
93+
94+
if _, err := s.blacklist.Get(ctx, token); err == nil {
8995
s.metrics.IncBlacklist(BlacklistOperationSkipped)
9096
s.logger.Debug("Skipping blacklisted token", zap.String("token", token))
9197
return nil
9298
}
9399

94100
wrapper := eventWrapper{
95-
token: token,
96-
event: &event,
97-
retries: 0,
101+
Token: token,
102+
Event: event,
103+
Retries: 0,
104+
}
105+
wrapperData, err := wrapper.serialize()
106+
if err != nil {
107+
s.metrics.IncError(1)
108+
return fmt.Errorf("can't serialize event wrapper: %w", err)
98109
}
99110

100-
if err := s.cache.Set(token, wrapper); err != nil {
111+
if err := s.events.Set(ctx, wrapper.key(), wrapperData); err != nil {
112+
s.metrics.IncError(1)
101113
return fmt.Errorf("can't add message to cache: %w", err)
102114
}
103115

@@ -108,20 +120,43 @@ func (s *Service) Enqueue(token string, event types.Event) error {
108120

109121
// sendAll sends messages to all targets from the cache after initializing the service.
110122
func (s *Service) sendAll(ctx context.Context) {
111-
targets := s.cache.Drain()
112-
if len(targets) == 0 {
123+
rawEvents, err := s.events.Drain(ctx)
124+
if err != nil {
125+
s.logger.Error("Can't drain cache", zap.Error(err))
113126
return
114127
}
115128

116-
messages := maps.MapValues(targets, func(w eventWrapper) types.Event {
117-
return *w.event
118-
})
129+
if len(rawEvents) == 0 {
130+
return
131+
}
132+
133+
wrappers := lo.MapEntries(
134+
rawEvents,
135+
func(key string, value []byte) (string, *eventWrapper) {
136+
wrapper := new(eventWrapper)
137+
if err := wrapper.deserialize(value); err != nil {
138+
s.metrics.IncError(1)
139+
s.logger.Error("Failed to deserialize event wrapper", zap.String("key", key), zap.Binary("value", value), zap.Error(err))
140+
return "", nil
141+
}
142+
143+
return wrapper.Token, wrapper
144+
},
145+
)
146+
delete(wrappers, "")
147+
148+
messages := lo.MapValues(
149+
wrappers,
150+
func(value *eventWrapper, key string) Event {
151+
return value.Event
152+
},
153+
)
119154

120155
s.logger.Info("Sending messages", zap.Int("count", len(messages)))
121-
ctx, cancel := context.WithTimeout(ctx, s.config.Timeout)
156+
sendCtx, cancel := context.WithTimeout(ctx, s.config.Timeout)
122157
defer cancel()
123158

124-
errs, err := s.client.Send(ctx, messages)
159+
errs, err := s.client.Send(sendCtx, messages)
125160
if len(errs) == 0 && err == nil {
126161
s.logger.Info("Messages sent successfully", zap.Int("count", len(messages)))
127162
return
@@ -138,11 +173,11 @@ func (s *Service) sendAll(ctx context.Context) {
138173
for token, sendErr := range errs {
139174
s.logger.Error("Can't send message", zap.Error(sendErr), zap.String("token", token))
140175

141-
wrapper := targets[token]
142-
wrapper.retries++
176+
wrapper := wrappers[token]
177+
wrapper.Retries++
143178

144-
if wrapper.retries >= maxRetries {
145-
if err := s.blacklist.Set(token, struct{}{}); err != nil {
179+
if wrapper.Retries >= maxRetries {
180+
if err := s.blacklist.Set(ctx, token, []byte{}, cacheImpl.WithTTL(blacklistTimeout)); err != nil {
146181
s.logger.Warn("Can't add to blacklist", zap.String("token", token), zap.Error(err))
147182
}
148183

@@ -154,8 +189,16 @@ func (s *Service) sendAll(ctx context.Context) {
154189
continue
155190
}
156191

157-
if setErr := s.cache.SetOrFail(token, wrapper); setErr != nil {
158-
s.logger.Info("Can't set message to cache", zap.Error(setErr))
192+
wrapperData, err := wrapper.serialize()
193+
if err != nil {
194+
s.metrics.IncError(1)
195+
s.logger.Error("Can't serialize event wrapper", zap.Error(err))
196+
continue
197+
}
198+
199+
if setErr := s.events.SetOrFail(ctx, wrapper.key(), wrapperData); setErr != nil {
200+
s.logger.Warn("Can't set message to cache", zap.Error(setErr))
201+
continue
159202
}
160203

161204
s.metrics.IncRetry(RetryOutcomeRetried)

internal/sms-gateway/modules/push/types.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package push
22

33
import (
44
"context"
5+
"encoding/json"
56

67
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/types"
78
)
@@ -17,12 +18,24 @@ type Event = types.Event
1718

1819
type client interface {
1920
Open(ctx context.Context) error
20-
Send(ctx context.Context, messages map[string]types.Event) (map[string]error, error)
21+
Send(ctx context.Context, messages map[string]Event) (map[string]error, error)
2122
Close(ctx context.Context) error
2223
}
2324

2425
type eventWrapper struct {
25-
token string
26-
event *types.Event
27-
retries int
26+
Token string `json:"token"`
27+
Event Event `json:"event"`
28+
Retries int `json:"retries"`
29+
}
30+
31+
func (e *eventWrapper) key() string {
32+
return e.Token + ":" + string(e.Event.Type)
33+
}
34+
35+
func (e *eventWrapper) serialize() ([]byte, error) {
36+
return json.Marshal(e)
37+
}
38+
39+
func (e *eventWrapper) deserialize(data []byte) error {
40+
return json.Unmarshal(data, e)
2841
}

internal/sms-gateway/modules/push/types/types.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,6 @@ import (
55
)
66

77
type Event struct {
8-
Type smsgateway.PushEventType
9-
Data map[string]string
8+
Type smsgateway.PushEventType `json:"type"`
9+
Data map[string]string `json:"data"`
1010
}

0 commit comments

Comments
 (0)