Skip to content

Commit

Permalink
Add deadline cancellation for indexing (#612)
Browse files Browse the repository at this point in the history
  • Loading branch information
aprudhomme authored Jan 2, 2024
1 parent 003e652 commit 70727c2
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 19 deletions.
45 changes: 26 additions & 19 deletions src/main/java/com/yelp/nrtsearch/server/grpc/LuceneServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -794,10 +794,13 @@ public void onNext(AddDocumentRequest addDocumentRequest) {
"indexing addDocumentRequestQueue size: %s, total: %s",
addDocumentRequestQueue.size(), getCount(indexName)));
try {
DeadlineUtils.checkDeadline("addDocuments: onNext", "INDEXING");

List<AddDocumentRequest> addDocRequestList = new ArrayList<>(addDocumentRequestQueue);
Future<Long> future =
globalState.submitIndexingTask(
new DocumentIndexer(globalState, addDocRequestList, indexName));
Context.current()
.wrap(new DocumentIndexer(globalState, addDocRequestList, indexName)));
futures.put(indexName, future);
} catch (Exception e) {
responseObserver.onError(e);
Expand All @@ -821,6 +824,8 @@ private String onCompletedForIndex(String indexName) {
"onCompleted, addDocumentRequestQueue: %s", addDocumentRequestQueue.size()));
long highestGen = -1;
try {
DeadlineUtils.checkDeadline("addDocuments: onCompletedForIndex", "INDEXING");

// index the left over docs
if (!addDocumentRequestQueue.isEmpty()) {
logger.debug(
Expand Down Expand Up @@ -874,24 +879,26 @@ private String onCompletedForIndex(String indexName) {
public void onCompleted() {
try {
globalState.submitIndexingTask(
() -> {
try {
// TODO: this should return a map on index to genId in the response
String genId = "-1";
for (String indexName : addDocumentRequestQueueMap.keySet()) {
genId = onCompletedForIndex(indexName);
}
responseObserver.onNext(
AddDocumentResponse.newBuilder()
.setGenId(genId)
.setPrimaryId(globalState.getEphemeralId())
.build());
responseObserver.onCompleted();
} catch (Throwable t) {
responseObserver.onError(t);
}
return null;
});
Context.current()
.wrap(
() -> {
try {
// TODO: this should return a map on index to genId in the response
String genId = "-1";
for (String indexName : addDocumentRequestQueueMap.keySet()) {
genId = onCompletedForIndex(indexName);
}
responseObserver.onNext(
AddDocumentResponse.newBuilder()
.setGenId(genId)
.setPrimaryId(globalState.getEphemeralId())
.build());
responseObserver.onCompleted();
} catch (Throwable t) {
responseObserver.onError(t);
}
return null;
}));
} catch (RejectedExecutionException e) {
logger.error("Threadpool is full, unable to submit indexing completion job");
responseObserver.onError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.google.protobuf.ProtocolStringList;
import com.yelp.nrtsearch.server.grpc.AddDocumentRequest;
import com.yelp.nrtsearch.server.grpc.DeadlineUtils;
import com.yelp.nrtsearch.server.grpc.FacetHierarchyPath;
import com.yelp.nrtsearch.server.luceneserver.field.FieldDef;
import com.yelp.nrtsearch.server.luceneserver.field.IdFieldDef;
Expand Down Expand Up @@ -181,6 +182,8 @@ public DocumentIndexer(
}

public long runIndexingJob() throws Exception {
DeadlineUtils.checkDeadline("DocumentIndexer: runIndexingJob", "INDEXING");

logger.debug(
String.format(
"running indexing job on threadId: %s",
Expand Down

0 comments on commit 70727c2

Please sign in to comment.