From 6730b4ea19a075a352b21d993038c8bc4957552e Mon Sep 17 00:00:00 2001 From: Guy Korland Date: Wed, 11 Sep 2024 17:30:16 +0300 Subject: [PATCH 1/4] add JedisCluster support --- .../com/redislabs/redisgraph/Connection.java | 28 ++++++++++ .../redisgraph/RedisGraphContext.java | 4 +- .../impl/api/AbstractRedisGraph.java | 6 +-- .../impl/api/ClusterConnection.java | 53 ++++++++++++++++++ .../impl/api/ContextedRedisGraph.java | 27 +++++----- .../redisgraph/impl/api/PooledConnection.java | 19 +++++++ .../redisgraph/impl/api/RedisGraph.java | 25 +++++---- .../redisgraph/impl/api/SingleConnection.java | 54 +++++++++++++++++++ 8 files changed, 188 insertions(+), 28 deletions(-) create mode 100644 src/main/java/com/redislabs/redisgraph/Connection.java create mode 100644 src/main/java/com/redislabs/redisgraph/impl/api/ClusterConnection.java create mode 100644 src/main/java/com/redislabs/redisgraph/impl/api/PooledConnection.java create mode 100644 src/main/java/com/redislabs/redisgraph/impl/api/SingleConnection.java diff --git a/src/main/java/com/redislabs/redisgraph/Connection.java b/src/main/java/com/redislabs/redisgraph/Connection.java new file mode 100644 index 0000000..83ff8f4 --- /dev/null +++ b/src/main/java/com/redislabs/redisgraph/Connection.java @@ -0,0 +1,28 @@ +package com.redislabs.redisgraph; + +import java.io.Closeable; +import redis.clients.jedis.Client; +import redis.clients.jedis.commands.ProtocolCommand; + +/** + * Connection interface + * + * This interface defines the methods that a connection to a RedisGraph instance should implement. + * It abstracts the underlying connection mechanism, allowing for different implementations to be used. + */ +public interface Connection extends Closeable { + Object sendCommand(ProtocolCommand cmd, String... args); + + Object sendBlockingCommand(ProtocolCommand cmd, String... args); + + void close(); + + String watch(String... keys); + + String unwatch(); + + Client getClient(); + + void disconnect(); +} + diff --git a/src/main/java/com/redislabs/redisgraph/RedisGraphContext.java b/src/main/java/com/redislabs/redisgraph/RedisGraphContext.java index 44d6319..5e8764f 100644 --- a/src/main/java/com/redislabs/redisgraph/RedisGraphContext.java +++ b/src/main/java/com/redislabs/redisgraph/RedisGraphContext.java @@ -1,7 +1,5 @@ package com.redislabs.redisgraph; -import redis.clients.jedis.Jedis; - public interface RedisGraphContext extends RedisGraph { @@ -9,7 +7,7 @@ public interface RedisGraphContext extends RedisGraph { * Returns implementing class connection context * @return Jedis connection */ - Jedis getConnectionContext(); + Connection getConnectionContext(); /** * Returns a Redis transactional object, over the connection context, with graph API capabilities diff --git a/src/main/java/com/redislabs/redisgraph/impl/api/AbstractRedisGraph.java b/src/main/java/com/redislabs/redisgraph/impl/api/AbstractRedisGraph.java index 57fce53..a9cb462 100644 --- a/src/main/java/com/redislabs/redisgraph/impl/api/AbstractRedisGraph.java +++ b/src/main/java/com/redislabs/redisgraph/impl/api/AbstractRedisGraph.java @@ -1,9 +1,9 @@ package com.redislabs.redisgraph.impl.api; +import com.redislabs.redisgraph.Connection; import com.redislabs.redisgraph.RedisGraph; import com.redislabs.redisgraph.ResultSet; import com.redislabs.redisgraph.impl.Utils; -import redis.clients.jedis.Jedis; import java.util.List; import java.util.Map; @@ -15,9 +15,9 @@ public abstract class AbstractRedisGraph implements RedisGraph { /** * Inherited classes should return a Jedis connection, with respect to their context - * @return Jedis connection + * @return connection */ - protected abstract Jedis getConnection(); + protected abstract Connection getConnection(); /** * Sends a query to the redis graph. Implementation and context dependent diff --git a/src/main/java/com/redislabs/redisgraph/impl/api/ClusterConnection.java b/src/main/java/com/redislabs/redisgraph/impl/api/ClusterConnection.java new file mode 100644 index 0000000..580660a --- /dev/null +++ b/src/main/java/com/redislabs/redisgraph/impl/api/ClusterConnection.java @@ -0,0 +1,53 @@ +package com.redislabs.redisgraph.impl.api; + +import com.redislabs.redisgraph.Connection; + +import redis.clients.jedis.Client; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.commands.ProtocolCommand; + +/** + * Cluster Connection implementation of the Connection interface for RedisGraph + */ +public class ClusterConnection implements Connection { + private final JedisCluster cluster; + + public ClusterConnection(JedisCluster cluster) { + this.cluster = cluster; + } + + @Override + public Object sendCommand(ProtocolCommand cmd, String... args) { + return cluster.sendCommand(args[0], cmd, args); + } + + @Override + public Object sendBlockingCommand(ProtocolCommand cmd, String... args) { + return cluster.sendBlockingCommand(args[0], cmd, args); + } + + @Override + public void close() { + // Doesn't actually close the connection + } + + @Override + public String watch(String... keys) { + throw new UnsupportedOperationException("Cluster does not support watch"); + } + + @Override + public String unwatch() { + throw new UnsupportedOperationException("Cluster does not support unwatch"); + } + + @Override + public Client getClient() { + throw new UnsupportedOperationException("Cluster does not support pipeline"); + } + + @Override + public void disconnect() { + cluster.close(); + } +} diff --git a/src/main/java/com/redislabs/redisgraph/impl/api/ContextedRedisGraph.java b/src/main/java/com/redislabs/redisgraph/impl/api/ContextedRedisGraph.java index 4d6fe34..d099f85 100644 --- a/src/main/java/com/redislabs/redisgraph/impl/api/ContextedRedisGraph.java +++ b/src/main/java/com/redislabs/redisgraph/impl/api/ContextedRedisGraph.java @@ -2,6 +2,7 @@ import java.util.List; +import com.redislabs.redisgraph.Connection; import com.redislabs.redisgraph.RedisGraphContext; import com.redislabs.redisgraph.ResultSet; import com.redislabs.redisgraph.exceptions.JRedisGraphException; @@ -20,14 +21,14 @@ */ public class ContextedRedisGraph extends AbstractRedisGraph implements RedisGraphContext, RedisGraphCacheHolder { - private final Jedis connectionContext; + private final Connection connectionContext; private RedisGraphCaches caches; /** * Generates a new instance with a specific Jedis connection * @param connectionContext */ - public ContextedRedisGraph(Jedis connectionContext) { + public ContextedRedisGraph(Connection connectionContext) { this.connectionContext = connectionContext; } @@ -36,7 +37,7 @@ public ContextedRedisGraph(Jedis connectionContext) { * @return */ @Override - protected Jedis getConnection() { + protected Connection getConnection() { return this.connectionContext; } @@ -48,7 +49,7 @@ protected Jedis getConnection() { */ @Override protected ResultSet sendQuery(String graphId, String preparedQuery) { - Jedis conn = getConnection(); + Connection conn = getConnection(); try { @SuppressWarnings("unchecked") List rawResponse = (List) conn.sendCommand(RedisGraphCommand.QUERY, graphId, preparedQuery, Utils.COMPACT_STRING); @@ -68,7 +69,7 @@ protected ResultSet sendQuery(String graphId, String preparedQuery) { */ @Override protected ResultSet sendReadOnlyQuery(String graphId, String preparedQuery) { - Jedis conn = getConnection(); + Connection conn = getConnection(); try { @SuppressWarnings("unchecked") List rawResponse = (List) conn.sendCommand(RedisGraphCommand.RO_QUERY, graphId, preparedQuery, Utils.COMPACT_STRING); @@ -89,7 +90,7 @@ protected ResultSet sendReadOnlyQuery(String graphId, String preparedQuery) { */ @Override protected ResultSet sendQuery(String graphId, String preparedQuery, long timeout) { - Jedis conn = getConnection(); + Connection conn = getConnection(); try { @SuppressWarnings("unchecked") List rawResponse = (List) conn.sendBlockingCommand(RedisGraphCommand.QUERY, @@ -111,7 +112,7 @@ protected ResultSet sendQuery(String graphId, String preparedQuery, long timeout */ @Override protected ResultSet sendReadOnlyQuery(String graphId, String preparedQuery, long timeout) { - Jedis conn = getConnection(); + Connection conn = getConnection(); try { @SuppressWarnings("unchecked") List rawResponse = (List) conn.sendBlockingCommand(RedisGraphCommand.RO_QUERY, @@ -128,7 +129,7 @@ protected ResultSet sendReadOnlyQuery(String graphId, String preparedQuery, long * @return Returns the instance Jedis connection. */ @Override - public Jedis getConnectionContext() { + public Connection getConnectionContext() { return this.connectionContext; } @@ -138,8 +139,8 @@ public Jedis getConnectionContext() { */ @Override public RedisGraphTransaction multi() { - Jedis jedis = getConnection(); - Client client = jedis.getClient(); + Connection connection = getConnection(); + Client client = connection.getClient(); client.multi(); client.getOne(); RedisGraphTransaction transaction = new RedisGraphTransaction(client, this); @@ -153,8 +154,8 @@ public RedisGraphTransaction multi() { */ @Override public RedisGraphPipeline pipelined() { - Jedis jedis = getConnection(); - Client client = jedis.getClient(); + Connection connection = getConnection(); + Client client = connection.getClient(); RedisGraphPipeline pipeline = new RedisGraphPipeline(client, this); pipeline.setRedisGraphCaches(caches); return pipeline; @@ -186,7 +187,7 @@ public String unwatch() { */ @Override public String deleteGraph(String graphId) { - Jedis conn = getConnection(); + Connection conn = getConnection(); Object response; try { response = conn.sendCommand(RedisGraphCommand.DELETE, graphId); diff --git a/src/main/java/com/redislabs/redisgraph/impl/api/PooledConnection.java b/src/main/java/com/redislabs/redisgraph/impl/api/PooledConnection.java new file mode 100644 index 0000000..c9e5c99 --- /dev/null +++ b/src/main/java/com/redislabs/redisgraph/impl/api/PooledConnection.java @@ -0,0 +1,19 @@ +package com.redislabs.redisgraph.impl.api; + +import redis.clients.jedis.Jedis; + +/** + * Pooled Connection implementation of the Connection interface for RedisGraph + */ +public class PooledConnection extends SingleConnection { + + public PooledConnection(Jedis jedis) { + super(jedis); + } + + @Override + public void close() { + // Doesn't actually close the connection, just returns it to the pool + super.jedis.close(); + } +} diff --git a/src/main/java/com/redislabs/redisgraph/impl/api/RedisGraph.java b/src/main/java/com/redislabs/redisgraph/impl/api/RedisGraph.java index d187f70..d17a0fa 100644 --- a/src/main/java/com/redislabs/redisgraph/impl/api/RedisGraph.java +++ b/src/main/java/com/redislabs/redisgraph/impl/api/RedisGraph.java @@ -1,11 +1,13 @@ package com.redislabs.redisgraph.impl.api; +import com.redislabs.redisgraph.Connection; import com.redislabs.redisgraph.RedisGraphContext; import com.redislabs.redisgraph.RedisGraphContextGenerator; import com.redislabs.redisgraph.ResultSet; import com.redislabs.redisgraph.impl.graph_cache.RedisGraphCaches; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisCluster; import redis.clients.jedis.util.Pool; import redis.clients.jedis.util.SafeEncoder; @@ -15,7 +17,7 @@ public class RedisGraph extends AbstractRedisGraph implements RedisGraphContextGenerator { private final Pool pool; - private final Jedis jedis; + private final Connection connection; private final RedisGraphCaches caches = new RedisGraphCaches(); /** @@ -43,11 +45,16 @@ public RedisGraph(String host, int port) { */ public RedisGraph(Pool pool) { this.pool = pool; - this.jedis = null; + this.connection = null; } public RedisGraph(Jedis jedis) { - this.jedis = jedis; + this.connection = new SingleConnection(jedis); + this.pool = null; + } + + public RedisGraph(JedisCluster jedis) { + this.connection = null; this.pool = null; } @@ -56,8 +63,8 @@ public RedisGraph(Jedis jedis) { * @return a Jedis connection */ @Override - protected Jedis getConnection() { - return jedis != null ? jedis : pool.getResource(); + protected Connection getConnection() { + return pool == null ? connection : new SingleConnection(pool.getResource()); } /** @@ -123,15 +130,15 @@ protected ResultSet sendReadOnlyQuery(String graphId, String preparedQuery, long } /** - * Closes the Jedis pool + * Closes all the connections */ @Override public void close() { if (pool != null) { pool.close(); } - if (jedis != null) { - jedis.close(); + if (connection != null) { + connection.disconnect(); } } @@ -142,7 +149,7 @@ public void close() { */ @Override public String deleteGraph(String graphId) { - try (Jedis conn = getConnection()) { + try (Connection conn = getConnection()) { Object response = conn.sendCommand(RedisGraphCommand.DELETE, graphId); //clear local state caches.removeGraphCache(graphId); diff --git a/src/main/java/com/redislabs/redisgraph/impl/api/SingleConnection.java b/src/main/java/com/redislabs/redisgraph/impl/api/SingleConnection.java new file mode 100644 index 0000000..de8c616 --- /dev/null +++ b/src/main/java/com/redislabs/redisgraph/impl/api/SingleConnection.java @@ -0,0 +1,54 @@ +package com.redislabs.redisgraph.impl.api; + +import com.redislabs.redisgraph.Connection; + +import redis.clients.jedis.Client; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.commands.ProtocolCommand; + +/** + * Single Connection implementation of the Connection interface for RedisGraph +*/ +public class SingleConnection implements Connection { + protected final Jedis jedis; + + public SingleConnection(Jedis jedis) { + this.jedis = jedis; + } + + @Override + public Object sendCommand(ProtocolCommand cmd, String... args) { + return jedis.sendCommand(cmd, args); + } + + @Override + public Object sendBlockingCommand(ProtocolCommand cmd, String... args) { + return jedis.sendBlockingCommand(cmd, args); + } + + @Override + public void close() { + // Doesn't actually close the connection + } + + @Override + public String watch(String... keys) { + return jedis.watch(keys); + } + + @Override + public String unwatch() { + return jedis.unwatch(); + } + + @Override + public Client getClient() { + return jedis.getClient(); + } + + @Override + public void disconnect() { + jedis.close(); + } + +} From 71756165fb83e0f4469fcea7fffcc656a479229a Mon Sep 17 00:00:00 2001 From: Guy Korland Date: Wed, 11 Sep 2024 18:16:12 +0300 Subject: [PATCH 2/4] fix connection allocation and index test --- .../com/redislabs/redisgraph/impl/api/RedisGraph.java | 2 +- .../com/redislabs/redisgraph/RedisGraphAPITest.java | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/redislabs/redisgraph/impl/api/RedisGraph.java b/src/main/java/com/redislabs/redisgraph/impl/api/RedisGraph.java index d17a0fa..ae70965 100644 --- a/src/main/java/com/redislabs/redisgraph/impl/api/RedisGraph.java +++ b/src/main/java/com/redislabs/redisgraph/impl/api/RedisGraph.java @@ -64,7 +64,7 @@ public RedisGraph(JedisCluster jedis) { */ @Override protected Connection getConnection() { - return pool == null ? connection : new SingleConnection(pool.getResource()); + return pool == null ? connection : new PooledConnection(pool.getResource()); } /** diff --git a/src/test/java/com/redislabs/redisgraph/RedisGraphAPITest.java b/src/test/java/com/redislabs/redisgraph/RedisGraphAPITest.java index 6fc132a..bc2777a 100644 --- a/src/test/java/com/redislabs/redisgraph/RedisGraphAPITest.java +++ b/src/test/java/com/redislabs/redisgraph/RedisGraphAPITest.java @@ -150,10 +150,12 @@ public void testIndex() { Assert.assertNotNull(createNonExistingIndexResult.getStatistics().getStringValue(Label.INDICES_ADDED)); Assert.assertEquals(1, createNonExistingIndexResult.getStatistics().indicesAdded()); - ResultSet createExistingIndexResult = client.query("social", "CREATE INDEX ON :person(age)"); - Assert.assertFalse(createExistingIndexResult.hasNext()); - Assert.assertNotNull(createExistingIndexResult.getStatistics().getStringValue(Label.INDICES_ADDED)); - Assert.assertEquals(0, createExistingIndexResult.getStatistics().indicesAdded()); + try { + ResultSet createExistingIndexResult = client.query("social", "CREATE INDEX ON :person(age)"); + Assert.fail(); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains("Attribute 'age' is already indexed")); + } ResultSet deleteExistingIndexResult = client.query("social", "DROP INDEX ON :person(age)"); Assert.assertFalse(deleteExistingIndexResult.hasNext()); From eee9db4d92299ce44e568427414dea0b195647ee Mon Sep 17 00:00:00 2001 From: Guy Korland Date: Wed, 11 Sep 2024 18:24:41 +0300 Subject: [PATCH 3/4] rename mvn package --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index fd096c0..81ad95e 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - com.redislabs + com.falkordb jredisgraph 2.6.0-SNAPSHOT From f570c7cebcf0fb991ed8c93314c850fc9455737c Mon Sep 17 00:00:00 2001 From: Guy Korland Date: Wed, 11 Sep 2024 18:25:45 +0300 Subject: [PATCH 4/4] set Cluster connection --- src/main/java/com/redislabs/redisgraph/impl/api/RedisGraph.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/redislabs/redisgraph/impl/api/RedisGraph.java b/src/main/java/com/redislabs/redisgraph/impl/api/RedisGraph.java index ae70965..15323ad 100644 --- a/src/main/java/com/redislabs/redisgraph/impl/api/RedisGraph.java +++ b/src/main/java/com/redislabs/redisgraph/impl/api/RedisGraph.java @@ -54,7 +54,7 @@ public RedisGraph(Jedis jedis) { } public RedisGraph(JedisCluster jedis) { - this.connection = null; + this.connection = new ClusterConnection(jedis); this.pool = null; }