Skip to content

Commit c96b2b5

Browse files
committed
[pubsub] add module
1 parent 56f4ebd commit c96b2b5

File tree

6 files changed

+80
-0
lines changed

6 files changed

+80
-0
lines changed

internal/config/config.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ type Config struct {
1515
Tasks Tasks `yaml:"tasks"` // tasks config
1616
SSE SSE `yaml:"sse"` // server-sent events config
1717
Cache Cache `yaml:"cache"` // cache (memory or redis) config
18+
PubSub PubSub `yaml:"pubsub"` // pubsub (memory or redis) config
1819
}
1920

2021
type Gateway struct {
@@ -75,6 +76,10 @@ type Cache struct {
7576
URL string `yaml:"url" envconfig:"CACHE__URL"`
7677
}
7778

79+
type PubSub struct {
80+
URL string `yaml:"url" envconfig:"PUBSUB__URL"`
81+
}
82+
7883
var defaultConfig = Config{
7984
Gateway: Gateway{Mode: GatewayModePublic},
8085
HTTP: HTTP{
@@ -103,4 +108,7 @@ var defaultConfig = Config{
103108
Cache: Cache{
104109
URL: "memory://",
105110
},
111+
PubSub: PubSub{
112+
URL: "memory://",
113+
},
106114
}

internal/config/module.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/messages"
1212
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/push"
1313
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/sse"
14+
"github.com/android-sms-gateway/server/internal/sms-gateway/pubsub"
1415
"github.com/capcom6/go-infra-fx/config"
1516
"github.com/capcom6/go-infra-fx/db"
1617
"github.com/capcom6/go-infra-fx/http"
@@ -121,4 +122,10 @@ var Module = fx.Module(
121122
URL: cfg.Cache.URL,
122123
}
123124
}),
125+
fx.Provide(func(cfg Config) pubsub.Config {
126+
return pubsub.Config{
127+
URL: cfg.PubSub.URL,
128+
BufferSize: 128,
129+
}
130+
}),
124131
)

internal/sms-gateway/app.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/webhooks"
2222
"github.com/android-sms-gateway/server/internal/sms-gateway/online"
2323
"github.com/android-sms-gateway/server/internal/sms-gateway/openapi"
24+
"github.com/android-sms-gateway/server/internal/sms-gateway/pubsub"
2425
"github.com/capcom6/go-infra-fx/cli"
2526
"github.com/capcom6/go-infra-fx/db"
2627
"github.com/capcom6/go-infra-fx/http"
@@ -45,6 +46,7 @@ var Module = fx.Module(
4546
push.Module,
4647
db.Module,
4748
cache.Module(),
49+
pubsub.Module(),
4850
events.Module,
4951
messages.Module,
5052
health.Module,
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package pubsub
2+
3+
// Config controls the PubSub backend via a URL (e.g., "memory://", "redis://...").
4+
type Config struct {
5+
URL string
6+
BufferSize uint
7+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package pubsub
2+
3+
import (
4+
"go.uber.org/fx"
5+
"go.uber.org/zap"
6+
)
7+
8+
func Module() fx.Option {
9+
return fx.Module(
10+
"pubsub",
11+
fx.Decorate(func(log *zap.Logger) *zap.Logger {
12+
return log.Named("pubsub")
13+
}),
14+
fx.Provide(New),
15+
)
16+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package pubsub
2+
3+
import (
4+
"fmt"
5+
"net/url"
6+
7+
"github.com/android-sms-gateway/core/redis"
8+
"github.com/android-sms-gateway/server/pkg/pubsub"
9+
)
10+
11+
const (
12+
topicPrefix = "sms-gateway:"
13+
)
14+
15+
func New(config Config) (pubsub.PubSub, error) {
16+
if config.URL == "" {
17+
config.URL = "memory://"
18+
}
19+
20+
u, err := url.Parse(config.URL)
21+
if err != nil {
22+
return nil, fmt.Errorf("can't parse url: %w", err)
23+
}
24+
25+
opts := []pubsub.Option{}
26+
opts = append(opts, pubsub.WithBufferSize(config.BufferSize))
27+
28+
switch u.Scheme {
29+
case "memory":
30+
return pubsub.NewMemory(opts...), nil
31+
case "redis":
32+
client, err := redis.New(redis.Config{URL: config.URL})
33+
if err != nil {
34+
return nil, fmt.Errorf("can't create redis client: %w", err)
35+
}
36+
return pubsub.NewRedis(client, topicPrefix, opts...), nil
37+
default:
38+
return nil, fmt.Errorf("invalid scheme: %s", u.Scheme)
39+
}
40+
}

0 commit comments

Comments
 (0)