Skip to content

Commit c8c062e

Browse files
Addressing bulk ingester stuck threads (#870) (#872)
* signaling after successfully adding * stress test unit test * removed memory calc Co-authored-by: Laura Trotta <[email protected]>
1 parent f660a7e commit c8c062e

File tree

2 files changed

+48
-0
lines changed

2 files changed

+48
-0
lines changed

java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java

+3
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,9 @@ public void add(BulkOperation operation, Context context) {
358358
if (!canAddOperation()) {
359359
flush();
360360
}
361+
else {
362+
addCondition.signalIfReady();
363+
}
361364
});
362365
}
363366

java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java

+45
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
2929
import co.elastic.clients.elasticsearch.core.bulk.OperationType;
3030
import co.elastic.clients.elasticsearch.end_to_end.RequestTest;
31+
import co.elastic.clients.elasticsearch.indices.IndicesStatsResponse;
3132
import co.elastic.clients.json.JsonpMapper;
3233
import co.elastic.clients.json.SimpleJsonpMapper;
3334
import co.elastic.clients.transport.ElasticsearchTransport;
@@ -146,6 +147,50 @@ private void multiThreadTest(int maxOperations, int maxRequests, int numThreads,
146147
assertEquals(expectedRequests, transport.requestsStarted.get());
147148
}
148149

150+
@Test
151+
public void multiThreadStressTest() throws InterruptedException, IOException {
152+
153+
String index = "bulk-ingester-stress-test";
154+
ElasticsearchClient client = ElasticsearchTestServer.global().client();
155+
156+
// DISCLAIMER: this configuration is highly inefficient and only used here to showcase an extreme
157+
// situation where the number of adding threads greatly exceeds the number of concurrent requests
158+
// handled by the ingester. It's strongly recommended to always tweak maxConcurrentRequests accordingly.
159+
BulkIngester<?> ingester = BulkIngester.of(b -> b
160+
.client(client)
161+
.globalSettings(s -> s.index(index))
162+
.flushInterval(5, TimeUnit.SECONDS)
163+
);
164+
165+
RequestTest.AppData appData = new RequestTest.AppData();
166+
appData.setIntValue(42);
167+
appData.setMsg("Some message");
168+
169+
ExecutorService executor = Executors.newFixedThreadPool(50);
170+
171+
for (int i = 0; i < 100000; i++) {
172+
int ii = i;
173+
Runnable thread = () -> {
174+
int finalI = ii;
175+
ingester.add(_1 -> _1
176+
.create(_2 -> _2
177+
.id(String.valueOf(finalI))
178+
.document(appData)
179+
));
180+
};
181+
executor.submit(thread);
182+
}
183+
184+
executor.awaitTermination(10,TimeUnit.SECONDS);
185+
ingester.close();
186+
187+
client.indices().refresh();
188+
189+
IndicesStatsResponse indexStats = client.indices().stats(g -> g.index(index));
190+
191+
assertTrue(indexStats.indices().get(index).primaries().docs().count()==100000);
192+
}
193+
149194
@Test
150195
public void sizeLimitTest() throws Exception {
151196
TestTransport transport = new TestTransport();

0 commit comments

Comments
 (0)