Skip to content

Commit 4299e12

Browse files
authored
[DE-1071] Filter pushdown (#20)
This PR implements filter pushdown optimization for ArangoDB TinkerPop integration by translating Gremlin predicates into AQL queries. The implementation adds query optimization to reduce data transfer and improve performance. This PR implements the push down only for filters of type: org.apache.tinkerpop.gremlin.process.traversal.step.filter.HasStep. For example this optimizes queries like g.V().has("value", "foo"), by reading from the database only the matching vertices.
1 parent ab2b4eb commit 4299e12

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+2104
-50
lines changed

spotbugs/spotbugs-exclude.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,9 @@
3535
<Bug pattern="SE_BAD_FIELD"/>
3636
</Match>
3737

38+
<Match>
39+
<Class name="com.arangodb.tinkerpop.gremlin.process.traversal.strategy.optimization.ArangoStepStrategy"/>
40+
<Bug pattern="SING_SINGLETON_IMPLEMENTS_SERIALIZABLE"/>
41+
</Match>
42+
3843
</FindBugsFilter>

src/main/java/com/arangodb/tinkerpop/gremlin/client/ArangoDBGraphClient.java

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.util.*;
2020
import java.util.stream.Collectors;
21+
import java.util.stream.Stream;
2122

2223
import com.arangodb.*;
2324
import com.arangodb.entity.*;
@@ -28,12 +29,14 @@
2829
import com.arangodb.serde.jackson.JacksonSerde;
2930
import com.arangodb.tinkerpop.gremlin.persistence.*;
3031
import com.arangodb.tinkerpop.gremlin.persistence.serde.SerdeModule;
32+
import com.arangodb.tinkerpop.gremlin.process.filter.ArangoFilter;
33+
import com.arangodb.tinkerpop.gremlin.process.filter.EmptyFilter;
34+
import com.arangodb.tinkerpop.gremlin.process.filter.FilterSupport;
3135
import com.arangodb.tinkerpop.gremlin.structure.*;
3236
import com.arangodb.tinkerpop.gremlin.utils.AqlDeserializer;
3337
import com.fasterxml.jackson.databind.*;
3438
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
3539
import org.apache.tinkerpop.gremlin.structure.Direction;
36-
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
3740
import org.slf4j.Logger;
3841
import org.slf4j.LoggerFactory;
3942

@@ -110,15 +113,25 @@ public void updateGraphVariables(VariablesData document) {
110113
}
111114
}
112115

