Skip to content

Commit f67b74c

Browse files
Addressing bulk ingester stuck threads (#870) (#873)
* signaling after successfully adding * stress test unit test * removed memory calc Co-authored-by: Laura Trotta <[email protected]>
1 parent c7ff6d3 commit f67b74c

File tree

2 files changed

+48
-0
lines changed

2 files changed

+48
-0
lines changed

java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java

+3
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,9 @@ public void add(BulkOperation operation, Context context) {
363363
if (!canAddOperation()) {
364364
flush();
365365
}
366+
else {
367+
addCondition.signalIfReady();
368+
}
366369
});
367370
}
368371

java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java

+45
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
2929
import co.elastic.clients.elasticsearch.core.bulk.OperationType;
3030
import co.elastic.clients.elasticsearch.end_to_end.RequestTest;
31+
import co.elastic.clients.elasticsearch.indices.IndicesStatsResponse;
3132
import co.elastic.clients.json.JsonpMapper;
3233
import co.elastic.clients.json.JsonpUtils;
3334
import co.elastic.clients.json.SimpleJsonpMapper;
@@ -156,6 +157,50 @@ private void multiThreadTest(int maxOperations, int maxRequests, int numThreads,
156157
assertEquals(expectedRequests, transport.requestsStarted.get());
157158
}
158159

160+
@Test
161+
public void multiThreadStressTest() throws InterruptedException, IOException {
162+
163+
String index = "bulk-ingester-stress-test";
164+
ElasticsearchClient client = ElasticsearchTestServer.global().client();
165+
166+
// DISCLAIMER: this configuration is highly inefficient and only used here to showcase an extreme
167+
// situation where the number of adding threads greatly exceeds the number of concurrent requests
168+
// handled by the ingester. It's strongly recommended to always tweak maxConcurrentRequests accordingly.
169+
BulkIngester<?> ingester = BulkIngester.of(b -> b
170+
.client(client)
171+
.globalSettings(s -> s.index(index))
172+
.flushInterval(5, TimeUnit.SECONDS)
173+
);
174+
175+
RequestTest.AppData appData = new RequestTest.AppData();
176+
appData.setIntValue(42);
177+
appData.setMsg("Some message");
178+
179+
ExecutorService executor = Executors.newFixedThreadPool(50);
180+
181+
for (int i = 0; i < 100000; i++) {
182+
int ii = i;
183+
Runnable thread = () -> {
184+
int finalI = ii;
185+
ingester.add(_1 -> _1
186+
.create(_2 -> _2
187+
.id(String.valueOf(finalI))
188+
.document(appData)
189+
));
190+
};
191+
executor.submit(thread);
192+
}
193+
194+
executor.awaitTermination(10,TimeUnit.SECONDS);
195+
ingester.close();
196+
197+
client.indices().refresh();
198+
199+
IndicesStatsResponse indexStats = client.indices().stats(g -> g.index(index));
200+
201+
assertTrue(indexStats.indices().get(index).primaries().docs().count()==100000);
202+
}
203+
159204
@Test
160205
public void sizeLimitTest() throws Exception {
161206
TestTransport transport = new TestTransport();

0 commit comments

Comments
 (0)