Skip to content

Commit

Permalink
Fix bug in redisshard initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
roivaz committed Aug 7, 2023
1 parent 24f7ee9 commit fd47dd9
Show file tree
Hide file tree
Showing 15 changed files with 234 additions and 120 deletions.
5 changes: 5 additions & 0 deletions controllers/redisshard_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ func (r *RedisShardReconciler) setRedisRoles(ctx context.Context, key types.Name
if err != nil {
return &sharded.Shard{Name: key.Name}, &ctrl.Result{}, err
}
if pod.Status.PodIP == "" {
log.Info("waiting for pod IP to be allocated")
return &sharded.Shard{Name: key.Name}, &ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}

redisURLs[fmt.Sprintf("%s-%d", serviceName, i)] = fmt.Sprintf("redis://%s:%d", pod.Status.PodIP, 6379)
if int(masterIndex) == i {
masterHostPort = fmt.Sprintf("%s:%d", pod.Status.PodIP, 6379)
Expand Down
20 changes: 19 additions & 1 deletion controllers/sentinel_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,19 @@ import (
"github.com/3scale/saas-operator/pkg/redis/sharded"
"github.com/go-logr/logr"
grafanav1alpha1 "github.com/grafana-operator/grafana-operator/v4/api/integreatly/v1alpha1"
"golang.org/x/time/rate"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
"sigs.k8s.io/controller-runtime/pkg/source"
)

Expand Down Expand Up @@ -171,7 +175,7 @@ func (r *SentinelReconciler) reconcileStatus(ctx context.Context, instance *saas
masterError := &sharded.DiscoveryError_Master_SingleServerFailure{}
slaveError := &sharded.DiscoveryError_Slave_SingleServerFailure{}
if errors.As(merr, masterError) || errors.As(merr, slaveError) {
log.Error(merr, "errors occurred during discovery")
log.Error(merr, "DiscoveryError")
}

shards := make(saasv1alpha1.MonitoredShards, len(cluster.Shards))
Expand Down Expand Up @@ -215,5 +219,19 @@ func (r *SentinelReconciler) SetupWithManager(mgr ctrl.Manager) error {
Owns(&grafanav1alpha1.GrafanaDashboard{}).
Owns(&corev1.ConfigMap{}).
Watches(&source.Channel{Source: r.SentinelEvents.GetChannel()}, &handler.EnqueueRequestForObject{}).
WithOptions(controller.Options{
RateLimiter: AggressiveRateLimiter(),
}).
Complete(r)
}

func AggressiveRateLimiter() ratelimiter.RateLimiter {
// return workqueue.DefaultControllerRateLimiter()
return workqueue.NewMaxOfRateLimiter(
// First retries are more spaced that default
// Max retry time is limited to 10 seconds
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 10*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
}
6 changes: 5 additions & 1 deletion controllers/twemproxyconfig_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -82,10 +83,10 @@ func (r *TwemproxyConfigReconciler) Reconcile(ctx context.Context, req ctrl.Requ
gen, err := twemproxyconfig.NewGenerator(
ctx, instance, r.Client, r.Pool, logger.WithName("generator"),
)

if err != nil {
return ctrl.Result{}, err
}

cm, err := gen.ConfigMap().Build(ctx, r.Client)
if err != nil {
return ctrl.Result{}, err
Expand Down Expand Up @@ -268,5 +269,8 @@ func (r *TwemproxyConfigReconciler) SetupWithManager(mgr ctrl.Manager) error {
Owns(&corev1.ConfigMap{}).
Owns(&grafanav1alpha1.GrafanaDashboard{}).
Watches(&source.Channel{Source: r.SentinelEvents.GetChannel()}, &handler.EnqueueRequestForObject{}).
WithOptions(controller.Options{
RateLimiter: AggressiveRateLimiter(),
}).
Complete(r)
}
2 changes: 1 addition & 1 deletion pkg/generators/twemproxyconfig/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func NewGenerator(ctx context.Context, instance *saasv1alpha1.TwemproxyConfig, c
case true:
merr := shardedCluster.SentinelDiscover(ctx, sharded.SlaveReadOnlyDiscoveryOpt)
if merr != nil {
log.Error(merr, "errors occurred during discovery")
log.Error(merr, "DiscoveryError")
// Only sentinel/master discovery errors should return.
// Slave failures will just failover to the master without returning error (although it will be logged)
sentinelError := &sharded.DiscoveryError_Sentinel_Failure{}
Expand Down
4 changes: 3 additions & 1 deletion pkg/generators/twemproxyconfig/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/3scale/saas-operator/pkg/util"
"github.com/go-logr/logr"
"github.com/go-test/deep"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand Down Expand Up @@ -634,7 +636,7 @@ func TestNewGenerator(t *testing.T) {
return
}
deep.CompareUnexportedFields = true
if diff := deep.Equal(got, tt.want); len(diff) != 0 {
if diff := cmp.Diff(got, tt.want, cmp.AllowUnexported(Generator{}), cmpopts.IgnoreUnexported(twemproxy.Server{})); len(diff) != 0 {
t.Errorf("NewGenerator() = diff %v", diff)
}
})
Expand Down
10 changes: 10 additions & 0 deletions pkg/redis/client/fake_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ func (fc *FakeClient) SentinelPing(ctx context.Context) error {
return rsp.InjectError()
}

func (fc *FakeClient) SentinelDo(ctx context.Context, args ...interface{}) (interface{}, error) {
rsp := fc.pop()
return rsp.InjectResponse(), rsp.InjectError()
}

func (fc *FakeClient) RedisRole(ctx context.Context) (interface{}, error) {
rsp := fc.pop()
return rsp.InjectResponse(), rsp.InjectError()
Expand Down Expand Up @@ -125,6 +130,11 @@ func (fc *FakeClient) RedisDebugSleep(ctx context.Context, duration time.Duratio
return nil
}

func (fc *FakeClient) RedisDo(ctx context.Context, args ...interface{}) (interface{}, error) {
rsp := fc.pop()
return rsp.InjectResponse(), rsp.InjectError()
}

func (fc *FakeClient) pop() (fakeRsp FakeResponse) {
fakeRsp, fc.Responses = fc.Responses[0], fc.Responses[1:]
return fakeRsp
Expand Down
10 changes: 10 additions & 0 deletions pkg/redis/client/goredis_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ func (c *GoRedisClient) SentinelPing(ctx context.Context) error {
return err
}

func (c *GoRedisClient) SentinelDo(ctx context.Context, args ...interface{}) (interface{}, error) {
val, err := c.redis.Do(ctx, args...).Result()
return val, err
}

func (c *GoRedisClient) RedisRole(ctx context.Context) (interface{}, error) {

val, err := c.redis.Do(ctx, "role").Result()
Expand Down Expand Up @@ -142,3 +147,8 @@ func (c *GoRedisClient) RedisDebugSleep(ctx context.Context, duration time.Durat
_, err := c.redis.Do(ctx, "debug", "sleep", fmt.Sprintf("%.1f", duration.Seconds())).Result()
return err
}

func (c *GoRedisClient) RedisDo(ctx context.Context, args ...interface{}) (interface{}, error) {
val, err := c.redis.Do(ctx, args...).Result()
return val, err
}
2 changes: 2 additions & 0 deletions pkg/redis/client/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ type TestableInterface interface {
SentinelSet(context.Context, string, string, string) error
SentinelPSubscribe(context.Context, ...string) (<-chan *redis.Message, func() error)
SentinelInfoCache(context.Context) (interface{}, error)
SentinelDo(context.Context, ...interface{}) (interface{}, error)
SentinelPing(ctx context.Context) error
RedisRole(context.Context) (interface{}, error)
RedisConfigGet(context.Context, string) ([]interface{}, error)
RedisConfigSet(context.Context, string, string) error
RedisSlaveOf(context.Context, string, string) error
RedisDebugSleep(context.Context, time.Duration) error
RedisDo(context.Context, ...interface{}) (interface{}, error)
Close() error
}

Expand Down
8 changes: 6 additions & 2 deletions pkg/redis/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ func (srv *Server) CloseClient() error {
return srv.client.Close()
}

func (srv *Server) GetClient() client.TestableInterface {
return srv.client
}

func (srv *Server) GetHost() string {
return srv.host
}
Expand Down Expand Up @@ -170,7 +174,7 @@ func (srv *Server) SentinelInfoCache(ctx context.Context) (client.SentinelInfoCa
// When sentinel is unable to reach the redis slave the info field can be nil
// so we have to check this to avoid panics
if server.([]interface{})[1] != nil {
info := infoStringToMap(server.([]interface{})[1].(string))
info := InfoStringToMap(server.([]interface{})[1].(string))
result[shard][info["run_id"]] = client.RedisServerInfoCache{
CacheAge: time.Duration(server.([]interface{})[0].(int64)) * time.Millisecond,
Info: info,
Expand Down Expand Up @@ -248,7 +252,7 @@ func islice2imap(in interface{}) map[string]interface{} {
return m
}

func infoStringToMap(in string) map[string]string {
func InfoStringToMap(in string) map[string]string {

m := map[string]string{}
scanner := bufio.NewScanner(strings.NewReader(in))
Expand Down
2 changes: 1 addition & 1 deletion pkg/redis/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1409,7 +1409,7 @@ func Test_infoStringToMap(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := infoStringToMap(tt.args.in); !reflect.DeepEqual(got, tt.want) {
if got := InfoStringToMap(tt.args.in); !reflect.DeepEqual(got, tt.want) {
t.Errorf("infoStringToMap() = %v, want %v", got, tt.want)
}
})
Expand Down
83 changes: 83 additions & 0 deletions pkg/redis/sharded/redis_server.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package sharded

import (
"context"
"fmt"

"github.com/3scale/saas-operator/pkg/redis/client"
redis "github.com/3scale/saas-operator/pkg/redis/server"
"sigs.k8s.io/controller-runtime/pkg/log"
)

type RedisServer struct {
Expand Down Expand Up @@ -31,3 +35,82 @@ func NewRedisServerFromParams(srv *redis.Server, role client.Role, config map[st
Config: config,
}
}

func (srv *RedisServer) InitMaster(ctx context.Context) (bool, error) {
logger := log.FromContext(ctx, "function", "(*RedisServer).InitMaster")

role, slaveof, err := srv.RedisRole(ctx)
if err != nil {
return false, err
}

switch role {
case client.Slave:

if slaveof == "127.0.0.1" {
// needs initialization
if err := srv.RedisSlaveOf(ctx, "NO", "ONE"); err != nil {
return false, err
}
logger.Info(fmt.Sprintf("configured %s|%s as master", srv.GetAlias(), srv.ID()))
return true, nil

} else {
srv.Role = client.Slave
}

case client.Master:
srv.Role = client.Master
}

return false, nil
}

func (srv *RedisServer) InitSlave(ctx context.Context, master *RedisServer) (bool, error) {

logger := log.FromContext(ctx, "function", "(*RedisServer).InitSlave")

role, slaveof, err := srv.RedisRole(ctx)
if err != nil {
return false, err
}

switch role {
case client.Slave:

// needs initialization
if slaveof == "127.0.0.1" {
// validate first that the master is ready
role, _, err := master.RedisRole(ctx)
if err != nil || role != client.Master {
err := fmt.Errorf("shard master %s|%s is not ready", master.GetAlias(), master.ID())
logger.Error(err, "slave init failed")
return false, err

} else {
// if master ok, init slave
if err := srv.RedisSlaveOf(ctx, master.GetHost(), master.GetPort()); err != nil {
return false, err
}
logger.Info(fmt.Sprintf("configured %s|%s as slave", srv.GetAlias(), srv.ID()))
return true, nil
}

} else {
srv.Role = client.Slave
// FOR DEBUGGING
// val, err := srv.GetClient().RedisDo(ctx, "info", "replication")
// if err != nil {
// logger.Error(err, "unable to get info")
// } else {
// logger.Info("dump replication status", "Slave", srv.GetAlias())
// logger.Info(fmt.Sprintf("%s", redis.InfoStringToMap(val.(string))))
// }
}

case client.Master:
srv.Role = client.Master
}

return false, nil
}
59 changes: 25 additions & 34 deletions pkg/redis/sharded/redis_shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package sharded
import (
"context"
"fmt"
"net"
"sort"
"strings"

Expand Down Expand Up @@ -216,44 +215,36 @@ func (shard *Shard) GetServerByID(hostport string) (*RedisServer, error) {

// Init initializes the shard if not already initialized
func (shard *Shard) Init(ctx context.Context, masterHostPort string) ([]string, error) {
logger := log.FromContext(ctx, "function", "(*Shard).Init")
changed := []string{}
merr := util.MultiError{}
listChanged := []string{}
var master *RedisServer

for idx, srv := range shard.Servers {
role, slaveof, err := srv.RedisRole(ctx)
if err != nil {
return changed, err
// Init the master
for _, srv := range shard.Servers {
if srv.ID() == masterHostPort {
master = srv
changed, err := master.InitMaster(ctx)
if err != nil {
return listChanged, append(merr, err)
}
if changed {
listChanged = append(listChanged, master.ID())
}
}
}

if role == client.Slave {

if slaveof == "127.0.0.1" {

if masterHostPort == srv.ID() {
if err := srv.RedisSlaveOf(ctx, "NO", "ONE"); err != nil {
return changed, err
}
logger.Info(fmt.Sprintf("configured %s as master", srv.ID()))
changed = append(changed, srv.ID())
} else {
host, port, _ := net.SplitHostPort(masterHostPort)
if err := srv.RedisSlaveOf(ctx, host, port); err != nil {
return changed, err
}
logger.Info(fmt.Sprintf("configured %s as slave", srv.ID()))
changed = append(changed, srv.ID())
}

} else {
shard.Servers[idx].Role = client.Slave
// Init the slaves
for _, srv := range shard.Servers {
if srv.ID() != masterHostPort {
changed, err := srv.InitSlave(ctx, master)
if err != nil {
merr = append(merr, err)
}
if changed {
listChanged = append(listChanged, srv.ID())
}

} else if role == client.Master {
shard.Servers[idx].Role = client.Master
} else {
return changed, fmt.Errorf("unable to get role for server %s", srv.ID())
}
}

return changed, nil
return listChanged, merr.ErrorOrNil()
}
Loading

0 comments on commit fd47dd9

Please sign in to comment.