Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/main/java/io/lettuce/core/failover/HealthStatus.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package io.lettuce.core.failover;

public enum HealthStatus {
UNKNOWN, HEALTHY, UNHEALTHY
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume UNKNOWN status is not used as of now, but is intended to be used once we have actual health checks as the initial state, till we get a result from the first health check?

}
8 changes: 2 additions & 6 deletions src/main/java/io/lettuce/core/failover/MultiDbClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,7 @@ private <K, V> RedisDatabase<StatefulRedisConnection<K, V>> createRedisDatabase(
RedisURI uri = config.getRedisURI();
StatefulRedisConnection<K, V> connection = connect(codec, uri);
DatabaseEndpoint databaseEndpoint = extractDatabaseEndpoint(connection);
RedisDatabase<StatefulRedisConnection<K, V>> database = new RedisDatabase<>(
new RedisDatabase.RedisDatabaseConfig(uri, config.getWeight(), config.getCircuitBreakerConfig()), connection,
databaseEndpoint);
RedisDatabase<StatefulRedisConnection<K, V>> database = new RedisDatabase<>(config, connection, databaseEndpoint);

return database;
}
Expand Down Expand Up @@ -138,9 +136,7 @@ private <K, V> RedisDatabase<StatefulRedisPubSubConnection<K, V>> createRedisDat
RedisURI uri = config.getRedisURI();
StatefulRedisPubSubConnection<K, V> connection = connectPubSub(codec, uri);
DatabaseEndpoint databaseEndpoint = extractDatabaseEndpoint(connection);
RedisDatabase<StatefulRedisPubSubConnection<K, V>> database = new RedisDatabase<>(
new RedisDatabase.RedisDatabaseConfig(uri, config.getWeight(), config.getCircuitBreakerConfig()), connection,
databaseEndpoint);
RedisDatabase<StatefulRedisPubSubConnection<K, V>> database = new RedisDatabase<>(config, connection, databaseEndpoint);
return database;
}

Expand Down
33 changes: 8 additions & 25 deletions src/main/java/io/lettuce/core/failover/RedisDatabase.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.failover.CircuitBreaker.CircuitBreakerConfig;

/**
* Represents a Redis database with a weight and a connection.
Expand All @@ -24,12 +23,14 @@ public class RedisDatabase<C extends StatefulRedisConnection<?, ?>> {

private final CircuitBreaker circuitBreaker;

public RedisDatabase(RedisDatabaseConfig config, C connection, DatabaseEndpoint databaseEndpoint) {
this.redisURI = config.redisURI;
this.weight = config.weight;
private HealthStatus healthStatus = HealthStatus.HEALTHY;

public RedisDatabase(DatabaseConfig config, C connection, DatabaseEndpoint databaseEndpoint) {
this.redisURI = config.getRedisURI();
this.weight = config.getWeight();
this.connection = connection;
this.databaseEndpoint = databaseEndpoint;
this.circuitBreaker = new CircuitBreaker(config.circuitBreakerConfig);
this.circuitBreaker = new CircuitBreaker(config.getCircuitBreakerConfig());
databaseEndpoint.setCircuitBreaker(circuitBreaker);
}

Expand All @@ -53,26 +54,8 @@ public CircuitBreaker getCircuitBreaker() {
return circuitBreaker;
}

public static class RedisDatabaseConfig {

private RedisURI redisURI;

private float weight;

private CircuitBreakerConfig circuitBreakerConfig;

public RedisDatabaseConfig(RedisURI redisURI, float weight) {
this.redisURI = redisURI;
this.weight = weight;
this.circuitBreakerConfig = CircuitBreakerConfig.DEFAULT;
}

public RedisDatabaseConfig(RedisURI redisURI, float weight, CircuitBreakerConfig circuitBreakerConfig) {
this.redisURI = redisURI;
this.weight = weight;
this.circuitBreakerConfig = circuitBreakerConfig;
}

public HealthStatus getHealthStatus() {
return healthStatus;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public StatefulRedisMultiDbConnectionImpl(Map<RedisURI, RedisDatabase<C>> connec
this.codec = codec;
this.parser = parser;
this.connectionFactory = connectionFactory;
this.current = connections.values().stream().max(Comparator.comparingDouble(RedisDatabase::getWeight)).get();
this.current = getNextHealthyDatabase(null);

this.async = newRedisAsyncCommandsImpl();
this.sync = newRedisSyncCommandsImpl();
Expand All @@ -86,7 +86,7 @@ private void onCircuitBreakerStateChange(CircuitBreakerStateChangeEvent event) {
}

private void failoverFrom(RedisDatabase<C> fromDb) {
RedisDatabase<C> healthyDatabase = getHealthyDatabase(fromDb);
RedisDatabase<C> healthyDatabase = getNextHealthyDatabase(fromDb);
if (healthyDatabase != null) {
switchToDatabase(healthyDatabase.getRedisURI());
} else {
Expand All @@ -95,10 +95,9 @@ private void failoverFrom(RedisDatabase<C> fromDb) {
}
}

private RedisDatabase<C> getHealthyDatabase(RedisDatabase<C> current) {
return databases.values().stream().filter(db -> db != current)
.filter(db -> db.getCircuitBreaker().getCurrentState() == CircuitBreaker.State.CLOSED)
.max(Comparator.comparingDouble(RedisDatabase::getWeight)).get();
private RedisDatabase<C> getNextHealthyDatabase(RedisDatabase<C> current) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to confirm my understanding getHealthStatus() reflects the state of database only based on HealthChecks that will be introduced(not considering the state of CB for given DB)?

In that case where do we ensure that failoverFrom() is not failing over toward DB with CB already in OPEN_STATE?

return databases.values().stream().filter(db -> db.getHealthStatus() == HealthStatus.HEALTHY)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Improve readability,I suggest something like :

private RedisDatabase<C> getNextHealthyDatabase(RedisDatabase<C> current) {
        return databases.values().stream()
                .filter(isHealthy)
                .filter(isNotCurrent(current))
                .max(DatabaseComparators.byWeight).orElse(null);
    }
    
static class DatabaseComparators {
        public static final Comparator<RedisDatabase<?>> byWeight = Comparator.comparingDouble(RedisDatabase::getWeight);
    }

static class DatabasePredicates {
        public static final Predicate<RedisDatabase<?>> isHealthy = db -> db.getHealthStatus() == HealthStatus.HEALTHY;

        public static Predicate<RedisDatabase<?>> isNotCurrent( RedisDatabase<?> current) {
            return db -> !db.equals(current);
        }
    }

.filter(db -> !db.equals(current)).max(Comparator.comparingDouble(RedisDatabase::getWeight)).orElse(null);
}

@Override
Expand Down
Loading