Skip to content

Commit

Permalink
Nats connection options (#437)
Browse files Browse the repository at this point in the history
* Replace deprecated DisconnectHandler with DisconnectErrHandler and provide output of the underlying disconnection error

* Add additional nats configuration options

* Bump nats client from 1.25.0 to 1.38.0

* Update docs with new nats options
  • Loading branch information
rsafonseca authored Jan 21, 2025
1 parent 38f56ae commit 2bbd948
Show file tree
Hide file tree
Showing 9 changed files with 159 additions and 46 deletions.
15 changes: 15 additions & 0 deletions cluster/nats_rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ type NatsRPCClient struct {
metricsReporters []metrics.Reporter
appDieChan chan bool
websocketCompression bool
reconnectJitter time.Duration
reconnectJitterTLS time.Duration
reconnectWait time.Duration
pingInterval time.Duration
maxPingsOutstanding int
}

// NewNatsRPCClient ctor
Expand Down Expand Up @@ -88,6 +93,11 @@ func (ns *NatsRPCClient) configure(config config.NatsRPCClientConfig) error {
return constants.ErrNatsNoRequestTimeout
}
ns.websocketCompression = config.WebsocketCompression
ns.reconnectJitter = config.ReconnectJitter
ns.reconnectJitterTLS = config.ReconnectJitterTLS
ns.reconnectWait = config.ReconnectWait
ns.pingInterval = config.PingInterval
ns.maxPingsOutstanding = config.MaxPingsOutstanding
return nil
}

