File tree Expand file tree Collapse file tree 6 files changed +90
-0
lines changed Expand file tree Collapse file tree 6 files changed +90
-0
lines changed Original file line number Diff line number Diff line change @@ -15,6 +15,7 @@ type Config struct {
1515 Tasks Tasks `yaml:"tasks"` // tasks config
1616 SSE SSE `yaml:"sse"` // server-sent events config
1717 Cache Cache `yaml:"cache"` // cache (memory or redis) config
18+ PubSub PubSub `yaml:"pubsub"` // pubsub (memory or redis) config
1819}
1920
2021type Gateway struct {
@@ -75,6 +76,10 @@ type Cache struct {
7576 URL string `yaml:"url" envconfig:"CACHE__URL"`
7677}
7778
79+ type PubSub struct {
80+ URL string `yaml:"url" envconfig:"PUBSUB__URL"`
81+ }
82+
7883var defaultConfig = Config {
7984 Gateway : Gateway {Mode : GatewayModePublic },
8085 HTTP : HTTP {
@@ -103,4 +108,7 @@ var defaultConfig = Config{
103108 Cache : Cache {
104109 URL : "memory://" ,
105110 },
111+ PubSub : PubSub {
112+ URL : "memory://" ,
113+ },
106114}
Original file line number Diff line number Diff line change @@ -11,6 +11,7 @@ import (
1111 "github.com/android-sms-gateway/server/internal/sms-gateway/modules/messages"
1212 "github.com/android-sms-gateway/server/internal/sms-gateway/modules/push"
1313 "github.com/android-sms-gateway/server/internal/sms-gateway/modules/sse"
14+ "github.com/android-sms-gateway/server/internal/sms-gateway/pubsub"
1415 "github.com/capcom6/go-infra-fx/config"
1516 "github.com/capcom6/go-infra-fx/db"
1617 "github.com/capcom6/go-infra-fx/http"
@@ -121,4 +122,10 @@ var Module = fx.Module(
121122 URL : cfg .Cache .URL ,
122123 }
123124 }),
125+ fx .Provide (func (cfg Config ) pubsub.Config {
126+ return pubsub.Config {
127+ URL : cfg .PubSub .URL ,
128+ BufferSize : 128 ,
129+ }
130+ }),
124131)
Original file line number Diff line number Diff line change @@ -21,6 +21,7 @@ import (
2121 "github.com/android-sms-gateway/server/internal/sms-gateway/modules/webhooks"
2222 "github.com/android-sms-gateway/server/internal/sms-gateway/online"
2323 "github.com/android-sms-gateway/server/internal/sms-gateway/openapi"
24+ "github.com/android-sms-gateway/server/internal/sms-gateway/pubsub"
2425 "github.com/capcom6/go-infra-fx/cli"
2526 "github.com/capcom6/go-infra-fx/db"
2627 "github.com/capcom6/go-infra-fx/http"
@@ -45,6 +46,7 @@ var Module = fx.Module(
4546 push .Module ,
4647 db .Module ,
4748 cache .Module (),
49+ pubsub .Module (),
4850 events .Module ,
4951 messages .Module ,
5052 health .Module ,
Original file line number Diff line number Diff line change 1+ package pubsub
2+
3+ // Config controls the PubSub backend via a URL (e.g., "memory://", "redis://...").
4+ type Config struct {
5+ URL string
6+ BufferSize uint
7+ }
Original file line number Diff line number Diff line change 1+ package pubsub
2+
3+ import (
4+ "context"
5+
6+ "github.com/android-sms-gateway/server/pkg/pubsub"
7+ "go.uber.org/fx"
8+ "go.uber.org/zap"
9+ )
10+
11+ func Module () fx.Option {
12+ return fx .Module (
13+ "pubsub" ,
14+ fx .Decorate (func (log * zap.Logger ) * zap.Logger {
15+ return log .Named ("pubsub" )
16+ }),
17+ fx .Provide (New ),
18+ fx .Invoke (func (pubsub pubsub.PubSub , lc fx.Lifecycle ) {
19+ lc .Append (fx.Hook {
20+ OnStop : func (_ context.Context ) error {
21+ return pubsub .Close ()
22+ },
23+ })
24+ }),
25+ )
26+ }
Original file line number Diff line number Diff line change 1+ package pubsub
2+
3+ import (
4+ "fmt"
5+ "net/url"
6+
7+ "github.com/android-sms-gateway/core/redis"
8+ "github.com/android-sms-gateway/server/pkg/pubsub"
9+ )
10+
11+ const (
12+ topicPrefix = "sms-gateway:"
13+ )
14+
15+ func New (config Config ) (pubsub.PubSub , error ) {
16+ if config .URL == "" {
17+ config .URL = "memory://"
18+ }
19+
20+ u , err := url .Parse (config .URL )
21+ if err != nil {
22+ return nil , fmt .Errorf ("can't parse url: %w" , err )
23+ }
24+
25+ opts := []pubsub.Option {}
26+ opts = append (opts , pubsub .WithBufferSize (config .BufferSize ))
27+
28+ switch u .Scheme {
29+ case "memory" :
30+ return pubsub .NewMemory (opts ... ), nil
31+ case "redis" :
32+ client , err := redis .New (redis.Config {URL : config .URL })
33+ if err != nil {
34+ return nil , fmt .Errorf ("can't create redis client: %w" , err )
35+ }
36+ return pubsub .NewRedis (client , topicPrefix , opts ... ), nil
37+ default :
38+ return nil , fmt .Errorf ("invalid scheme: %s" , u .Scheme )
39+ }
40+ }
You can’t perform that action at this time.
0 commit comments