Skip to content

Commit

Permalink
fix some errors
Browse files Browse the repository at this point in the history
  • Loading branch information
bitofsky committed Jul 2, 2021
1 parent 9667431 commit 1204196
Showing 1 changed file with 19 additions and 24 deletions.
43 changes: 19 additions & 24 deletions redis/redis-checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ import (
par "github.com/vmware/vmware-go-kcl/clientlibrary/partition"
)

type ErrLeaseNotAcquired struct {
cause string
}

// RedisCheckpoint implements the Checkpoint interface using Redis as a backend
type RedisCheckpoint struct {
prefix string
Expand All @@ -30,7 +26,8 @@ type RedisCheckpoint struct {
kclConfig *cfg.KinesisClientLibConfiguration
lastLeaseSync time.Time
rs *redsync.Redsync
mutex *redsync.Mutex
shardPrefix string
mutexPrefix string
}

type RedisCheckpointOptions struct {
Expand All @@ -51,10 +48,6 @@ var (
ctx = context.Background()
)

func (e ErrLeaseNotAcquired) Error() string {
return fmt.Sprintf("lease not acquired: %s", e.cause)
}

func NewRedisCheckpoint(kclConfig *cfg.KinesisClientLibConfiguration, options *RedisCheckpointOptions) *RedisCheckpoint {

prefix := options.Prefix
Expand All @@ -70,6 +63,8 @@ func NewRedisCheckpoint(kclConfig *cfg.KinesisClientLibConfiguration, options *R

checkpointer := &RedisCheckpoint{
prefix: prefix,
shardPrefix: prefix + ":shard:",
mutexPrefix: prefix + ":mutex:",
redisEndpoint: redisEndpoint,
LeaseDuration: kclConfig.FailoverTimeMillis,
kclConfig: kclConfig,
Expand Down Expand Up @@ -132,7 +127,7 @@ func (checkpointer *RedisCheckpoint) GetLease(shard *par.ShardStatus, newAssignT
claimRequest = currentCheckpointClaimRequest
if newAssignTo != claimRequest && !isClaimRequestExpired {
checkpointer.kclConfig.Logger.Debugf("another worker: %s has a claim on this shard. Not going to renew the lease", claimRequest)
return errors.New(chk.ErrShardClaimed)
return errors.New(chk.ErrShardClaimed + " " + claimRequest)
}
}
}
Expand All @@ -145,11 +140,11 @@ func (checkpointer *RedisCheckpoint) GetLease(shard *par.ShardStatus, newAssignT
return err
} else if checkpointer.kclConfig.EnableLeaseStealing {
if time.Now().UTC().Before(currentLeaseTimeout) && assignedTo != newAssignTo && !isClaimRequestExpired {
return ErrLeaseNotAcquired{fmt.Sprintf("current lease timeout not yet expired shard: %s, leaseTimeout: %s, assignedTo: %s, newAssignedTo: %s", shard.ID, currentLeaseTimeout, assignedTo, newAssignTo)}
return chk.ErrLeaseNotAcquired{}
}
} else {
if time.Now().UTC().Before(currentLeaseTimeout) && assignedTo != newAssignTo {
return ErrLeaseNotAcquired{fmt.Sprintf("current lease timeout not yet expired shard: %s, leaseTimeout: %s, assignedTo: %s, newAssignedTo: %s", shard.ID, currentLeaseTimeout, assignedTo, newAssignTo)}
return chk.ErrLeaseNotAcquired{}
}
checkpointer.kclConfig.Logger.Debugf("Attempting to get a lock for shard: %s, leaseTimeout: %s, assignedTo: %s, newAssignedTo: %s", shard.ID, currentLeaseTimeout, assignedTo, newAssignTo)
}
Expand All @@ -166,7 +161,8 @@ func (checkpointer *RedisCheckpoint) GetLease(shard *par.ShardStatus, newAssignT
err = checkpointer.putItem(newCheckpoint)

if err != nil {
return ErrLeaseNotAcquired{err.Error()}
checkpointer.kclConfig.Logger.Errorf("putItem err : %s", err)
return chk.ErrLeaseNotAcquired{}
}

shard.Mux.Lock()
Expand All @@ -192,6 +188,7 @@ func (checkpointer *RedisCheckpoint) CheckpointSequence(shard *par.ShardStatus)
Checkpoint: shard.GetCheckpoint(),
AssignedTo: shard.GetLeaseOwner(),
LeaseTimeout: leaseTimeout,
ClaimRequest: shard.ClaimRequest,
}

if len(shard.ParentShardId) > 0 {
Expand Down Expand Up @@ -219,10 +216,8 @@ func (checkpointer *RedisCheckpoint) FetchCheckpoint(shard *par.ShardStatus) err
return chk.ErrSequenceIDNotFound
}

sequenceID := checkpoint.Checkpoint

checkpointer.kclConfig.Logger.Debugf("Retrieved Shard Iterator %s", sequenceID)
shard.SetCheckpoint(sequenceID)
checkpointer.kclConfig.Logger.Debugf("Retrieved Shard Iterator %s", checkpoint.Checkpoint)
shard.SetCheckpoint(checkpoint.Checkpoint)
shard.SetLeaseOwner(checkpoint.AssignedTo)

// Use up-to-date leaseTimeout to avoid ConditionalCheckFailedException when claiming
Expand Down Expand Up @@ -360,21 +355,21 @@ func (checkpointer *RedisCheckpoint) syncLeases(shardStatus map[string]*par.Shar
checkpointer.lastLeaseSync = time.Now()
var cursor uint64

iter := checkpointer.svc.Scan(ctx, cursor, checkpointer.prefix+"*", 1).Iterator()
iter := checkpointer.svc.Scan(ctx, cursor, checkpointer.shardPrefix+"*", 1).Iterator()

for iter.Next(ctx) {
key := iter.Val()
j, err := checkpointer.svc.Get(ctx, key).Result()

if err != nil { // just logging
log.Errorf("syncLeases Get Error: %s, %s", err.Error(), key)
log.Errorf("syncLeases Get Error: %s, %s", err, key)
continue
}

cp, err := jsonToCheckpoint(j)

if err != nil { // just logging
log.Errorf("syncLeases jsonToCheckpoint Error: %s, %s", err.Error(), j)
log.Errorf("syncLeases jsonToCheckpoint Error: %s, %s", err, j)
continue
}

Expand All @@ -401,7 +396,7 @@ func (checkpointer *RedisCheckpoint) putItem(newCheckpoint *ShardCheckpoint) err
var _ string

if j, err = json.Marshal(newCheckpoint); err == nil {
if err = checkpointer.svc.Set(ctx, checkpointer.prefix+newCheckpoint.ShardID, string(j), 0).Err(); err == nil {
if err = checkpointer.svc.Set(ctx, checkpointer.shardPrefix+newCheckpoint.ShardID, string(j), 0).Err(); err == nil {
checkpointer.kclConfig.Logger.Infof("putItem : %s", j)
return nil
}
Expand All @@ -419,7 +414,7 @@ func jsonToCheckpoint(j string) (*ShardCheckpoint, error) {
}

func (checkpointer *RedisCheckpoint) getItem(shardID string) (*ShardCheckpoint, error) {
if r, err := checkpointer.svc.Get(ctx, checkpointer.prefix+shardID).Result(); err != nil {
if r, err := checkpointer.svc.Get(ctx, checkpointer.shardPrefix+shardID).Result(); err != nil {
if err == Nil {
return &ShardCheckpoint{ShardID: shardID}, nil
} else {
Expand All @@ -432,7 +427,7 @@ func (checkpointer *RedisCheckpoint) getItem(shardID string) (*ShardCheckpoint,
}

func (checkpointer *RedisCheckpoint) removeItem(shardID string) error {
err := checkpointer.svc.Del(ctx, checkpointer.prefix+shardID).Err()
err := checkpointer.svc.Del(ctx, checkpointer.shardPrefix+shardID).Err()

if err != nil {
checkpointer.kclConfig.Logger.Infof("removeItem : %s", shardID)
Expand All @@ -442,7 +437,7 @@ func (checkpointer *RedisCheckpoint) removeItem(shardID string) error {
}

func (checkpointer *RedisCheckpoint) lock(shardID string) (*redsync.Mutex, error) {
mutexname := checkpointer.prefix + ":locks:" + shardID
mutexname := checkpointer.mutexPrefix + shardID
mutex := checkpointer.rs.NewMutex(mutexname)

if err := mutex.Lock(); err != nil {
Expand Down

0 comments on commit 1204196

Please sign in to comment.