Skip to content

Commit

Permalink
Fix bug in redisshard initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
roivaz committed Aug 8, 2023
1 parent 24f7ee9 commit b12cdf0
Show file tree
Hide file tree
Showing 21 changed files with 261 additions and 142 deletions.
2 changes: 1 addition & 1 deletion api/v1alpha1/sentinel_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ type SentinelStatus struct {
// Addresses of the sentinel instances currently running
// +operator-sdk:csv:customresourcedefinitions:type=status
// +optional
Sentinels map[string]string `json:"sentinels,omitempty"`
Sentinels []string `json:"sentinels,omitempty"`
// MonitoredShards is the list of shards that the Sentinel
// resource is currently monitoring
// +operator-sdk:csv:customresourcedefinitions:type=status
Expand Down
6 changes: 3 additions & 3 deletions api/v1alpha1/twemproxyconfig_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ type TargetServer struct {
ServerAddress string `json:"serverAddress"`
}

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:printcolumn:JSONPath=`.status.targets`,name=Selected Targets,type=string
// TwemproxyConfig is the Schema for the twemproxyconfigs API
type TwemproxyConfig struct {
metav1.TypeMeta `json:",inline"`
Expand Down
6 changes: 2 additions & 4 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions config/crd/bases/saas.3scale.net_sentinels.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -516,10 +516,10 @@ spec:
type: object
type: array
sentinels:
additionalProperties:
type: string
description: Addresses of the sentinel instances currently running
type: object
items:
type: string
type: array
type: object
type: object
served: true
Expand Down
6 changes: 5 additions & 1 deletion config/crd/bases/saas.3scale.net_twemproxyconfigs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ spec:
singular: twemproxyconfig
scope: Namespaced
versions:
- name: v1alpha1
- additionalPrinterColumns:
- jsonPath: .status.targets
name: Selected Targets
type: string
name: v1alpha1
schema:
openAPIV3Schema:
description: TwemproxyConfig is the Schema for the twemproxyconfigs API
Expand Down
5 changes: 5 additions & 0 deletions controllers/redisshard_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ func (r *RedisShardReconciler) setRedisRoles(ctx context.Context, key types.Name
if err != nil {
return &sharded.Shard{Name: key.Name}, &ctrl.Result{}, err
}
if pod.Status.PodIP == "" {
log.Info("waiting for pod IP to be allocated")
return &sharded.Shard{Name: key.Name}, &ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}

redisURLs[fmt.Sprintf("%s-%d", serviceName, i)] = fmt.Sprintf("redis://%s:%d", pod.Status.PodIP, 6379)
if int(masterIndex) == i {
masterHostPort = fmt.Sprintf("%s:%d", pod.Status.PodIP, 6379)
Expand Down
26 changes: 22 additions & 4 deletions controllers/sentinel_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,19 @@ import (
"github.com/3scale/saas-operator/pkg/redis/sharded"
"github.com/go-logr/logr"
grafanav1alpha1 "github.com/grafana-operator/grafana-operator/v4/api/integreatly/v1alpha1"
"golang.org/x/time/rate"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
"sigs.k8s.io/controller-runtime/pkg/source"
)

Expand Down Expand Up @@ -151,9 +155,9 @@ func (r *SentinelReconciler) reconcileStatus(ctx context.Context, instance *saas
log logr.Logger) error {

// sentinels info to the status
sentinels := make(map[string]string, len(cluster.Sentinels))
for _, srv := range cluster.Sentinels {
sentinels[srv.GetAlias()] = srv.ID()
sentinels := make([]string, len(cluster.Sentinels))
for idx, srv := range cluster.Sentinels {
sentinels[idx] = srv.ID()
}

// redis shards info to the status
Expand All @@ -171,7 +175,7 @@ func (r *SentinelReconciler) reconcileStatus(ctx context.Context, instance *saas
masterError := &sharded.DiscoveryError_Master_SingleServerFailure{}
slaveError := &sharded.DiscoveryError_Slave_SingleServerFailure{}
if errors.As(merr, masterError) || errors.As(merr, slaveError) {
log.Error(merr, "errors occurred during discovery")
log.Error(merr, "DiscoveryError")
}

shards := make(saasv1alpha1.MonitoredShards, len(cluster.Shards))
Expand Down Expand Up @@ -215,5 +219,19 @@ func (r *SentinelReconciler) SetupWithManager(mgr ctrl.Manager) error {
Owns(&grafanav1alpha1.GrafanaDashboard{}).
Owns(&corev1.ConfigMap{}).
Watches(&source.Channel{Source: r.SentinelEvents.GetChannel()}, &handler.EnqueueRequestForObject{}).
WithOptions(controller.Options{
RateLimiter: AggressiveRateLimiter(),
}).
Complete(r)
}

func AggressiveRateLimiter() ratelimiter.RateLimiter {
// return workqueue.DefaultControllerRateLimiter()
return workqueue.NewMaxOfRateLimiter(
// First retries are more spaced that default
// Max retry time is limited to 10 seconds
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 10*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
}
8 changes: 7 additions & 1 deletion controllers/twemproxyconfig_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -82,10 +83,10 @@ func (r *TwemproxyConfigReconciler) Reconcile(ctx context.Context, req ctrl.Requ
gen, err := twemproxyconfig.NewGenerator(
ctx, instance, r.Client, r.Pool, logger.WithName("generator"),
)

if err != nil {
return ctrl.Result{}, err
}

cm, err := gen.ConfigMap().Build(ctx, r.Client)
if err != nil {
return ctrl.Result{}, err
Expand Down Expand Up @@ -268,5 +269,10 @@ func (r *TwemproxyConfigReconciler) SetupWithManager(mgr ctrl.Manager) error {
Owns(&corev1.ConfigMap{}).
Owns(&grafanav1alpha1.GrafanaDashboard{}).
Watches(&source.Channel{Source: r.SentinelEvents.GetChannel()}, &handler.EnqueueRequestForObject{}).
WithOptions(controller.Options{
RateLimiter: AggressiveRateLimiter(),
// this allows for different resources to be reconciled in parallel
MaxConcurrentReconciles: 2,
}).
Complete(r)
}
2 changes: 1 addition & 1 deletion pkg/generators/twemproxyconfig/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func NewGenerator(ctx context.Context, instance *saasv1alpha1.TwemproxyConfig, c
case true:
merr := shardedCluster.SentinelDiscover(ctx, sharded.SlaveReadOnlyDiscoveryOpt)
if merr != nil {
log.Error(merr, "errors occurred during discovery")
log.Error(merr, "DiscoveryError")
// Only sentinel/master discovery errors should return.
// Slave failures will just failover to the master without returning error (although it will be logged)
sentinelError := &sharded.DiscoveryError_Sentinel_Failure{}
Expand Down
4 changes: 3 additions & 1 deletion pkg/generators/twemproxyconfig/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/3scale/saas-operator/pkg/util"
"github.com/go-logr/logr"
"github.com/go-test/deep"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand Down Expand Up @@ -634,7 +636,7 @@ func TestNewGenerator(t *testing.T) {
return
}
deep.CompareUnexportedFields = true
if diff := deep.Equal(got, tt.want); len(diff) != 0 {
if diff := cmp.Diff(got, tt.want, cmp.AllowUnexported(Generator{}), cmpopts.IgnoreUnexported(twemproxy.Server{})); len(diff) != 0 {
t.Errorf("NewGenerator() = diff %v", diff)
}
})
Expand Down
10 changes: 10 additions & 0 deletions pkg/redis/client/fake_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ func (fc *FakeClient) SentinelPing(ctx context.Context) error {
return rsp.InjectError()
}

func (fc *FakeClient) SentinelDo(ctx context.Context, args ...interface{}) (interface{}, error) {
rsp := fc.pop()
return rsp.InjectResponse(), rsp.InjectError()
}

func (fc *FakeClient) RedisRole(ctx context.Context) (interface{}, error) {
rsp := fc.pop()
return rsp.InjectResponse(), rsp.InjectError()
Expand Down Expand Up @@ -125,6 +130,11 @@ func (fc *FakeClient) RedisDebugSleep(ctx context.Context, duration time.Duratio
return nil
}

func (fc *FakeClient) RedisDo(ctx context.Context, args ...interface{}) (interface{}, error) {
rsp := fc.pop()
return rsp.InjectResponse(), rsp.InjectError()
}

func (fc *FakeClient) pop() (fakeRsp FakeResponse) {
fakeRsp, fc.Responses = fc.Responses[0], fc.Responses[1:]
return fakeRsp
Expand Down
10 changes: 10 additions & 0 deletions pkg/redis/client/goredis_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ func (c *GoRedisClient) SentinelPing(ctx context.Context) error {
return err
}

func (c *GoRedisClient) SentinelDo(ctx context.Context, args ...interface{}) (interface{}, error) {
val, err := c.redis.Do(ctx, args...).Result()
return val, err
}

func (c *GoRedisClient) RedisRole(ctx context.Context) (interface{}, error) {

val, err := c.redis.Do(ctx, "role").Result()
Expand Down Expand Up @@ -142,3 +147,8 @@ func (c *GoRedisClient) RedisDebugSleep(ctx context.Context, duration time.Durat
_, err := c.redis.Do(ctx, "debug", "sleep", fmt.Sprintf("%.1f", duration.Seconds())).Result()
return err
}

func (c *GoRedisClient) RedisDo(ctx context.Context, args ...interface{}) (interface{}, error) {
val, err := c.redis.Do(ctx, args...).Result()
return val, err
}
2 changes: 2 additions & 0 deletions pkg/redis/client/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ type TestableInterface interface {
SentinelSet(context.Context, string, string, string) error
SentinelPSubscribe(context.Context, ...string) (<-chan *redis.Message, func() error)
SentinelInfoCache(context.Context) (interface{}, error)
SentinelDo(context.Context, ...interface{}) (interface{}, error)
SentinelPing(ctx context.Context) error
RedisRole(context.Context) (interface{}, error)
RedisConfigGet(context.Context, string) ([]interface{}, error)
RedisConfigSet(context.Context, string, string) error
RedisSlaveOf(context.Context, string, string) error
RedisDebugSleep(context.Context, time.Duration) error
RedisDo(context.Context, ...interface{}) (interface{}, error)
Close() error
}

Expand Down
8 changes: 6 additions & 2 deletions pkg/redis/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ func (srv *Server) CloseClient() error {
return srv.client.Close()
}

func (srv *Server) GetClient() client.TestableInterface {
return srv.client
}

func (srv *Server) GetHost() string {
return srv.host
}
Expand Down Expand Up @@ -170,7 +174,7 @@ func (srv *Server) SentinelInfoCache(ctx context.Context) (client.SentinelInfoCa
// When sentinel is unable to reach the redis slave the info field can be nil
// so we have to check this to avoid panics
if server.([]interface{})[1] != nil {
info := infoStringToMap(server.([]interface{})[1].(string))
info := InfoStringToMap(server.([]interface{})[1].(string))
result[shard][info["run_id"]] = client.RedisServerInfoCache{
CacheAge: time.Duration(server.([]interface{})[0].(int64)) * time.Millisecond,
Info: info,
Expand Down Expand Up @@ -248,7 +252,7 @@ func islice2imap(in interface{}) map[string]interface{} {
return m
}

func infoStringToMap(in string) map[string]string {
func InfoStringToMap(in string) map[string]string {

m := map[string]string{}
scanner := bufio.NewScanner(strings.NewReader(in))
Expand Down
2 changes: 1 addition & 1 deletion pkg/redis/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1409,7 +1409,7 @@ func Test_infoStringToMap(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := infoStringToMap(tt.args.in); !reflect.DeepEqual(got, tt.want) {
if got := InfoStringToMap(tt.args.in); !reflect.DeepEqual(got, tt.want) {
t.Errorf("infoStringToMap() = %v, want %v", got, tt.want)
}
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/redis/sharded/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (set DiscoveryOptionSet) Has(opt DiscoveryOption) bool {
// redis Server
// It always gets the role first
func (srv *RedisServer) Discover(ctx context.Context, opts ...DiscoveryOption) error {
logger := log.FromContext(ctx, "function", "(*RedisServer).DiscoverWithOptions()")
logger := log.FromContext(ctx, "function", "(*RedisServer).Discover()")

role, _, err := srv.RedisRole(ctx)
if err != nil {
Expand Down
83 changes: 83 additions & 0 deletions pkg/redis/sharded/redis_server.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package sharded

import (
"context"
"fmt"

"github.com/3scale/saas-operator/pkg/redis/client"
redis "github.com/3scale/saas-operator/pkg/redis/server"
"sigs.k8s.io/controller-runtime/pkg/log"
)

type RedisServer struct {
Expand Down Expand Up @@ -31,3 +35,82 @@ func NewRedisServerFromParams(srv *redis.Server, role client.Role, config map[st
Config: config,
}
}

func (srv *RedisServer) InitMaster(ctx context.Context) (bool, error) {
logger := log.FromContext(ctx, "function", "(*RedisServer).InitMaster")

role, slaveof, err := srv.RedisRole(ctx)
if err != nil {
return false, err
}

switch role {
case client.Slave:

if slaveof == "127.0.0.1" {
// needs initialization
if err := srv.RedisSlaveOf(ctx, "NO", "ONE"); err != nil {
return false, err
}
logger.Info(fmt.Sprintf("configured %s|%s as master", srv.GetAlias(), srv.ID()))
return true, nil

} else {
srv.Role = client.Slave
}

case client.Master:
srv.Role = client.Master
}

return false, nil
}

func (srv *RedisServer) InitSlave(ctx context.Context, master *RedisServer) (bool, error) {

logger := log.FromContext(ctx, "function", "(*RedisServer).InitSlave")

role, slaveof, err := srv.RedisRole(ctx)
if err != nil {
return false, err
}

switch role {
case client.Slave:

// needs initialization
if slaveof == "127.0.0.1" {
// validate first that the master is ready
role, _, err := master.RedisRole(ctx)
if err != nil || role != client.Master {
err := fmt.Errorf("shard master %s|%s is not ready", master.GetAlias(), master.ID())
logger.Error(err, "slave init failed")
return false, err

} else {
// if master ok, init slave
if err := srv.RedisSlaveOf(ctx, master.GetHost(), master.GetPort()); err != nil {
return false, err
}
logger.Info(fmt.Sprintf("configured %s|%s as slave", srv.GetAlias(), srv.ID()))
return true, nil
}

} else {
srv.Role = client.Slave
// FOR DEBUGGING
// val, err := srv.GetClient().RedisDo(ctx, "info", "replication")
// if err != nil {
// logger.Error(err, "unable to get info")
// } else {
// logger.Info("dump replication status", "Slave", srv.GetAlias())
// logger.Info(fmt.Sprintf("%s", redis.InfoStringToMap(val.(string))))
// }
}

case client.Master:
srv.Role = client.Master
}

return false, nil
}
Loading

0 comments on commit b12cdf0

Please sign in to comment.