Skip to content

add JedisCluster support #164

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.redislabs</groupId>
<groupId>com.falkordb</groupId>
<artifactId>jredisgraph</artifactId>
<version>2.6.0-SNAPSHOT</version>

Expand Down
28 changes: 28 additions & 0 deletions src/main/java/com/redislabs/redisgraph/Connection.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.redislabs.redisgraph;

import java.io.Closeable;
import redis.clients.jedis.Client;
import redis.clients.jedis.commands.ProtocolCommand;

/**
* Connection interface
*
* This interface defines the methods that a connection to a RedisGraph instance should implement.
* It abstracts the underlying connection mechanism, allowing for different implementations to be used.
*/
public interface Connection extends Closeable {
Object sendCommand(ProtocolCommand cmd, String... args);

Object sendBlockingCommand(ProtocolCommand cmd, String... args);

void close();

String watch(String... keys);

String unwatch();

Client getClient();

void disconnect();
}

Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package com.redislabs.redisgraph;

import redis.clients.jedis.Jedis;

public interface RedisGraphContext extends RedisGraph {


/**
* Returns implementing class connection context
* @return Jedis connection
*/
Jedis getConnectionContext();
Connection getConnectionContext();

/**
* Returns a Redis transactional object, over the connection context, with graph API capabilities
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.redislabs.redisgraph.impl.api;

import com.redislabs.redisgraph.Connection;
import com.redislabs.redisgraph.RedisGraph;
import com.redislabs.redisgraph.ResultSet;
import com.redislabs.redisgraph.impl.Utils;
import redis.clients.jedis.Jedis;

import java.util.List;
import java.util.Map;
Expand All @@ -15,9 +15,9 @@ public abstract class AbstractRedisGraph implements RedisGraph {

/**
* Inherited classes should return a Jedis connection, with respect to their context
* @return Jedis connection
* @return connection
*/
protected abstract Jedis getConnection();
protected abstract Connection getConnection();

/**
* Sends a query to the redis graph. Implementation and context dependent
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.redislabs.redisgraph.impl.api;

import com.redislabs.redisgraph.Connection;

import redis.clients.jedis.Client;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.commands.ProtocolCommand;

/**
* Cluster Connection implementation of the Connection interface for RedisGraph
*/
public class ClusterConnection implements Connection {
private final JedisCluster cluster;

public ClusterConnection(JedisCluster cluster) {
this.cluster = cluster;
}

@Override
public Object sendCommand(ProtocolCommand cmd, String... args) {
return cluster.sendCommand(args[0], cmd, args);
}

@Override
public Object sendBlockingCommand(ProtocolCommand cmd, String... args) {
return cluster.sendBlockingCommand(args[0], cmd, args);
}

@Override
public void close() {
// Doesn't actually close the connection
}

@Override
public String watch(String... keys) {
throw new UnsupportedOperationException("Cluster does not support watch");
}

@Override
public String unwatch() {
throw new UnsupportedOperationException("Cluster does not support unwatch");
}

@Override
public Client getClient() {
throw new UnsupportedOperationException("Cluster does not support pipeline");
}

@Override
public void disconnect() {
cluster.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.List;

import com.redislabs.redisgraph.Connection;
import com.redislabs.redisgraph.RedisGraphContext;
import com.redislabs.redisgraph.ResultSet;
import com.redislabs.redisgraph.exceptions.JRedisGraphException;
Expand All @@ -20,14 +21,14 @@
*/
public class ContextedRedisGraph extends AbstractRedisGraph implements RedisGraphContext, RedisGraphCacheHolder {

private final Jedis connectionContext;
private final Connection connectionContext;
private RedisGraphCaches caches;

/**
* Generates a new instance with a specific Jedis connection
* @param connectionContext
*/
public ContextedRedisGraph(Jedis connectionContext) {
public ContextedRedisGraph(Connection connectionContext) {
this.connectionContext = connectionContext;
}

Expand All @@ -36,7 +37,7 @@ public ContextedRedisGraph(Jedis connectionContext) {
* @return
*/
@Override
protected Jedis getConnection() {
protected Connection getConnection() {
return this.connectionContext;
}

Expand All @@ -48,7 +49,7 @@ protected Jedis getConnection() {
*/
@Override
protected ResultSet sendQuery(String graphId, String preparedQuery) {
Jedis conn = getConnection();
Connection conn = getConnection();
try {
@SuppressWarnings("unchecked")
List<Object> rawResponse = (List<Object>) conn.sendCommand(RedisGraphCommand.QUERY, graphId, preparedQuery, Utils.COMPACT_STRING);
Expand All @@ -68,7 +69,7 @@ protected ResultSet sendQuery(String graphId, String preparedQuery) {
*/
@Override
protected ResultSet sendReadOnlyQuery(String graphId, String preparedQuery) {
Jedis conn = getConnection();
Connection conn = getConnection();
try {
@SuppressWarnings("unchecked")
List<Object> rawResponse = (List<Object>) conn.sendCommand(RedisGraphCommand.RO_QUERY, graphId, preparedQuery, Utils.COMPACT_STRING);
Expand All @@ -89,7 +90,7 @@ protected ResultSet sendReadOnlyQuery(String graphId, String preparedQuery) {
*/
@Override
protected ResultSet sendQuery(String graphId, String preparedQuery, long timeout) {
Jedis conn = getConnection();
Connection conn = getConnection();
try {
@SuppressWarnings("unchecked")
List<Object> rawResponse = (List<Object>) conn.sendBlockingCommand(RedisGraphCommand.QUERY,
Expand All @@ -111,7 +112,7 @@ protected ResultSet sendQuery(String graphId, String preparedQuery, long timeout
*/
@Override
protected ResultSet sendReadOnlyQuery(String graphId, String preparedQuery, long timeout) {
Jedis conn = getConnection();
Connection conn = getConnection();
try {
@SuppressWarnings("unchecked")
List<Object> rawResponse = (List<Object>) conn.sendBlockingCommand(RedisGraphCommand.RO_QUERY,
Expand All @@ -128,7 +129,7 @@ protected ResultSet sendReadOnlyQuery(String graphId, String preparedQuery, long
* @return Returns the instance Jedis connection.
*/
@Override
public Jedis getConnectionContext() {
public Connection getConnectionContext() {
return this.connectionContext;
}

Expand All @@ -138,8 +139,8 @@ public Jedis getConnectionContext() {
*/
@Override
public RedisGraphTransaction multi() {
Jedis jedis = getConnection();
Client client = jedis.getClient();
Connection connection = getConnection();
Client client = connection.getClient();
client.multi();
client.getOne();
RedisGraphTransaction transaction = new RedisGraphTransaction(client, this);
Expand All @@ -153,8 +154,8 @@ public RedisGraphTransaction multi() {
*/
@Override
public RedisGraphPipeline pipelined() {
Jedis jedis = getConnection();
Client client = jedis.getClient();
Connection connection = getConnection();
Client client = connection.getClient();
RedisGraphPipeline pipeline = new RedisGraphPipeline(client, this);
pipeline.setRedisGraphCaches(caches);
return pipeline;
Expand Down Expand Up @@ -186,7 +187,7 @@ public String unwatch() {
*/
@Override
public String deleteGraph(String graphId) {
Jedis conn = getConnection();
Connection conn = getConnection();
Object response;
try {
response = conn.sendCommand(RedisGraphCommand.DELETE, graphId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.redislabs.redisgraph.impl.api;

import redis.clients.jedis.Jedis;

/**
* Pooled Connection implementation of the Connection interface for RedisGraph
*/
public class PooledConnection extends SingleConnection {

public PooledConnection(Jedis jedis) {
super(jedis);
}

@Override
public void close() {
// Doesn't actually close the connection, just returns it to the pool
super.jedis.close();
}
}
25 changes: 16 additions & 9 deletions src/main/java/com/redislabs/redisgraph/impl/api/RedisGraph.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package com.redislabs.redisgraph.impl.api;

import com.redislabs.redisgraph.Connection;
import com.redislabs.redisgraph.RedisGraphContext;
import com.redislabs.redisgraph.RedisGraphContextGenerator;
import com.redislabs.redisgraph.ResultSet;
import com.redislabs.redisgraph.impl.graph_cache.RedisGraphCaches;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.util.Pool;
import redis.clients.jedis.util.SafeEncoder;

Expand All @@ -15,7 +17,7 @@
public class RedisGraph extends AbstractRedisGraph implements RedisGraphContextGenerator {

private final Pool<Jedis> pool;
private final Jedis jedis;
private final Connection connection;
private final RedisGraphCaches caches = new RedisGraphCaches();

/**
Expand Down Expand Up @@ -43,11 +45,16 @@ public RedisGraph(String host, int port) {
*/
public RedisGraph(Pool<Jedis> pool) {
this.pool = pool;
this.jedis = null;
this.connection = null;
}

public RedisGraph(Jedis jedis) {
this.jedis = jedis;
this.connection = new SingleConnection(jedis);
this.pool = null;
}

public RedisGraph(JedisCluster jedis) {
this.connection = new ClusterConnection(jedis);
this.pool = null;
}

Expand All @@ -56,8 +63,8 @@ public RedisGraph(Jedis jedis) {
* @return a Jedis connection
*/
@Override
protected Jedis getConnection() {
return jedis != null ? jedis : pool.getResource();
protected Connection getConnection() {
return pool == null ? connection : new PooledConnection(pool.getResource());
}

/**
Expand Down Expand Up @@ -123,15 +130,15 @@ protected ResultSet sendReadOnlyQuery(String graphId, String preparedQuery, long
}

/**
* Closes the Jedis pool
* Closes all the connections
*/
@Override
public void close() {
if (pool != null) {
pool.close();
}
if (jedis != null) {
jedis.close();
if (connection != null) {
connection.disconnect();
}
}

Expand All @@ -142,7 +149,7 @@ public void close() {
*/
@Override
public String deleteGraph(String graphId) {
try (Jedis conn = getConnection()) {
try (Connection conn = getConnection()) {
Object response = conn.sendCommand(RedisGraphCommand.DELETE, graphId);
//clear local state
caches.removeGraphCache(graphId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.redislabs.redisgraph.impl.api;

import com.redislabs.redisgraph.Connection;

import redis.clients.jedis.Client;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.commands.ProtocolCommand;

/**
* Single Connection implementation of the Connection interface for RedisGraph
*/
public class SingleConnection implements Connection {
protected final Jedis jedis;

public SingleConnection(Jedis jedis) {
this.jedis = jedis;
}

@Override
public Object sendCommand(ProtocolCommand cmd, String... args) {
return jedis.sendCommand(cmd, args);
}

@Override
public Object sendBlockingCommand(ProtocolCommand cmd, String... args) {
return jedis.sendBlockingCommand(cmd, args);
}

@Override
public void close() {
// Doesn't actually close the connection
}

@Override
public String watch(String... keys) {
return jedis.watch(keys);
}

@Override
public String unwatch() {
return jedis.unwatch();
}

@Override
public Client getClient() {
return jedis.getClient();
}

@Override
public void disconnect() {
jedis.close();
}

}
Loading