Expand Down Expand Up @@ -240,9 +250,14 @@ func (ns *NatsRPCClient) Init() error {
conn, err := setupNatsConn(
ns.connString,
ns.appDieChan,
nats.RetryOnFailedConnect(true),
nats.MaxReconnects(ns.maxReconnectionRetries),
nats.Timeout(ns.connectionTimeout),
nats.Compression(ns.websocketCompression),
nats.ReconnectJitter(ns.reconnectJitter, ns.reconnectJitterTLS),
nats.ReconnectWait(ns.reconnectWait),
nats.PingInterval(ns.pingInterval),
nats.MaxPingsOutstanding(ns.maxPingsOutstanding),
)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions cluster/nats_rpc_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ func getChannel(serverType, serverID string) string {
func setupNatsConn(connectString string, appDieChan chan bool, options ...nats.Option) (*nats.Conn, error) {
natsOptions := append(
options,
nats.DisconnectHandler(func(_ *nats.Conn) {
logger.Log.Warn("disconnected from nats!")
nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
logger.Log.Warnf("disconnected from nats! Reason: %q\n", err)
}),
nats.ReconnectHandler(func(nc *nats.Conn) {
logger.Log.Warnf("reconnected to nats server %s with address %s in cluster %s!", nc.ConnectedServerName(), nc.ConnectedAddr(), nc.ConnectedClusterName())
Expand Down
17 changes: 16 additions & 1 deletion cluster/nats_rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ type NatsRPCServer struct {
sessionPool session.SessionPool
appDieChan chan bool
websocketCompression bool
reconnectJitter time.Duration
reconnectJitterTLS time.Duration
reconnectWait time.Duration
pingInterval time.Duration
maxPingsOutstanding int
}

// NewNatsRPCServer ctor
Expand Down Expand Up @@ -116,6 +121,11 @@ func (ns *NatsRPCServer) configure(config config.NatsRPCServerConfig) error {
ns.responses = make([]*protos.Response, ns.service)
ns.requests = make([]*protos.Request, ns.service)
ns.websocketCompression = config.WebsocketCompression
ns.reconnectJitter = config.ReconnectJitter
ns.reconnectJitterTLS = config.ReconnectJitterTLS
ns.reconnectWait = config.ReconnectWait
ns.pingInterval = config.PingInterval
ns.maxPingsOutstanding = config.MaxPingsOutstanding
return nil
}

Expand Down Expand Up @@ -275,7 +285,7 @@ func (ns *NatsRPCServer) processMessages(threadID int) {
Msg: err.Error(),
},
}

logger.Log.Errorf("error getting context from request: %s", err)
} else {
ns.responses[threadID], err = ns.pitayaServer.Call(ctx, ns.requests[threadID])
Expand Down Expand Up @@ -332,9 +342,14 @@ func (ns *NatsRPCServer) Init() error {
conn, err := setupNatsConn(
ns.connString,
ns.appDieChan,
nats.RetryOnFailedConnect(true),
nats.MaxReconnects(ns.maxReconnectionRetries),
nats.Timeout(ns.connectionTimeout),
nats.Compression(ns.websocketCompression),
nats.ReconnectJitter(ns.reconnectJitter, ns.reconnectJitterTLS),
nats.ReconnectWait(ns.reconnectWait),
nats.PingInterval(ns.pingInterval),
nats.MaxPingsOutstanding(ns.maxPingsOutstanding),
)
if err != nil {
return err
Expand Down
20 changes: 20 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,11 @@ type NatsRPCClientConfig struct {
RequestTimeout time.Duration `mapstructure:"requesttimeout"`
ConnectionTimeout time.Duration `mapstructure:"connectiontimeout"`
WebsocketCompression bool `mapstructure:"websocketcompression"`
ReconnectJitter time.Duration `mapstructure:"reconnectjitter"`
ReconnectJitterTLS time.Duration `mapstructure:"reconnectjittertls"`
ReconnectWait time.Duration `mapstructure:"reconnectwait"`
PingInterval time.Duration `mapstructure:"pinginterval"`
MaxPingsOutstanding int `mapstructure:"maxpingsoutstanding"`
}

// newDefaultNatsRPCClientConfig provides default nats client configuration
Expand All @@ -219,6 +224,11 @@ func newDefaultNatsRPCClientConfig() *NatsRPCClientConfig {
RequestTimeout: time.Duration(5 * time.Second),
ConnectionTimeout: time.Duration(2 * time.Second),
WebsocketCompression: true,
ReconnectJitter: time.Duration(100 * time.Millisecond),
ReconnectJitterTLS: time.Duration(1 * time.Second),
ReconnectWait: time.Duration(time.Second),
PingInterval: time.Duration(2 * time.Minute),
MaxPingsOutstanding: 3,
}
}

Expand All @@ -233,6 +243,11 @@ type NatsRPCServerConfig struct {
Services int `mapstructure:"services"`
ConnectionTimeout time.Duration `mapstructure:"connectiontimeout"`
WebsocketCompression bool `mapstructure:"websocketcompression"`
ReconnectJitter time.Duration `mapstructure:"reconnectjitter"`
ReconnectJitterTLS time.Duration `mapstructure:"reconnectjittertls"`
ReconnectWait time.Duration `mapstructure:"reconnectwait"`
PingInterval time.Duration `mapstructure:"pinginterval"`
MaxPingsOutstanding int `mapstructure:"maxpingsoutstanding"`
}

// newDefaultNatsRPCServerConfig provides default nats server configuration
Expand All @@ -250,6 +265,11 @@ func newDefaultNatsRPCServerConfig() *NatsRPCServerConfig {
Services: 30,
ConnectionTimeout: time.Duration(2 * time.Second),
WebsocketCompression: true,
ReconnectJitter: time.Duration(100 * time.Millisecond),
ReconnectJitterTLS: time.Duration(1 * time.Second),
ReconnectWait: time.Duration(time.Second),
PingInterval: time.Duration(2 * time.Minute),
MaxPingsOutstanding: 3,
}
}

Expand Down
10 changes: 10 additions & 0 deletions config/viper_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,22 @@ func (c *Config) fillDefaultValues() {
"pitaya.cluster.rpc.client.nats.connectiontimeout": pitayaConfig.Cluster.RPC.Client.Nats.ConnectionTimeout,
"pitaya.cluster.rpc.client.nats.maxreconnectionretries": pitayaConfig.Cluster.RPC.Client.Nats.MaxReconnectionRetries,
"pitaya.cluster.rpc.client.nats.websocketcompression": pitayaConfig.Cluster.RPC.Client.Nats.WebsocketCompression,
"pitaya.cluster.rpc.client.nats.reconnectjitter": pitayaConfig.Cluster.RPC.Client.Nats.ReconnectJitter,
"pitaya.cluster.rpc.client.nats.reconnectjittertls": pitayaConfig.Cluster.RPC.Client.Nats.ReconnectJitterTLS,
"pitaya.cluster.rpc.client.nats.reconnectwait": pitayaConfig.Cluster.RPC.Client.Nats.ReconnectWait,
"pitaya.cluster.rpc.client.nats.pinginterval": pitayaConfig.Cluster.RPC.Client.Nats.PingInterval,
"pitaya.cluster.rpc.client.nats.maxpingsoutstanding": pitayaConfig.Cluster.RPC.Client.Nats.MaxPingsOutstanding,
"pitaya.cluster.rpc.client.nats.requesttimeout": pitayaConfig.Cluster.RPC.Client.Nats.RequestTimeout,
"pitaya.cluster.rpc.server.grpc.port": pitayaConfig.Cluster.RPC.Server.Grpc.Port,
"pitaya.cluster.rpc.server.nats.connect": pitayaConfig.Cluster.RPC.Server.Nats.Connect,
"pitaya.cluster.rpc.server.nats.connectiontimeout": pitayaConfig.Cluster.RPC.Server.Nats.ConnectionTimeout,
"pitaya.cluster.rpc.server.nats.maxreconnectionretries": pitayaConfig.Cluster.RPC.Server.Nats.MaxReconnectionRetries,
"pitaya.cluster.rpc.server.nats.websocketcompression": pitayaConfig.Cluster.RPC.Server.Nats.WebsocketCompression,
"pitaya.cluster.rpc.server.nats.reconnectjitter": pitayaConfig.Cluster.RPC.Server.Nats.ReconnectJitter,
"pitaya.cluster.rpc.server.nats.reconnectjittertls": pitayaConfig.Cluster.RPC.Server.Nats.ReconnectJitterTLS,
"pitaya.cluster.rpc.server.nats.reconnectwait": pitayaConfig.Cluster.RPC.Server.Nats.ReconnectWait,
"pitaya.cluster.rpc.server.nats.pinginterval": pitayaConfig.Cluster.RPC.Server.Nats.PingInterval,
"pitaya.cluster.rpc.server.nats.maxpingsoutstanding": pitayaConfig.Cluster.RPC.Server.Nats.MaxPingsOutstanding,
"pitaya.cluster.rpc.server.nats.services": pitayaConfig.Cluster.RPC.Server.Nats.Services,
"pitaya.cluster.rpc.server.nats.buffer.messages": pitayaConfig.Cluster.RPC.Server.Nats.Buffer.Messages,
"pitaya.cluster.rpc.server.nats.buffer.push": pitayaConfig.Cluster.RPC.Server.Nats.Buffer.Push,
Expand Down
40 changes: 40 additions & 0 deletions docs/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,26 @@ The configurations only need to be set if the RPC Service is enabled with the gi
- true
- bool
- Enables compression in websocket connections to NATS. Needs both client and server to be enabled
* - pitaya.cluster.rpc.client.nats.reconnectjitter
- 100ms
- time.Time
- ReconnectJitter sets the upper bound for a random delay added to ReconnectWait during a reconnect when no TLS is used
* - pitaya.cluster.rpc.client.nats.reconnectjittertls
- 1s
- time.Time
- ReconnectJitterTLS sets the upper bound for a random delay added to ReconnectWait during a reconnect when TLS is used
* - pitaya.cluster.rpc.client.nats.reconnectwait
- 1s
- time.Time
- ReconnectWait sets the time to backoff after attempting to (and failing to) reconnect
* - pitaya.cluster.rpc.client.nats.pinginterval
- 2m
- time.Time
- PingInterval is the period at which the client will be sending ping commands to the server, disabled if 0 or negative
* - pitaya.cluster.rpc.client.nats.maxpingsoutstanding
- 3
- int
- MaxPingsOutstanding is the maximum number of pending ping commands that can be awaiting a response before raising an ErrStaleConnection error
* - pitaya.cluster.rpc.server.nats.connect
- nats://localhost:4222
- string
Expand All @@ -150,6 +170,26 @@ The configurations only need to be set if the RPC Service is enabled with the gi
- true
- bool
- Enables compression in websocket connections to NATS. Needs both client and server to be enabled
* - pitaya.cluster.rpc.server.nats.reconnectjitter
- 100ms
- time.Time
- ReconnectJitter sets the upper bound for a random delay added to ReconnectWait during a reconnect when no TLS is used
* - pitaya.cluster.rpc.server.nats.reconnectjittertls
- 1s
- time.Time
- ReconnectJitterTLS sets the upper bound for a random delay added to ReconnectWait during a reconnect when TLS is used
* - pitaya.cluster.rpc.server.nats.reconnectwait
- 1s
- time.Time
- ReconnectWait sets the time to backoff after attempting to (and failing to) reconnect
* - pitaya.cluster.rpc.server.nats.pinginterval
- 2m
- time.Time
- PingInterval is the period at which the client will be sending ping commands to the server, disabled if 0 or negative
* - pitaya.cluster.rpc.server.nats.maxpingsoutstanding
- 3
- int
- MaxPingsOutstanding is the maximum number of pending ping commands that can be awaiting a response before raising an ErrStaleConnection error
* - pitaya.cluster.rpc.server.grpc.port
- 3434
- int
Expand Down
20 changes: 10 additions & 10 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/mailgun/proxyproto v1.0.0
github.com/mitchellh/mapstructure v1.5.0
github.com/nats-io/nats-server/v2 v2.8.4
github.com/nats-io/nats.go v1.25.0
github.com/nats-io/nats.go v1.38.0
github.com/nats-io/nuid v1.0.1
github.com/opentracing/opentracing-go v1.2.0
github.com/prometheus/client_golang v1.16.0
Expand All @@ -26,7 +26,7 @@ require (
go.etcd.io/etcd/client/pkg/v3 v3.5.11
go.etcd.io/etcd/client/v3 v3.5.11
go.etcd.io/etcd/tests/v3 v3.5.11
golang.org/x/net v0.17.0
golang.org/x/net v0.25.0
google.golang.org/grpc v1.59.0
google.golang.org/protobuf v1.31.0
)
Expand All @@ -45,7 +45,6 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/garyburd/redigo v1.6.4 // indirect
github.com/go-logr/logr v1.3.0 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-playground/locales v0.14.1 // indirect
Expand All @@ -62,15 +61,15 @@ require (
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/jonboulle/clockwork v0.2.2 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.16.6 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/leodido/go-urn v1.2.4 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a // indirect
github.com/nats-io/nkeys v0.4.4 // indirect
github.com/nats-io/nkeys v0.4.9 // indirect
github.com/pelletier/go-toml/v2 v2.0.7 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand Down Expand Up @@ -102,12 +101,13 @@ require (
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/mod v0.11.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/sync v0.10.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.10.0 // indirect
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect
Expand Down
Loading

0 comments on commit 2bbd948

Please sign in to comment.