116+
public Stream<VertexData> getGraphVertices(List<ElementId> ids, ArangoFilter filter, Set<String> colNames) {
117+
logger.debug("Get all {} graph vertices, filtered by AQL filters", config.graphName);
118+
return getGraphDocuments(ids, filter, colNames, VertexData.class);
119+
}
120+
121+
public Stream<EdgeData> getGraphEdges(List<ElementId> ids, ArangoFilter filter, Set<String> colNames) {
122+
logger.debug("Get all {} graph edges, filtered by AQL filters", config.graphName);
123+
return getGraphDocuments(ids, filter, colNames, EdgeData.class);
124+
}
125+
113126
/**
114127
* Get vertices of a graph. If no ids are provided, get all vertices.
115128
*
116129
* @param ids the ids to match
117130
* @return the documents
118131
*/
119-
public ArangoIterable<VertexData> getGraphVertices(final List<ElementId> ids) {
132+
public Stream<VertexData> getGraphVertices(final List<ElementId> ids) {
120133
logger.debug("Get all {} graph vertices, filtered by ids: {}", config.graphName, ids);
121-
return getGraphDocuments(ids, config.vertices, VertexData.class);
134+
return getGraphDocuments(ids, EmptyFilter.instance(), config.vertices, VertexData.class);
122135
}
123136

124137
/**
@@ -127,19 +140,28 @@ public ArangoIterable<VertexData> getGraphVertices(final List<ElementId> ids) {
127140
* @param ids the ids to match
128141
* @return the documents
129142
*/
130-
public ArangoIterable<EdgeData> getGraphEdges(List<ElementId> ids) {
143+
public Stream<EdgeData> getGraphEdges(List<ElementId> ids) {
131144
logger.debug("Get all {} graph edges, filtered by ids: {}", config.graphName, ids);
132-
return getGraphDocuments(ids, config.edges, EdgeData.class);
145+
return getGraphDocuments(ids, EmptyFilter.instance(), config.edges, EdgeData.class);
133146
}
134147

135-
private <V> ArangoIterable<V> getGraphDocuments(List<ElementId> ids, Set<String> colNames, Class<V> clazz) {
148+
private <V> Stream<V> getGraphDocuments(List<ElementId> ids, ArangoFilter filter, Set<String> colNames, Class<V> clazz) {
136149
if (ids.isEmpty()) {
137-
return query(ArangoDBQueryBuilder.readAllDocuments(colNames), clazz, null);
150+
if (colNames.isEmpty()) {
151+
return Stream.empty();
152+
}
153+
return query(ArangoDBQueryBuilder.readAllDocuments(colNames, filter), clazz, null);
138154
} else {
139155
List<ElementId> prunedIds = ids.stream()
140156
.filter(it -> colNames.contains(it.getCollection()))
141157
.collect(Collectors.toList());
142-
return query("FOR d IN DOCUMENT(@ids) RETURN d", clazz, Collections.singletonMap("ids", prunedIds));
158+
StringBuilder query = new StringBuilder();
159+
query.append("FOR d IN DOCUMENT(@ids)");
160+
if (filter.getSupport() != FilterSupport.NONE) {
161+
query.append(" FILTER ").append(filter.toAql("d"));
162+
}
163+
query.append(" RETURN d");
164+
return query(query.toString(), clazz, Collections.singletonMap("ids", prunedIds));
143165
}
144166
}
145167

@@ -191,19 +213,19 @@ public ArangoDB getArangoDriver() {
191213
return db.arango();
192214
}
193215

194-
public Iterator<Object> query(final String query, final Map<String, ?> parameters, final AqlQueryOptions options) {
195-
Iterator<JsonNode> res = query(query, JsonNode.class, parameters, options);
196-
return IteratorUtils.map(res, aqlDeserializer::deserialize);
216+
public Stream<Object> query(final String query, final Map<String, ?> parameters, final AqlQueryOptions options) {
217+
return query(query, JsonNode.class, parameters, options)
218+
.map(aqlDeserializer::deserialize);
197219
}
198220

199-
private <V> ArangoCursor<V> query(String query, Class<V> type, Map<String, ?> parameters) {
221+
private <V> Stream<V> query(String query, Class<V> type, Map<String, ?> parameters) {
200222
return query(query, type, parameters, new AqlQueryOptions());
201223
}
202224

203-
private <V> ArangoCursor<V> query(String query, Class<V> type, Map<String, ?> parameters, AqlQueryOptions options) {
225+
private <V> Stream<V> query(String query, Class<V> type, Map<String, ?> parameters, AqlQueryOptions options) {
204226
logger.debug("Executing AQL query: {}, with parameters: {}, with options: {}", query, parameters, options);
205227
try {
206-
return db.query(query, type, parameters, options);
228+
return db.query(query, type, parameters, options).stream();
207229
} catch (ArangoDBException e) {
208230
throw mapException(e);
209231
}
@@ -303,7 +325,7 @@ public void updateVertex(ArangoDBVertex vertex) {
303325
logger.debug("Document updated, new rev {}", vertexEntity.getRev());
304326
}
305327

306-
public Iterator<VertexData> getVertexNeighbors(ElementId vertexId, Set<String> edgeCollections, Direction direction, String[] labels) {
328+
public Stream<VertexData> getVertexNeighbors(ElementId vertexId, Set<String> edgeCollections, Direction direction, String[] labels) {
307329
logger.debug("Get vertex {}:{} Neighbors, in {}, from collections {}", vertexId, direction, config.graphName, edgeCollections);
308330
String query = ArangoDBQueryBuilder.readVertexNeighbors(config.graphName, direction, config, labels);
309331
Map<String, Object> params = new HashMap<>();
@@ -315,7 +337,7 @@ public Iterator<VertexData> getVertexNeighbors(ElementId vertexId, Set<String> e
315337
return query(query, VertexData.class, params);
316338
}
317339

318-
public Iterator<EdgeData> getVertexEdges(ElementId vertexId, Set<String> edgeCollections, Direction direction, String[] labels) {
340+
public Stream<EdgeData> getVertexEdges(ElementId vertexId, Set<String> edgeCollections, Direction direction, String[] labels) {
319341
logger.debug("Get vertex {}:{} Edges, in {}, from collections {}", vertexId, direction, config.graphName, edgeCollections);
320342
String query = ArangoDBQueryBuilder.readVertexEdges(config.graphName, direction, config, labels);
321343
Map<String, Object> params = new HashMap<>();

src/main/java/com/arangodb/tinkerpop/gremlin/client/ArangoDBQueryBuilder.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import java.util.Set;
2020
import java.util.stream.Collectors;
2121

22+
import com.arangodb.tinkerpop.gremlin.process.filter.ArangoFilter;
23+
import com.arangodb.tinkerpop.gremlin.process.filter.FilterSupport;
2224
import com.arangodb.tinkerpop.gremlin.structure.ArangoDBGraphConfig;
2325
import org.apache.tinkerpop.gremlin.structure.Direction;
2426

@@ -57,26 +59,33 @@ private static StringBuilder oneStepTraversal(String graphName, Direction direct
5759
return query;
5860
}
5961

60-
public static String readAllDocuments(Set<String> collections) {
62+
static String readAllDocuments(Set<String> collections, ArangoFilter filter) {
6163
if (collections.isEmpty()) {
6264
throw new IllegalArgumentException();
6365
} else if (collections.size() == 1) {
64-
return readAllDocumentsFromSingleCollection(collections.iterator().next());
66+
return readAllDocumentsFromSingleCollection(collections.iterator().next(), filter);
6567
} else {
66-
return readAllDocumentsFromMultipleCollections(collections);
68+
return readAllDocumentsFromMultipleCollections(collections, filter);
6769
}
6870
}
6971

70-
private static String readAllDocumentsFromMultipleCollections(Set<String> collections) {
72+
private static String readAllDocumentsFromMultipleCollections(Set<String> collections, ArangoFilter filter) {
7173
String inner = collections.stream()
72-
.map(ArangoDBQueryBuilder::readAllDocumentsFromSingleCollection)
74+
.map(it -> ArangoDBQueryBuilder.readAllDocumentsFromSingleCollection(it, filter))
7375
.map(it -> "(" + it + ")")
7476
.collect(Collectors.joining(","));
7577
return String.format("FOR d in UNION(%s) RETURN d", inner);
7678
}
7779

78-
private static String readAllDocumentsFromSingleCollection(String collection) {
79-
return String.format("FOR x IN %s RETURN x", escape(collection));
80+
private static String readAllDocumentsFromSingleCollection(String collection, ArangoFilter filter) {
81+
StringBuilder query = new StringBuilder()
82+
.append("FOR x IN ")
83+
.append(escape(collection));
84+
if (filter.getSupport() != FilterSupport.NONE) {
85+
query.append(" FILTER ").append(filter.toAql("x"));
86+
}
87+
query.append(" RETURN x");
88+
return query.toString();
8089
}
8190

8291
private static String escape(String collection) {
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright 2025 ArangoDB GmbH and The University of York
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.arangodb.tinkerpop.gremlin.process.filter;
18+
19+
import java.util.Collection;
20+
import java.util.List;
21+
import java.util.stream.Collectors;
22+
23+
public class AndFilter implements ArangoFilter {
24+
private final List<ArangoFilter> filters;
25+
26+
public static ArangoFilter of(Collection<ArangoFilter> filters) {
27+
List<ArangoFilter> supportedFilters = filters.stream()
28+
.filter(it -> it.getSupport() != FilterSupport.NONE)
29+
.collect(Collectors.toList());
30+
if (supportedFilters.isEmpty()) {
31+
return EmptyFilter.instance();
32+
} else if (supportedFilters.size() == 1) {
33+
return supportedFilters.get(0);
34+
} else {
35+
return new AndFilter(supportedFilters);
36+
}
37+
}
38+
39+
private AndFilter(List<ArangoFilter> filters) {
40+
this.filters = filters;
41+
}
42+
43+
/**
44+
* +---------++---------+---------+---------+
45+
* | AND || FULL | PARTIAL | NONE |
46+
* +---------++---------+---------+---------+
47+
* | FULL || FULL | PARTIAL | PARTIAL |
48+
* | PARTIAL || PARTIAL | PARTIAL | PARTIAL |
49+
* | NONE || PARTIAL | PARTIAL | NONE |
50+
* +---------++---------+---------+---------+
51+
*/
52+
@Override
53+
public FilterSupport getSupport() {
54+
if (filters.stream().map(ArangoFilter::getSupport).allMatch(FilterSupport.FULL::equals)) {
55+
return FilterSupport.FULL;
56+
} else {
57+
return FilterSupport.PARTIAL;
58+
}
59+
}
60+
61+
@Override
62+
public String toAql(String variableName) {
63+
return filters.stream()
64+
.map(it -> it.toAql(variableName))
65+
.collect(Collectors.joining(" AND ", "(", ")"));
66+
}
67+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright 2025 ArangoDB GmbH and The University of York
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.arangodb.tinkerpop.gremlin.process.filter;
18+
19+
import org.apache.tinkerpop.gremlin.process.traversal.P;
20+
import org.apache.tinkerpop.gremlin.process.traversal.util.AndP;
21+
import org.apache.tinkerpop.gremlin.process.traversal.util.OrP;
22+
23+
import java.util.Arrays;
24+
import java.util.Collection;
25+
import java.util.regex.Pattern;
26+
import java.util.stream.Collectors;
27+
28+
public interface ArangoFilter {
29+
static ArangoFilter of(String key, P<?> p) {
30+
String pName = p.getPredicateName();
31+
switch (pName) {
32+
case "eq":
33+
return new CompareEqFilter(key, p.getValue());
34+
case "neq":
35+
return NotFilter.of(new CompareEqFilter(key, p.getValue()));
36+
case "lt":
37+
return new CompareLtFilter(key, p.getValue());
38+
case "lte":
39+
return OrFilter.of(Arrays.asList(new CompareLtFilter(key, p.getValue()), new CompareEqFilter(key, p.getValue())));
40+
case "gt":
41+
return new CompareGtFilter(key, p.getValue());
42+
case "gte":
43+
return OrFilter.of(Arrays.asList(new CompareGtFilter(key, p.getValue()), new CompareEqFilter(key, p.getValue())));
44+
case "within":
45+
return new ContainsWithinFilter(key, (Collection<?>) p.getValue());
46+
case "without":
47+
return NotFilter.of(new ContainsWithinFilter(key, (Collection<?>) p.getValue()));
48+
case "containing":
49+
return new TextContainingFilter(key, (String) p.getValue());
50+
case "notContaining":
51+
return NotFilter.of(new TextContainingFilter(key, (String) p.getValue()));
52+
case "endingWith":
53+
return new TextRegexFilter(key, Pattern.quote((String) p.getValue()) + "$");
54+
case "notEndingWith":
55+
return NotFilter.of(new TextRegexFilter(key, Pattern.quote((String) p.getValue()) + "$"));
56+
case "startingWith":
57+
return new TextStartingWithFilter(key, (String) p.getValue());
58+
case "notStartingWith":
59+
return NotFilter.of(new TextStartingWithFilter(key, (String) p.getValue()));
60+
case "regex":
61+
return new TextRegexFilter(key, (String) p.getValue());
62+
case "notRegex":
63+
return NotFilter.of(new TextRegexFilter(key, (String) p.getValue()));
64+
case "or":
65+
if (p instanceof OrP) {
66+
return OrFilter.of(((OrP<?>) p).getPredicates().stream()
67+
.map(it -> of(key, it))
68+
.collect(Collectors.toList()));
69+
}
70+
throw new UnsupportedOperationException("Unsupported predicate: " + p);
71+
case "and":
72+
if (p instanceof AndP) {
73+
return AndFilter.of(((AndP<?>) p).getPredicates().stream()
74+
.map(it -> of(key, it))
75+
.collect(Collectors.toList()));
76+
}
77+
throw new UnsupportedOperationException("Unsupported predicate: " + p);
78+
default:
79+
throw new UnsupportedOperationException("Unsupported predicate: " + p);
80+
}
81+
}
82+
83+
FilterSupport getSupport();
84+
85+
String toAql(String variableName);
86+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright 2025 ArangoDB GmbH and The University of York
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.arangodb.tinkerpop.gremlin.process.filter;
18+
19+
20+
import com.arangodb.tinkerpop.gremlin.process.value.Value;
21+
22+
import java.util.Objects;
23+
24+
public class CompareEqFilter implements ArangoFilter {
25+
26+
private final String attribute;
27+
private final Value value;
28+
29+
public CompareEqFilter(String attribute, Object value) {
30+
Objects.requireNonNull(attribute, "attribute cannot be null");
31+
if (attribute.isEmpty()) {
32+
throw new IllegalArgumentException("attribute cannot be empty");
33+
}
34+
this.attribute = attribute;
35+
this.value = Value.of(value);
36+
}
37+
38+
@Override
39+
public FilterSupport getSupport() {
40+
return value.getSupport();
41+
}
42+
43+
@Override
44+
public String toAql(String variableName) {
45+
return "`" + variableName + "`.`" + attribute + "` == " + value.toAql();
46+
}
47+
}

0 commit comments

Comments
 (0)