From 619c9475dec6a448eccb628cfd3dd067f005ed11 Mon Sep 17 00:00:00 2001 From: Aleksandr Soloshenko Date: Thu, 9 Oct 2025 09:01:54 +0700 Subject: [PATCH 1/7] [pubsub] introduce memory and redis implementations --- pkg/pubsub/memory.go | 151 ++++++++++++++++++++++++++++++++++++++++++ pkg/pubsub/options.go | 21 ++++++ pkg/pubsub/pubsub.go | 50 ++++++++++++++ pkg/pubsub/redis.go | 132 ++++++++++++++++++++++++++++++++++++ 4 files changed, 354 insertions(+) create mode 100644 pkg/pubsub/memory.go create mode 100644 pkg/pubsub/options.go create mode 100644 pkg/pubsub/pubsub.go create mode 100644 pkg/pubsub/redis.go diff --git a/pkg/pubsub/memory.go b/pkg/pubsub/memory.go new file mode 100644 index 0000000..404f11c --- /dev/null +++ b/pkg/pubsub/memory.go @@ -0,0 +1,151 @@ +package pubsub + +import ( + "context" + "sync" + + "github.com/google/uuid" +) + +type memoryPubSub struct { + bufferSize uint + + wg sync.WaitGroup + mu sync.RWMutex + topics map[string]map[string]chan Message + closeCh chan struct{} +} + +func NewMemory(opts ...Option) *memoryPubSub { + o := options{ + bufferSize: 0, + } + o.apply(opts...) + + return &memoryPubSub{ + bufferSize: o.bufferSize, + + topics: make(map[string]map[string]chan Message), + closeCh: make(chan struct{}), + } +} + +// Publish sends a message to all subscribers of the given topic. +// This method blocks until all subscribers have received the message +// or until ctx is cancelled or the pubsub instance is closed. +func (m *memoryPubSub) Publish(ctx context.Context, topic string, data []byte) error { + select { + case <-m.closeCh: + return ErrPubSubClosed + default: + } + + if topic == "" { + return ErrInvalidTopic + } + + m.mu.RLock() + defer m.mu.RUnlock() + + subscribers, exists := m.topics[topic] + if !exists { + return nil + } + + wg := &sync.WaitGroup{} + msg := Message{Topic: topic, Data: data} + + for _, ch := range subscribers { + wg.Add(1) + go func(ch chan Message) { + defer wg.Done() + + select { + case ch <- msg: + case <-ctx.Done(): + return + case <-m.closeCh: + return + } + }(ch) + } + + wg.Wait() + + return nil +} + +func (m *memoryPubSub) Subscribe(ctx context.Context, topic string) (*Subscription, error) { + select { + case <-m.closeCh: + return nil, ErrPubSubClosed + default: + } + + if topic == "" { + return nil, ErrInvalidTopic + } + + id := uuid.NewString() + subCtx, cancel := context.WithCancel(ctx) + ch := make(chan Message, m.bufferSize) + + m.subscribe(id, topic, ch) + + m.wg.Add(1) + go func() { + select { + case <-subCtx.Done(): + case <-m.closeCh: + } + + cancel() + m.unsubscribe(id, topic) + close(ch) + + m.wg.Done() + }() + + return &Subscription{id: id, ctx: subCtx, cancel: cancel, ch: ch}, nil +} + +func (m *memoryPubSub) subscribe(id, topic string, ch chan Message) { + m.mu.Lock() + defer m.mu.Unlock() + + subscriptions, ok := m.topics[topic] + if !ok { + subscriptions = make(map[string]chan Message) + m.topics[topic] = subscriptions + } + subscriptions[id] = ch +} + +func (m *memoryPubSub) unsubscribe(id, topic string) { + m.mu.Lock() + defer m.mu.Unlock() + + subscriptions, ok := m.topics[topic] + if !ok { + return + } + delete(subscriptions, id) + if len(subscriptions) == 0 { + delete(m.topics, topic) + } +} + +func (m *memoryPubSub) Close() error { + select { + case <-m.closeCh: + return nil + default: + } + close(m.closeCh) + + m.wg.Wait() + + return nil +} + +var _ PubSub = (*memoryPubSub)(nil) diff --git a/pkg/pubsub/options.go b/pkg/pubsub/options.go new file mode 100644 index 0000000..e62d1d7 --- /dev/null +++ b/pkg/pubsub/options.go @@ -0,0 +1,21 @@ +package pubsub + +type Option func(*options) + +type options struct { + bufferSize uint +} + +func (o *options) apply(opts ...Option) *options { + for _, opt := range opts { + opt(o) + } + + return o +} + +func WithBufferSize(bufferSize uint) Option { + return func(o *options) { + o.bufferSize = bufferSize + } +} diff --git a/pkg/pubsub/pubsub.go b/pkg/pubsub/pubsub.go new file mode 100644 index 0000000..c895451 --- /dev/null +++ b/pkg/pubsub/pubsub.go @@ -0,0 +1,50 @@ +package pubsub + +import ( + "context" + "errors" +) + +var ( + ErrPubSubClosed = errors.New("pubsub is closed") + ErrInvalidTopic = errors.New("invalid topic name") +) + +type Message struct { + Topic string + Data []byte +} + +type Subscription struct { + id string + ch <-chan Message + ctx context.Context + cancel context.CancelFunc +} + +func (s *Subscription) Receive() <-chan Message { + return s.ch +} + +func (s *Subscription) Close() { + s.cancel() +} + +type Subscriber interface { + // Subscribe subscribes to a topic and returns a channel for receiving messages. + // The channel will be closed when the context is cancelled. + Subscribe(ctx context.Context, topic string) (*Subscription, error) +} + +type Publisher interface { + // Publish publishes a message to a topic. + // All subscribers to the topic will receive the message (fan-out). + Publish(ctx context.Context, topic string, data []byte) error +} + +type PubSub interface { + Publisher + Subscriber + // Close closes the pubsub instance and releases all resources. + Close() error +} diff --git a/pkg/pubsub/redis.go b/pkg/pubsub/redis.go new file mode 100644 index 0000000..2c6a432 --- /dev/null +++ b/pkg/pubsub/redis.go @@ -0,0 +1,132 @@ +package pubsub + +import ( + "context" + "errors" + "fmt" + "sync" + + "github.com/google/uuid" + "github.com/redis/go-redis/v9" +) + +type redisPubSub struct { + prefix string + bufferSize uint + + client *redis.Client + + wg sync.WaitGroup + mu sync.Mutex + subscribers map[string]context.CancelFunc + closeCh chan struct{} +} + +func NewRedis(client *redis.Client, prefix string, opts ...Option) *redisPubSub { + o := options{ + bufferSize: 0, + } + o.apply(opts...) + + return &redisPubSub{ + prefix: prefix, + bufferSize: o.bufferSize, + + client: client, + + subscribers: make(map[string]context.CancelFunc), + closeCh: make(chan struct{}), + } +} + +func (r *redisPubSub) Publish(ctx context.Context, topic string, data []byte) error { + select { + case <-r.closeCh: + return ErrPubSubClosed + default: + } + + if topic == "" { + return ErrInvalidTopic + } + + return r.client.Publish(ctx, r.prefix+topic, data).Err() +} + +func (r *redisPubSub) Subscribe(ctx context.Context, topic string) (*Subscription, error) { + select { + case <-r.closeCh: + return nil, ErrPubSubClosed + default: + } + + if topic == "" { + return nil, ErrInvalidTopic + } + + ps := r.client.Subscribe(ctx, r.prefix+topic) + _, err := ps.Receive(ctx) + if err != nil { + closeErr := ps.Close() + return nil, errors.Join(fmt.Errorf("can't subscribe: %w", err), closeErr) + } + + id := uuid.NewString() + subCtx, cancel := context.WithCancel(ctx) + ch := make(chan Message, r.bufferSize) + + // Track this subscriber + r.mu.Lock() + r.subscribers[id] = cancel + r.mu.Unlock() + + r.wg.Add(1) + go func() { + defer func() { + _ = ps.Close() + close(ch) + + r.mu.Lock() + delete(r.subscribers, id) + r.mu.Unlock() + + r.wg.Done() + }() + + for { + select { + case <-r.closeCh: + return + case <-subCtx.Done(): + return + case msg, ok := <-ps.Channel(): + if !ok { + return + } + if msg != nil { + ch <- Message{ + Topic: topic, + Data: []byte(msg.Payload), + } + } + } + } + }() + + return &Subscription{id: id, ctx: subCtx, cancel: cancel, ch: ch}, nil +} + +func (r *redisPubSub) Close() error { + select { + case <-r.closeCh: + return nil + default: + close(r.closeCh) + } + + r.wg.Wait() + + return nil +} + +var _ PubSub = (*redisPubSub)(nil) From 35eba599f0bab7f42712cf3016534da6ac8f1564 Mon Sep 17 00:00:00 2001 From: Aleksandr Soloshenko Date: Sun, 12 Oct 2025 14:30:53 +0700 Subject: [PATCH 2/7] [pubsub] add module --- configs/config.example.yml | 6 ++- internal/config/config.go | 8 ++++ internal/config/module.go | 7 +++ internal/sms-gateway/app.go | 2 + internal/sms-gateway/pubsub/config.go | 7 +++ internal/sms-gateway/pubsub/module.go | 29 ++++++++++++ internal/sms-gateway/pubsub/pubsub.go | 39 +++++++++++++++ pkg/pubsub/memory.go | 27 +++++++---- pkg/pubsub/redis.go | 68 +++++++++++++++++++++++---- 9 files changed, 171 insertions(+), 22 deletions(-) create mode 100644 internal/sms-gateway/pubsub/config.go create mode 100644 internal/sms-gateway/pubsub/module.go create mode 100644 internal/sms-gateway/pubsub/pubsub.go diff --git a/configs/config.example.yml b/configs/config.example.yml index f7ae528..bef7de7 100644 --- a/configs/config.example.yml +++ b/configs/config.example.yml @@ -24,8 +24,10 @@ fcm: # firebase cloud messaging config credentials_json: "{}" # firebase credentials json (for public mode only) [FCM__CREDENTIALS_JSON] timeout_seconds: 1 # push notification send timeout [FCM__TIMEOUT_SECONDS] debounce_seconds: 5 # push notification debounce (>= 5s) [FCM__DEBOUNCE_SECONDS] -cache: # cache config - url: memory:// # cache url (memory:// or redis://) [CACHE__URL] tasks: # tasks config hashing: # hashing task (hashes processed messages for privacy purposes) interval_seconds: 15 # hashing interval in seconds [TASKS__HASHING__INTERVAL_SECONDS] +cache: # cache config + url: memory:// # cache url (memory:// or redis://) [CACHE__URL] +pubsub: # pubsub config + url: memory:// # pubsub url (memory:// or redis://) [PUBSUB__URL] diff --git a/internal/config/config.go b/internal/config/config.go index 49b5df5..f470f85 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -16,6 +16,7 @@ type Config struct { SSE SSE `yaml:"sse"` // server-sent events config Messages Messages `yaml:"messages"` // messages config Cache Cache `yaml:"cache"` // cache (memory or redis) config + PubSub PubSub `yaml:"pubsub"` // pubsub (memory or redis) config } type Gateway struct { @@ -81,6 +82,10 @@ type Cache struct { URL string `yaml:"url" envconfig:"CACHE__URL"` } +type PubSub struct { + URL string `yaml:"url" envconfig:"PUBSUB__URL"` +} + var defaultConfig = Config{ Gateway: Gateway{Mode: GatewayModePublic}, HTTP: HTTP{ @@ -113,4 +118,7 @@ var defaultConfig = Config{ Cache: Cache{ URL: "memory://", }, + PubSub: PubSub{ + URL: "memory://", + }, } diff --git a/internal/config/module.go b/internal/config/module.go index e4ca213..5b5f7ad 100644 --- a/internal/config/module.go +++ b/internal/config/module.go @@ -11,6 +11,7 @@ import ( "github.com/android-sms-gateway/server/internal/sms-gateway/modules/messages" "github.com/android-sms-gateway/server/internal/sms-gateway/modules/push" "github.com/android-sms-gateway/server/internal/sms-gateway/modules/sse" + "github.com/android-sms-gateway/server/internal/sms-gateway/pubsub" "github.com/capcom6/go-infra-fx/config" "github.com/capcom6/go-infra-fx/db" "github.com/capcom6/go-infra-fx/http" @@ -122,4 +123,10 @@ var Module = fx.Module( URL: cfg.Cache.URL, } }), + fx.Provide(func(cfg Config) pubsub.Config { + return pubsub.Config{ + URL: cfg.PubSub.URL, + BufferSize: 128, + } + }), ) diff --git a/internal/sms-gateway/app.go b/internal/sms-gateway/app.go index 37287cb..cf38fbf 100644 --- a/internal/sms-gateway/app.go +++ b/internal/sms-gateway/app.go @@ -21,6 +21,7 @@ import ( "github.com/android-sms-gateway/server/internal/sms-gateway/modules/webhooks" "github.com/android-sms-gateway/server/internal/sms-gateway/online" "github.com/android-sms-gateway/server/internal/sms-gateway/openapi" + "github.com/android-sms-gateway/server/internal/sms-gateway/pubsub" "github.com/capcom6/go-infra-fx/cli" "github.com/capcom6/go-infra-fx/db" "github.com/capcom6/go-infra-fx/http" @@ -45,6 +46,7 @@ var Module = fx.Module( push.Module, db.Module, cache.Module(), + pubsub.Module(), events.Module, messages.Module(), health.Module, diff --git a/internal/sms-gateway/pubsub/config.go b/internal/sms-gateway/pubsub/config.go new file mode 100644 index 0000000..932ad78 --- /dev/null +++ b/internal/sms-gateway/pubsub/config.go @@ -0,0 +1,7 @@ +package pubsub + +// Config controls the PubSub backend via a URL (e.g., "memory://", "redis://..."). +type Config struct { + URL string + BufferSize uint +} diff --git a/internal/sms-gateway/pubsub/module.go b/internal/sms-gateway/pubsub/module.go new file mode 100644 index 0000000..4f3d0bc --- /dev/null +++ b/internal/sms-gateway/pubsub/module.go @@ -0,0 +1,29 @@ +package pubsub + +import ( + "context" + + "go.uber.org/fx" + "go.uber.org/zap" +) + +func Module() fx.Option { + return fx.Module( + "pubsub", + fx.Decorate(func(log *zap.Logger) *zap.Logger { + return log.Named("pubsub") + }), + fx.Provide(New), + fx.Invoke(func(ps PubSub, logger *zap.Logger, lc fx.Lifecycle) { + lc.Append(fx.Hook{ + OnStop: func(_ context.Context) error { + if err := ps.Close(); err != nil { + logger.Error("pubsub close failed", zap.Error(err)) + return err + } + return nil + }, + }) + }), + ) +} diff --git a/internal/sms-gateway/pubsub/pubsub.go b/internal/sms-gateway/pubsub/pubsub.go new file mode 100644 index 0000000..f585669 --- /dev/null +++ b/internal/sms-gateway/pubsub/pubsub.go @@ -0,0 +1,39 @@ +package pubsub + +import ( + "fmt" + "net/url" + + "github.com/android-sms-gateway/server/pkg/pubsub" +) + +const ( + topicPrefix = "sms-gateway:" +) + +func New(config Config) (pubsub.PubSub, error) { + if config.URL == "" { + config.URL = "memory://" + } + + u, err := url.Parse(config.URL) + if err != nil { + return nil, fmt.Errorf("can't parse url: %w", err) + } + + opts := []pubsub.Option{} + opts = append(opts, pubsub.WithBufferSize(config.BufferSize)) + + switch u.Scheme { + case "memory": + return pubsub.NewMemory(opts...), nil + case "redis": + return pubsub.NewRedis(pubsub.RedisConfig{ + Client: nil, + URL: config.URL, + Prefix: topicPrefix, + }, opts...) + default: + return nil, fmt.Errorf("invalid scheme: %s", u.Scheme) + } +} diff --git a/pkg/pubsub/memory.go b/pkg/pubsub/memory.go index 404f11c..a2254a6 100644 --- a/pkg/pubsub/memory.go +++ b/pkg/pubsub/memory.go @@ -12,10 +12,15 @@ type memoryPubSub struct { wg sync.WaitGroup mu sync.RWMutex - topics map[string]map[string]chan Message + topics map[string]map[string]subscriber closeCh chan struct{} } +type subscriber struct { + ch chan Message + ctx context.Context +} + func NewMemory(opts ...Option) *memoryPubSub { o := options{ bufferSize: 0, @@ -25,7 +30,7 @@ func NewMemory(opts ...Option) *memoryPubSub { return &memoryPubSub{ bufferSize: o.bufferSize, - topics: make(map[string]map[string]chan Message), + topics: make(map[string]map[string]subscriber), closeCh: make(chan struct{}), } } @@ -55,19 +60,21 @@ func (m *memoryPubSub) Publish(ctx context.Context, topic string, data []byte) e wg := &sync.WaitGroup{} msg := Message{Topic: topic, Data: data} - for _, ch := range subscribers { + for _, sub := range subscribers { wg.Add(1) - go func(ch chan Message) { + go func(sub subscriber) { defer wg.Done() select { - case ch <- msg: + case sub.ch <- msg: case <-ctx.Done(): return case <-m.closeCh: return + case <-sub.ctx.Done(): + return } - }(ch) + }(sub) } wg.Wait() @@ -90,7 +97,7 @@ func (m *memoryPubSub) Subscribe(ctx context.Context, topic string) (*Subscripti subCtx, cancel := context.WithCancel(ctx) ch := make(chan Message, m.bufferSize) - m.subscribe(id, topic, ch) + m.subscribe(id, topic, subscriber{ch: ch, ctx: subCtx}) m.wg.Add(1) go func() { @@ -109,16 +116,16 @@ func (m *memoryPubSub) Subscribe(ctx context.Context, topic string) (*Subscripti return &Subscription{id: id, ctx: subCtx, cancel: cancel, ch: ch}, nil } -func (m *memoryPubSub) subscribe(id, topic string, ch chan Message) { +func (m *memoryPubSub) subscribe(id, topic string, sub subscriber) { m.mu.Lock() defer m.mu.Unlock() subscriptions, ok := m.topics[topic] if !ok { - subscriptions = make(map[string]chan Message) + subscriptions = make(map[string]subscriber) m.topics[topic] = subscriptions } - subscriptions[id] = ch + subscriptions[id] = sub } func (m *memoryPubSub) unsubscribe(id, topic string) { diff --git a/pkg/pubsub/redis.go b/pkg/pubsub/redis.go index 2c6a432..4cd99d0 100644 --- a/pkg/pubsub/redis.go +++ b/pkg/pubsub/redis.go @@ -4,17 +4,34 @@ import ( "context" "errors" "fmt" + "strings" "sync" "github.com/google/uuid" "github.com/redis/go-redis/v9" ) +// RedisConfig configures the Redis pubsub backend. +type RedisConfig struct { + // Client is the Redis client to use. + // If nil, a client is created from the URL. + // If both Client and URL are provided, Client takes precedence. + Client *redis.Client + + // URL is the Redis URL to use. + // If empty, the Redis client is not created. + URL string + + // Prefix is the prefix to use for all topics. + Prefix string +} + type redisPubSub struct { prefix string bufferSize uint - client *redis.Client + client *redis.Client + ownedClient bool wg sync.WaitGroup mu sync.Mutex @@ -22,21 +39,40 @@ type redisPubSub struct { closeCh chan struct{} } -func NewRedis(client *redis.Client, prefix string, opts ...Option) *redisPubSub { +func NewRedis(config RedisConfig, opts ...Option) (*redisPubSub, error) { + if config.Prefix != "" && !strings.HasSuffix(config.Prefix, ":") { + config.Prefix += ":" + } + + if config.Client == nil && config.URL == "" { + return nil, fmt.Errorf("no redis client or url provided") + } + + client := config.Client + if client == nil { + opt, err := redis.ParseURL(config.URL) + if err != nil { + return nil, fmt.Errorf("failed to parse redis url: %w", err) + } + + client = redis.NewClient(opt) + } + o := options{ bufferSize: 0, } o.apply(opts...) return &redisPubSub{ - prefix: prefix, + prefix: config.Prefix, bufferSize: o.bufferSize, - client: client, + client: client, + ownedClient: config.Client == nil, subscribers: make(map[string]context.CancelFunc), closeCh: make(chan struct{}), - } + }, nil } func (r *redisPubSub) Publish(ctx context.Context, topic string, data []byte) error { @@ -103,11 +139,19 @@ func (r *redisPubSub) Subscribe(ctx context.Context, topic string) (*Subscriptio if !ok { return } - if msg != nil { - ch <- Message{ - Topic: topic, - Data: []byte(msg.Payload), - } + if msg == nil { + continue + } + + select { + case ch <- Message{ + Topic: topic, + Data: []byte(msg.Payload), + }: + case <-r.closeCh: + return + case <-subCtx.Done(): + return } } } @@ -126,6 +170,10 @@ func (r *redisPubSub) Close() error { r.wg.Wait() + if r.ownedClient { + return r.client.Close() + } + return nil } From 4af39430176da5f49b87cf415bdde91773cdc2e4 Mon Sep 17 00:00:00 2001 From: Aleksandr Soloshenko Date: Wed, 15 Oct 2025 20:12:29 +0700 Subject: [PATCH 3/7] [events] use of pubsub instead of internal queue --- go.mod | 1 - go.sum | 2 - internal/sms-gateway/modules/events/events.go | 8 +- .../sms-gateway/modules/events/metrics.go | 7 +- internal/sms-gateway/modules/events/module.go | 11 ++- .../sms-gateway/modules/events/service.go | 82 +++++++++++++------ internal/sms-gateway/modules/events/types.go | 28 +++++-- internal/sms-gateway/pubsub/pubsub.go | 4 +- 8 files changed, 98 insertions(+), 45 deletions(-) diff --git a/go.mod b/go.mod index 59112e7..1a8be52 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.24.1 require ( firebase.google.com/go/v4 v4.12.1 github.com/android-sms-gateway/client-go v1.9.5 - github.com/android-sms-gateway/core v1.0.1 github.com/ansrivas/fiberprometheus/v2 v2.6.1 github.com/capcom6/go-helpers v0.3.0 github.com/capcom6/go-infra-fx v0.4.0 diff --git a/go.sum b/go.sum index 2738cf6..9c4626b 100644 --- a/go.sum +++ b/go.sum @@ -34,8 +34,6 @@ github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/android-sms-gateway/client-go v1.9.5 h1:fHrE1Pi3rKUdPVMmI9evKW0iyjB5bMIhFRxyq1wVQ+o= github.com/android-sms-gateway/client-go v1.9.5/go.mod h1:DQsReciU1xcaVW3T5Z2bqslNdsAwCFCtghawmA6g6L4= -github.com/android-sms-gateway/core v1.0.1 h1:7QyqyW3UQSQmEXQuUgXjZwHSnOd65DTxHUyhXQi6gpc= -github.com/android-sms-gateway/core v1.0.1/go.mod h1:HXczGDCKxTeuiwadPElczCx/y3Y6Wamc5kl5nFp5rVM= github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= github.com/ansrivas/fiberprometheus/v2 v2.6.1 h1:wac3pXaE6BYYTF04AC6K0ktk6vCD+MnDOJZ3SK66kXM= diff --git a/internal/sms-gateway/modules/events/events.go b/internal/sms-gateway/modules/events/events.go index 89c9094..e3d2b04 100644 --- a/internal/sms-gateway/modules/events/events.go +++ b/internal/sms-gateway/modules/events/events.go @@ -6,15 +6,15 @@ import ( "github.com/android-sms-gateway/client-go/smsgateway" ) -func NewMessageEnqueuedEvent() *Event { +func NewMessageEnqueuedEvent() Event { return NewEvent(smsgateway.PushMessageEnqueued, nil) } -func NewWebhooksUpdatedEvent() *Event { +func NewWebhooksUpdatedEvent() Event { return NewEvent(smsgateway.PushWebhooksUpdated, nil) } -func NewMessagesExportRequestedEvent(since, until time.Time) *Event { +func NewMessagesExportRequestedEvent(since, until time.Time) Event { return NewEvent( smsgateway.PushMessagesExportRequested, map[string]string{ @@ -24,6 +24,6 @@ func NewMessagesExportRequestedEvent(since, until time.Time) *Event { ) } -func NewSettingsUpdatedEvent() *Event { +func NewSettingsUpdatedEvent() Event { return NewEvent(smsgateway.PushSettingsUpdated, nil) } diff --git a/internal/sms-gateway/modules/events/metrics.go b/internal/sms-gateway/modules/events/metrics.go index 13afdc8..e273467 100644 --- a/internal/sms-gateway/modules/events/metrics.go +++ b/internal/sms-gateway/modules/events/metrics.go @@ -19,8 +19,11 @@ const ( DeliveryTypeSSE = "sse" DeliveryTypeUnknown = "unknown" - FailureReasonQueueFull = "queue_full" - FailureReasonProviderFailed = "provider_failed" + FailureReasonSerializationError = "serialization_error" + FailureReasonPublishError = "publish_error" + FailureReasonProviderFailed = "provider_failed" + + EventTypeUnknown = "unknown" ) // metrics contains all Prometheus metrics for the events module diff --git a/internal/sms-gateway/modules/events/module.go b/internal/sms-gateway/modules/events/module.go index 3b6ba7e..8f6cf71 100644 --- a/internal/sms-gateway/modules/events/module.go +++ b/internal/sms-gateway/modules/events/module.go @@ -14,11 +14,18 @@ var Module = fx.Module( }), fx.Provide(newMetrics, fx.Private), fx.Provide(NewService), - fx.Invoke(func(lc fx.Lifecycle, svc *Service) { + fx.Invoke(func(lc fx.Lifecycle, svc *Service, logger *zap.Logger, sh fx.Shutdowner) { ctx, cancel := context.WithCancel(context.Background()) lc.Append(fx.Hook{ OnStart: func(_ context.Context) error { - go svc.Run(ctx) + go func() { + if err := svc.Run(ctx); err != nil { + logger.Error("Error running events service", zap.Error(err)) + if err := sh.Shutdown(fx.ExitCode(1)); err != nil { + logger.Error("Failed to shutdown", zap.Error(err)) + } + } + }() return nil }, OnStop: func(_ context.Context) error { diff --git a/internal/sms-gateway/modules/events/service.go b/internal/sms-gateway/modules/events/service.go index 202b6bb..384dbfa 100644 --- a/internal/sms-gateway/modules/events/service.go +++ b/internal/sms-gateway/modules/events/service.go @@ -3,27 +3,33 @@ package events import ( "context" "fmt" + "time" "github.com/android-sms-gateway/server/internal/sms-gateway/modules/devices" "github.com/android-sms-gateway/server/internal/sms-gateway/modules/push" "github.com/android-sms-gateway/server/internal/sms-gateway/modules/sse" + "github.com/android-sms-gateway/server/internal/sms-gateway/pubsub" "go.uber.org/zap" ) +const ( + pubsubTopic = "events" +) + type Service struct { deviceSvc *devices.Service sseSvc *sse.Service pushSvc *push.Service - queue chan eventWrapper + pubsub pubsub.PubSub metrics *metrics logger *zap.Logger } -func NewService(devicesSvc *devices.Service, sseSvc *sse.Service, pushSvc *push.Service, metrics *metrics, logger *zap.Logger) *Service { +func NewService(devicesSvc *devices.Service, sseSvc *sse.Service, pushSvc *push.Service, pubsub pubsub.PubSub, metrics *metrics, logger *zap.Logger) *Service { return &Service{ deviceSvc: devicesSvc, sseSvc: sseSvc, @@ -31,44 +37,72 @@ func NewService(devicesSvc *devices.Service, sseSvc *sse.Service, pushSvc *push. metrics: metrics, - queue: make(chan eventWrapper, 128), + pubsub: pubsub, logger: logger, } } -func (s *Service) Notify(userID string, deviceID *string, event *Event) error { +func (s *Service) Notify(userID string, deviceID *string, event Event) error { + if event.EventType == "" { + return fmt.Errorf("event type is empty") + } + + subCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + wrapper := eventWrapper{ UserID: userID, DeviceID: deviceID, Event: event, } - select { - case s.queue <- wrapper: - // Successfully enqueued - s.metrics.IncrementEnqueued(string(event.eventType)) - default: - s.metrics.IncrementFailed(string(event.eventType), DeliveryTypeUnknown, FailureReasonQueueFull) - return fmt.Errorf("event queue is full") + wrapperBytes, err := wrapper.serialize() + if err != nil { + s.metrics.IncrementFailed(string(event.EventType), DeliveryTypeUnknown, FailureReasonSerializationError) + return fmt.Errorf("can't serialize event wrapper: %w", err) + } + + if err := s.pubsub.Publish(subCtx, pubsubTopic, wrapperBytes); err != nil { + s.metrics.IncrementFailed(string(event.EventType), DeliveryTypeUnknown, FailureReasonPublishError) + return fmt.Errorf("can't publish event: %w", err) } + s.metrics.IncrementEnqueued(string(event.EventType)) + return nil } -func (s *Service) Run(ctx context.Context) { +func (s *Service) Run(ctx context.Context) error { + sub, err := s.pubsub.Subscribe(ctx, pubsubTopic) + if err != nil { + return fmt.Errorf("can't subscribe to pubsub: %w", err) + } + defer sub.Close() + + ch := sub.Receive() for { select { - case wrapper := <-s.queue: - s.processEvent(wrapper) case <-ctx.Done(): s.logger.Info("Event service stopped") - return + return nil + case msg, ok := <-ch: + if !ok { + s.logger.Info("Subscription closed") + return nil + } + wrapper := new(eventWrapper) + if err := wrapper.deserialize(msg.Data); err != nil { + s.metrics.IncrementFailed(EventTypeUnknown, DeliveryTypeUnknown, FailureReasonSerializationError) + s.logger.Error("Failed to deserialize event wrapper", zap.Error(err)) + continue + } + s.processEvent(wrapper) } } } -func (s *Service) processEvent(wrapper eventWrapper) { +func (s *Service) processEvent(wrapper *eventWrapper) { // Load devices from database filters := []devices.SelectFilter{} if wrapper.DeviceID != nil { @@ -91,26 +125,26 @@ func (s *Service) processEvent(wrapper eventWrapper) { if device.PushToken != nil && *device.PushToken != "" { // Device has push token, use push service if err := s.pushSvc.Enqueue(*device.PushToken, push.Event{ - Type: wrapper.Event.eventType, - Data: wrapper.Event.data, + Type: wrapper.Event.EventType, + Data: wrapper.Event.Data, }); err != nil { s.logger.Error("Failed to enqueue push notification", zap.String("user_id", wrapper.UserID), zap.String("device_id", device.ID), zap.Error(err)) - s.metrics.IncrementFailed(string(wrapper.Event.eventType), DeliveryTypePush, FailureReasonProviderFailed) + s.metrics.IncrementFailed(string(wrapper.Event.EventType), DeliveryTypePush, FailureReasonProviderFailed) } else { - s.metrics.IncrementSent(string(wrapper.Event.eventType), DeliveryTypePush) + s.metrics.IncrementSent(string(wrapper.Event.EventType), DeliveryTypePush) } continue } // No push token, use SSE service if err := s.sseSvc.Send(device.ID, sse.Event{ - Type: wrapper.Event.eventType, - Data: wrapper.Event.data, + Type: wrapper.Event.EventType, + Data: wrapper.Event.Data, }); err != nil { s.logger.Error("Failed to send SSE notification", zap.String("user_id", wrapper.UserID), zap.String("device_id", device.ID), zap.Error(err)) - s.metrics.IncrementFailed(string(wrapper.Event.eventType), DeliveryTypeSSE, FailureReasonProviderFailed) + s.metrics.IncrementFailed(string(wrapper.Event.EventType), DeliveryTypeSSE, FailureReasonProviderFailed) } else { - s.metrics.IncrementSent(string(wrapper.Event.eventType), DeliveryTypeSSE) + s.metrics.IncrementSent(string(wrapper.Event.EventType), DeliveryTypeSSE) } } } diff --git a/internal/sms-gateway/modules/events/types.go b/internal/sms-gateway/modules/events/types.go index 76755e1..76e4d89 100644 --- a/internal/sms-gateway/modules/events/types.go +++ b/internal/sms-gateway/modules/events/types.go @@ -1,23 +1,33 @@ package events import ( + "encoding/json" + "github.com/android-sms-gateway/client-go/smsgateway" ) type Event struct { - eventType smsgateway.PushEventType - data map[string]string + EventType smsgateway.PushEventType `json:"event_type"` + Data map[string]string `json:"data"` } -func NewEvent(eventType smsgateway.PushEventType, data map[string]string) *Event { - return &Event{ - eventType: eventType, - data: data, +func NewEvent(eventType smsgateway.PushEventType, data map[string]string) Event { + return Event{ + EventType: eventType, + Data: data, } } type eventWrapper struct { - UserID string - DeviceID *string - Event *Event + UserID string `json:"user_id"` + DeviceID *string `json:"device_id,omitempty"` + Event Event `json:"event"` +} + +func (w *eventWrapper) serialize() ([]byte, error) { + return json.Marshal(w) +} + +func (w *eventWrapper) deserialize(data []byte) error { + return json.Unmarshal(data, w) } diff --git a/internal/sms-gateway/pubsub/pubsub.go b/internal/sms-gateway/pubsub/pubsub.go index f585669..84ca2e9 100644 --- a/internal/sms-gateway/pubsub/pubsub.go +++ b/internal/sms-gateway/pubsub/pubsub.go @@ -11,7 +11,9 @@ const ( topicPrefix = "sms-gateway:" ) -func New(config Config) (pubsub.PubSub, error) { +type PubSub = pubsub.PubSub + +func New(config Config) (PubSub, error) { if config.URL == "" { config.URL = "memory://" } From adb2329ee9563e60bf49ac75dd6bdb780a0896b7 Mon Sep 17 00:00:00 2001 From: Aleksandr Soloshenko Date: Thu, 16 Oct 2025 08:42:37 +0700 Subject: [PATCH 4/7] [cache] refactor Redis backend for internal client init --- internal/sms-gateway/cache/factory.go | 14 +++---- pkg/cache/cache.go | 4 ++ pkg/cache/memory.go | 4 ++ pkg/cache/redis.go | 57 +++++++++++++++++++++++---- 4 files changed, 64 insertions(+), 15 deletions(-) diff --git a/internal/sms-gateway/cache/factory.go b/internal/sms-gateway/cache/factory.go index 7689519..16659d8 100644 --- a/internal/sms-gateway/cache/factory.go +++ b/internal/sms-gateway/cache/factory.go @@ -4,7 +4,6 @@ import ( "fmt" "net/url" - "github.com/android-sms-gateway/core/redis" "github.com/android-sms-gateway/server/pkg/cache" ) @@ -40,13 +39,14 @@ func NewFactory(config Config) (Factory, error) { }, }, nil case "redis": - client, err := redis.New(redis.Config{URL: config.URL}) - if err != nil { - return nil, fmt.Errorf("can't create redis client: %w", err) - } return &factory{ new: func(name string) (Cache, error) { - return cache.NewRedis(client, name, 0), nil + return cache.NewRedis(cache.RedisConfig{ + Client: nil, + URL: config.URL, + Prefix: keyPrefix + name, + TTL: 0, + }) }, }, nil default: @@ -56,5 +56,5 @@ func NewFactory(config Config) (Factory, error) { // New implements Factory. func (f *factory) New(name string) (Cache, error) { - return f.new(keyPrefix + name) + return f.new(name) } diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index da5eb70..e50e289 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -33,4 +33,8 @@ type Cache interface { // The cache is cleared after the call. // The operation is safe for concurrent use. Drain(ctx context.Context) (map[string][]byte, error) + + // Close closes the cache. + // The operation is safe for concurrent use. + Close() error } diff --git a/pkg/cache/memory.go b/pkg/cache/memory.go index 8088c5f..b4d8d7c 100644 --- a/pkg/cache/memory.go +++ b/pkg/cache/memory.go @@ -192,4 +192,8 @@ func (m *memoryCache) cleanup(cb func()) { m.mux.Unlock() } +func (m *memoryCache) Close() error { + return nil +} + var _ Cache = (*memoryCache)(nil) diff --git a/pkg/cache/redis.go b/pkg/cache/redis.go index 3cdc47a..32e122d 100644 --- a/pkg/cache/redis.go +++ b/pkg/cache/redis.go @@ -48,26 +48,59 @@ return value ` ) +// RedisConfig configures the Redis cache backend. +type RedisConfig struct { + // Client is the Redis client to use. + // If nil, a client is created from the URL. + Client *redis.Client + + // URL is the Redis URL to use. + // If empty, the Redis client is not created. + URL string + + // Prefix is the prefix to use for all keys in the Redis cache. + Prefix string + + // TTL is the time-to-live for all cache entries. + TTL time.Duration +} + type redisCache struct { - client *redis.Client + client *redis.Client + ownedClient bool key string ttl time.Duration } -func NewRedis(client *redis.Client, prefix string, ttl time.Duration) *redisCache { - if prefix != "" && !strings.HasSuffix(prefix, ":") { - prefix += ":" +func NewRedis(config RedisConfig) (*redisCache, error) { + if config.Prefix != "" && !strings.HasSuffix(config.Prefix, ":") { + config.Prefix += ":" } - return &redisCache{ - client: client, + if config.Client == nil && config.URL == "" { + return nil, fmt.Errorf("no redis client or url provided") + } - key: prefix + redisCacheKey, + client := config.Client + if client == nil { + opt, err := redis.ParseURL(config.URL) + if err != nil { + return nil, fmt.Errorf("failed to parse redis url: %w", err) + } - ttl: ttl, + client = redis.NewClient(opt) } + + return &redisCache{ + client: client, + ownedClient: config.Client == nil, + + key: config.Prefix + redisCacheKey, + + ttl: config.TTL, + }, nil } // Cleanup implements Cache. @@ -218,4 +251,12 @@ func (r *redisCache) SetOrFail(ctx context.Context, key string, value []byte, op return nil } +func (r *redisCache) Close() error { + if r.ownedClient { + return r.client.Close() + } + + return nil +} + var _ Cache = (*redisCache)(nil) From c6802f3a9d4a4db3f3bcfff64decdb1fdfc999ab Mon Sep 17 00:00:00 2001 From: Aleksandr Soloshenko Date: Thu, 23 Oct 2025 06:13:38 +0700 Subject: [PATCH 5/7] [dev] update Makefile and gitignore --- .gitignore | 17 ++++++++--------- Makefile | 40 +++++++++++++++++++++++++++++++--------- 2 files changed, 39 insertions(+), 18 deletions(-) diff --git a/.gitignore b/.gitignore index 0c5d740..8d846b7 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,6 @@ # File created using '.gitignore Generator' for Visual Studio Code: https://bit.ly/vscode-gig -# Created by https://www.toptal.com/developers/gitignore/api/windows,visualstudiocode,terraform,macos,linux,go,dotenv -# Edit at https://www.toptal.com/developers/gitignore?templates=windows,visualstudiocode,terraform,macos,linux,go,dotenv +# Created by https://www.toptal.com/developers/gitignore/api/windows,visualstudiocode,macos,linux,go,dotenv,terraform +# Edit at https://www.toptal.com/developers/gitignore?templates=windows,visualstudiocode,macos,linux,go,dotenv,terraform ### dotenv ### .env @@ -157,17 +157,16 @@ $RECYCLE.BIN/ # Windows shortcuts *.lnk -# End of https://www.toptal.com/developers/gitignore/api/windows,visualstudiocode,terraform,macos,linux,go,dotenv +# End of https://www.toptal.com/developers/gitignore/api/windows,visualstudiocode,macos,linux,go,dotenv,terraform # Custom rules (everything added below won't be overriden by 'Generate .gitignore File' if you use 'Update' option) -/tmp +dist/ +tmp/ /configs/* !/configs/*.example.* -/certs - -/api/docs.go - -go.work* \ No newline at end of file +go.work* +coverage.* +benchmark.* diff --git a/Makefile b/Makefile index 9aacbd5..9f954ad 100644 --- a/Makefile +++ b/Makefile @@ -6,11 +6,37 @@ ifeq ($(OS),Windows_NT) extension = .exe endif -.DEFAULT_GOAL := build +# Default target +all: fmt lint test benchmark -init: +fmt: + golangci-lint fmt + +# Lint the code using golangci-lint +lint: + golangci-lint run --timeout=5m + +# Run tests with coverage +test: + go test -race -shuffle=on -count=1 -covermode=atomic -coverpkg=./... -coverprofile=coverage.out ./... + +# Run benchmarks +benchmark: + go test -run=^$$ -bench=. -benchmem ./... | tee benchmark.txt + +# Download dependencies +deps: go mod download +# Clean up generated files +clean: + go clean -cache -testcache + rm -f coverage.out benchmark.txt + +### + +init: deps + init-dev: init go install github.com/air-verse/air@latest \ && go install github.com/swaggo/swag/cmd/swag@latest \ @@ -31,11 +57,7 @@ db-upgrade-raw: run: go run cmd/$(project_name)/main.go -lint: - golangci-lint run ./... - -test: - go test -race -coverprofile=coverage.out -covermode=atomic ./... +test-e2e: test cd test/e2e && go test -count=1 . build: @@ -53,7 +75,7 @@ docker: docker-dev: docker compose -f deployments/docker-compose/docker-compose.dev.yml up --build -clean: +docker-clean: docker compose -f deployments/docker-compose/docker-compose.yml down --volumes -.PHONY: init init-dev air db-upgrade db-upgrade-raw run test build install docker docker-dev api-docs view-docs clean \ No newline at end of file +.PHONY: all fmt lint test benchmark deps clean init init-dev air ngrok db-upgrade db-upgrade-raw run test-e2e build install docker-build docker docker-dev docker-clean From dae0798b1274d89ebe8b995cea8c2a1b3f4982eb Mon Sep 17 00:00:00 2001 From: Aleksandr Soloshenko Date: Thu, 23 Oct 2025 06:13:57 +0700 Subject: [PATCH 6/7] [cache] fix benchmark degradation --- pkg/cache/memory_bench_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/cache/memory_bench_test.go b/pkg/cache/memory_bench_test.go index 987b339..d1d2040 100644 --- a/pkg/cache/memory_bench_test.go +++ b/pkg/cache/memory_bench_test.go @@ -17,12 +17,12 @@ func BenchmarkMemoryCache_Set(b *testing.B) { cache := cache.NewMemory(0) ctx := context.Background() key := "benchmark-key" - value := "benchmark-value" + value := []byte("benchmark-value") b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { - cache.Set(ctx, key, []byte(value)) + cache.Set(ctx, key, value) } }) } @@ -69,12 +69,12 @@ func BenchmarkMemoryCache_SetOrFail(b *testing.B) { cache := cache.NewMemory(0) ctx := context.Background() key := "benchmark-key" - value := "benchmark-value" + value := []byte("benchmark-value") b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { - cache.SetOrFail(ctx, key, []byte(value)) + cache.SetOrFail(ctx, key, value) } }) } @@ -310,7 +310,7 @@ func BenchmarkMemoryCache_TTLOverhead(b *testing.B) { c := cache.NewMemory(0) ctx := context.Background() key := "benchmark-key" - value := "benchmark-value" + value := []byte("benchmark-value") ttl := time.Hour benchmarks := []struct { @@ -327,9 +327,9 @@ func BenchmarkMemoryCache_TTLOverhead(b *testing.B) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { if bm.withTTL { - c.Set(ctx, key, []byte(value), cache.WithTTL(ttl)) + c.Set(ctx, key, value, cache.WithTTL(ttl)) } else { - c.Set(ctx, key, []byte(value)) + c.Set(ctx, key, value) } } }) From ed271b22d068f234bfa7ad801bf6e6d59b34325f Mon Sep 17 00:00:00 2001 From: Aleksandr Soloshenko Date: Fri, 24 Oct 2025 10:47:35 +0700 Subject: [PATCH 7/7] [push] migrate to external cache --- go.mod | 1 + go.sum | 2 + internal/sms-gateway/app.go | 2 +- internal/sms-gateway/modules/push/module.go | 74 ++++----- internal/sms-gateway/modules/push/service.go | 151 +++++++++++------- internal/sms-gateway/modules/push/types.go | 21 ++- .../sms-gateway/modules/push/types/types.go | 4 +- 7 files changed, 158 insertions(+), 97 deletions(-) diff --git a/go.mod b/go.mod index 1a8be52..29eba45 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/nyaruka/phonenumbers v1.4.0 github.com/prometheus/client_golang v1.19.1 github.com/redis/go-redis/v9 v9.9.0 + github.com/samber/lo v1.52.0 github.com/swaggo/swag v1.16.6 go.uber.org/fx v1.24.0 go.uber.org/zap v1.27.0 diff --git a/go.sum b/go.sum index 9c4626b..1a20973 100644 --- a/go.sum +++ b/go.sum @@ -262,6 +262,8 @@ github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/samber/lo v1.52.0 h1:Rvi+3BFHES3A8meP33VPAxiBZX/Aws5RxrschYGjomw= +github.com/samber/lo v1.52.0/go.mod h1:4+MXEGsJzbKGaUEQFKBq2xtfuznW9oz/WrgyzMzRoM0= github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= github.com/sethvargo/go-retry v0.2.4 h1:T+jHEQy/zKJf5s95UkguisicE0zuF9y7+/vgz08Ocec= diff --git a/internal/sms-gateway/app.go b/internal/sms-gateway/app.go index cf38fbf..a1b8309 100644 --- a/internal/sms-gateway/app.go +++ b/internal/sms-gateway/app.go @@ -43,7 +43,7 @@ var Module = fx.Module( openapi.Module(), handlers.Module, auth.Module, - push.Module, + push.Module(), db.Module, cache.Module(), pubsub.Module(), diff --git a/internal/sms-gateway/modules/push/module.go b/internal/sms-gateway/modules/push/module.go index 9724790..e2e2595 100644 --- a/internal/sms-gateway/modules/push/module.go +++ b/internal/sms-gateway/modules/push/module.go @@ -2,7 +2,7 @@ package push import ( "context" - "errors" + "fmt" "github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/fcm" "github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/upstream" @@ -10,41 +10,43 @@ import ( "go.uber.org/zap" ) -var Module = fx.Module( - "push", - fx.Decorate(func(log *zap.Logger) *zap.Logger { - return log.Named("push") - }), - fx.Provide(newMetrics, fx.Private), - fx.Provide( - func(cfg Config, lc fx.Lifecycle) (c client, err error) { - switch cfg.Mode { - case ModeFCM: - c, err = fcm.New(cfg.ClientOptions) - case ModeUpstream: - c, err = upstream.New(cfg.ClientOptions) - default: - return nil, errors.New("invalid push mode") - } +func Module() fx.Option { + return fx.Module( + "push", + fx.Decorate(func(log *zap.Logger) *zap.Logger { + return log.Named("push") + }), + fx.Provide(newMetrics, fx.Private), + fx.Provide( + func(cfg Config, lc fx.Lifecycle) (c client, err error) { + switch cfg.Mode { + case ModeFCM: + c, err = fcm.New(cfg.ClientOptions) + case ModeUpstream: + c, err = upstream.New(cfg.ClientOptions) + default: + return nil, fmt.Errorf("invalid push mode: %q", cfg.Mode) + } - if err != nil { - return nil, err - } + if err != nil { + return nil, err + } - lc.Append(fx.Hook{ - OnStart: func(ctx context.Context) error { - return c.Open(ctx) - }, - OnStop: func(ctx context.Context) error { - return c.Close(ctx) - }, - }) + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + return c.Open(ctx) + }, + OnStop: func(ctx context.Context) error { + return c.Close(ctx) + }, + }) - return c, nil - }, - fx.Private, - ), - fx.Provide( - New, - ), -) + return c, nil + }, + fx.Private, + ), + fx.Provide( + New, + ), + ) +} diff --git a/internal/sms-gateway/modules/push/service.go b/internal/sms-gateway/modules/push/service.go index 7d0da19..c5fca73 100644 --- a/internal/sms-gateway/modules/push/service.go +++ b/internal/sms-gateway/modules/push/service.go @@ -5,14 +5,19 @@ import ( "fmt" "time" + "github.com/android-sms-gateway/server/internal/sms-gateway/cache" "github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/types" - "github.com/capcom6/go-helpers/cache" - "github.com/capcom6/go-helpers/maps" + cacheImpl "github.com/android-sms-gateway/server/pkg/cache" + "github.com/samber/lo" - "go.uber.org/fx" "go.uber.org/zap" ) +const ( + cachePrefixEvents = "events:" + cachePrefixBlacklist = "blacklist:" +) + type Config struct { Mode Mode @@ -22,53 +27,51 @@ type Config struct { Timeout time.Duration } -type Params struct { - fx.In - - Config Config - - Client client - Metrics *metrics - - Logger *zap.Logger -} - type Service struct { config Config - client client - metrics *metrics - - cache *cache.Cache[eventWrapper] - blacklist *cache.Cache[struct{}] + client client + events cache.Cache + blacklist cache.Cache - logger *zap.Logger + metrics *metrics + logger *zap.Logger } -func New(params Params) *Service { - if params.Config.Timeout == 0 { - params.Config.Timeout = time.Second +func New( + config Config, + client client, + cacheFactory cache.Factory, + metrics *metrics, + logger *zap.Logger, +) (*Service, error) { + events, err := cacheFactory.New(cachePrefixEvents) + if err != nil { + return nil, fmt.Errorf("can't create events cache: %w", err) } - if params.Config.Debounce < 5*time.Second { - params.Config.Debounce = 5 * time.Second + + blacklist, err := cacheFactory.New(cachePrefixBlacklist) + if err != nil { + return nil, fmt.Errorf("can't create blacklist cache: %w", err) } - return &Service{ - config: params.Config, + config.Timeout = max(config.Timeout, time.Second) + config.Debounce = max(config.Debounce, 5*time.Second) - client: params.Client, - metrics: params.Metrics, + return &Service{ + config: config, - cache: cache.New[eventWrapper](cache.Config{}), - blacklist: cache.New[struct{}](cache.Config{ - TTL: blacklistTimeout, - }), + client: client, + events: events, + blacklist: blacklist, - logger: params.Logger, - } + metrics: metrics, + logger: logger, + }, nil } -// Run runs the service with the provided context if a debounce is set. +// Run starts a ticker that triggers the sendAll function every debounce interval. +// It runs indefinitely until the provided context is canceled. func (s *Service) Run(ctx context.Context) { ticker := time.NewTicker(s.config.Debounce) defer ticker.Stop() @@ -85,19 +88,28 @@ func (s *Service) Run(ctx context.Context) { // Enqueue adds the data to the cache and immediately sends all messages if the debounce is 0. func (s *Service) Enqueue(token string, event types.Event) error { - if _, err := s.blacklist.Get(token); err == nil { + ctx, cancel := context.WithTimeout(context.Background(), s.config.Timeout) + defer cancel() + + if _, err := s.blacklist.Get(ctx, token); err == nil { s.metrics.IncBlacklist(BlacklistOperationSkipped) s.logger.Debug("Skipping blacklisted token", zap.String("token", token)) return nil } wrapper := eventWrapper{ - token: token, - event: &event, - retries: 0, + Token: token, + Event: event, + Retries: 0, + } + wrapperData, err := wrapper.serialize() + if err != nil { + s.metrics.IncError(1) + return fmt.Errorf("can't serialize event wrapper: %w", err) } - if err := s.cache.Set(token, wrapper); err != nil { + if err := s.events.Set(ctx, wrapper.key(), wrapperData); err != nil { + s.metrics.IncError(1) return fmt.Errorf("can't add message to cache: %w", err) } @@ -108,20 +120,43 @@ func (s *Service) Enqueue(token string, event types.Event) error { // sendAll sends messages to all targets from the cache after initializing the service. func (s *Service) sendAll(ctx context.Context) { - targets := s.cache.Drain() - if len(targets) == 0 { + rawEvents, err := s.events.Drain(ctx) + if err != nil { + s.logger.Error("Can't drain cache", zap.Error(err)) return } - messages := maps.MapValues(targets, func(w eventWrapper) types.Event { - return *w.event - }) + if len(rawEvents) == 0 { + return + } + + wrappers := lo.MapEntries( + rawEvents, + func(key string, value []byte) (string, *eventWrapper) { + wrapper := new(eventWrapper) + if err := wrapper.deserialize(value); err != nil { + s.metrics.IncError(1) + s.logger.Error("Failed to deserialize event wrapper", zap.String("key", key), zap.Binary("value", value), zap.Error(err)) + return "", nil + } + + return wrapper.Token, wrapper + }, + ) + delete(wrappers, "") + + messages := lo.MapValues( + wrappers, + func(value *eventWrapper, key string) Event { + return value.Event + }, + ) s.logger.Info("Sending messages", zap.Int("count", len(messages))) - ctx, cancel := context.WithTimeout(ctx, s.config.Timeout) + sendCtx, cancel := context.WithTimeout(ctx, s.config.Timeout) defer cancel() - errs, err := s.client.Send(ctx, messages) + errs, err := s.client.Send(sendCtx, messages) if len(errs) == 0 && err == nil { s.logger.Info("Messages sent successfully", zap.Int("count", len(messages))) return @@ -138,11 +173,11 @@ func (s *Service) sendAll(ctx context.Context) { for token, sendErr := range errs { s.logger.Error("Can't send message", zap.Error(sendErr), zap.String("token", token)) - wrapper := targets[token] - wrapper.retries++ + wrapper := wrappers[token] + wrapper.Retries++ - if wrapper.retries >= maxRetries { - if err := s.blacklist.Set(token, struct{}{}); err != nil { + if wrapper.Retries >= maxRetries { + if err := s.blacklist.Set(ctx, token, []byte{}, cacheImpl.WithTTL(blacklistTimeout)); err != nil { s.logger.Warn("Can't add to blacklist", zap.String("token", token), zap.Error(err)) } @@ -154,8 +189,16 @@ func (s *Service) sendAll(ctx context.Context) { continue } - if setErr := s.cache.SetOrFail(token, wrapper); setErr != nil { - s.logger.Info("Can't set message to cache", zap.Error(setErr)) + wrapperData, err := wrapper.serialize() + if err != nil { + s.metrics.IncError(1) + s.logger.Error("Can't serialize event wrapper", zap.Error(err)) + continue + } + + if setErr := s.events.SetOrFail(ctx, wrapper.key(), wrapperData); setErr != nil { + s.logger.Warn("Can't set message to cache", zap.Error(setErr)) + continue } s.metrics.IncRetry(RetryOutcomeRetried) diff --git a/internal/sms-gateway/modules/push/types.go b/internal/sms-gateway/modules/push/types.go index 85f9a45..eea6a51 100644 --- a/internal/sms-gateway/modules/push/types.go +++ b/internal/sms-gateway/modules/push/types.go @@ -2,6 +2,7 @@ package push import ( "context" + "encoding/json" "github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/types" ) @@ -17,12 +18,24 @@ type Event = types.Event type client interface { Open(ctx context.Context) error - Send(ctx context.Context, messages map[string]types.Event) (map[string]error, error) + Send(ctx context.Context, messages map[string]Event) (map[string]error, error) Close(ctx context.Context) error } type eventWrapper struct { - token string - event *types.Event - retries int + Token string `json:"token"` + Event Event `json:"event"` + Retries int `json:"retries"` +} + +func (e *eventWrapper) key() string { + return e.Token + ":" + string(e.Event.Type) +} + +func (e *eventWrapper) serialize() ([]byte, error) { + return json.Marshal(e) +} + +func (e *eventWrapper) deserialize(data []byte) error { + return json.Unmarshal(data, e) } diff --git a/internal/sms-gateway/modules/push/types/types.go b/internal/sms-gateway/modules/push/types/types.go index 9057d96..cdfb6d6 100644 --- a/internal/sms-gateway/modules/push/types/types.go +++ b/internal/sms-gateway/modules/push/types/types.go @@ -5,6 +5,6 @@ import ( ) type Event struct { - Type smsgateway.PushEventType - Data map[string]string + Type smsgateway.PushEventType `json:"type"` + Data map[string]string `json:"data"` }