ES May Permanently Block During Concurrent Query Operations #963

Open
EmptyDreams opened this issue Mar 18, 2025 · 4 comments

Comments

@EmptyDreams

EmptyDreams commented Mar 18, 2025

Java API client version

8.17.0

Java version

21

Elasticsearch Version

8.10.2

Problem description

When I send concurrent requests to ES using the code below, the ES Java API may block the calling thread permanently. Over time this exhausts my thread pool and the program freezes completely.

My code is:

@JvmStatic
internal val client = run {
    RestClient.builder(HttpHost.create(LogScannerMain.main.pluginConfig.esHost))
        .setCompressionEnabled(true)
        .build()
}

@JvmStatic
private suspend fun postSearch(request: SearchRequest, view: String): EsResponse {
    val response = withContext(Dispatchers.IO) {
        suspendCoroutine {
            client.performRequestAsync(Request("POST", "/$view/_search").apply {
                val jsonText = FastJsonProvider.createGenerator().use { jsonGenerator ->
                    request.serialize(jsonGenerator, FastJsonMapper())
                    jsonGenerator.toString()
                }
                LogScannerMain.main.logger.trace { "post search: $jsonText" }
                entity = NStringEntity(jsonText, ContentType.APPLICATION_JSON)
            }, SuspendResponseListener(it))
        }
    }
    val text = EntityUtils.toString(response.entity)
    return JSON.parseObject(text, EsResponse::class.java)
}

When ES blocks, the ES-related dump content obtained from jstack is as follows:

"elasticsearch-rest-client-0-thread-1" #48 [8978] daemon prio=5 os_prio=0 cpu=90.04ms elapsed=1506.58s tid=0x0000ffff701390f0 nid=8978 runnable  [0x0000ffff42dde000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPoll.wait(java.base/Native Method)
        at sun.nio.ch.EPollSelectorImpl.doSelect(java.base/EPollSelectorImpl.java:121)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(java.base/SelectorImpl.java:130)
        - locked <0x00000006c17fcae8> (a sun.nio.ch.Util$2)
        - locked <0x00000006c17fca98> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(java.base/SelectorImpl.java:142)
        at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:343)
        at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:221)
        at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64)
        at java.lang.Thread.runWith(java.base/Thread.java:1596)
        at java.lang.Thread.run(java.base/Thread.java:1583)

"elasticsearch-rest-client-0-thread-2" #49 [8979] daemon prio=5 os_prio=0 cpu=145.01ms elapsed=1506.57s tid=0x0000ffffa80ea4c0 nid=8979 runnable  [0x0000ffff42be0000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPoll.wait(java.base/Native Method)
        at sun.nio.ch.EPollSelectorImpl.doSelect(java.base/EPollSelectorImpl.java:121)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(java.base/SelectorImpl.java:130)
        - locked <0x00000006c18676a0> (a sun.nio.ch.Util$2)
        - locked <0x00000006c17fcef0> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(java.base/SelectorImpl.java:142)
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:255)
        at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
        at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
        at java.lang.Thread.runWith(java.base/Thread.java:1596)
        at java.lang.Thread.run(java.base/Thread.java:1583)

"elasticsearch-rest-client-0-thread-3" #50 [8980] daemon prio=5 os_prio=0 cpu=192.02ms elapsed=1506.57s tid=0x0000ffffa82d17c0 nid=8980 runnable  [0x0000ffff429e2000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPoll.wait(java.base/Native Method)
        at sun.nio.ch.EPollSelectorImpl.doSelect(java.base/EPollSelectorImpl.java:121)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(java.base/SelectorImpl.java:130)
        - locked <0x00000006c1882158> (a sun.nio.ch.Util$2)
        - locked <0x00000006c186f938> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(java.base/SelectorImpl.java:142)
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:255)
        at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
        at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
        at java.lang.Thread.runWith(java.base/Thread.java:1596)
        at java.lang.Thread.run(java.base/Thread.java:1583)
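
All three threads in this dump sit in EPoll.wait, which is where selector-based I/O threads wait for socket events, and the JVM reports that state as RUNNABLE. If a jstack run is inconvenient at the moment of the hang, roughly the same information can be captured from inside the process with Thread.getAllStackTraces(). The following is only a sketch: ReactorThreadDump and its two methods are hypothetical names, and the thread-name prefix is simply copied from the dump above.

```java
import java.util.Map;
import java.util.TreeMap;

final class ReactorThreadDump {

    // Collects the current stack of every live thread whose name starts with the prefix.
    static Map<String, StackTraceElement[]> stacksByPrefix(String prefix) {
        Map<String, StackTraceElement[]> result = new TreeMap<>();
        for (Map.Entry<Thread, StackTraceElement[]> e : Thread.getAllStackTraces().entrySet()) {
            if (e.getKey().getName().startsWith(prefix)) {
                result.put(e.getKey().getName(), e.getValue());
            }
        }
        return result;
    }

    // Renders the collected stacks in a jstack-like layout (thread name, then one frame per line).
    static String format(Map<String, StackTraceElement[]> stacks) {
        StringBuilder sb = new StringBuilder();
        stacks.forEach((name, frames) -> {
            sb.append('"').append(name).append('"').append('\n');
            for (StackTraceElement frame : frames) {
                sb.append("        at ").append(frame).append('\n');
            }
            sb.append('\n');
        });
        return sb.toString();
    }

    public static void main(String[] args) {
        // Prefix assumed from the thread names shown in the dump above.
        System.out.print(format(stacksByPrefix("elasticsearch-rest-client")));
    }
}
```

Calling stacksByPrefix from a watchdog or a debug endpoint when requests stop completing would show whether the client threads are still idle in the selector at that moment.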

@l-trotta
Contributor

Hello! First of all, this is not really the Java client being used; it's the underlying low-level REST client. This shouldn't be happening anyway, though. I'm not too familiar with Kotlin, so I can't tell whether anything is wrong just by looking at the code. Would it be possible to have a small reproducer for the issue?

@EmptyDreams
Author

> Hello! First of all, this is not really the Java client being used; it's the underlying low-level REST client. This shouldn't be happening anyway, though. I'm not too familiar with Kotlin, so I can't tell whether anything is wrong just by looking at the code. Would it be possible to have a small reproducer for the issue?

Yes, I actually only used the ES package to construct the DSL, but the issue still occurred. This might be because the problem lies in the network layer API.

The functionality of my Kotlin code above is similar to the Java code below:

import org.apache.http.HttpHost;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.ResponseListener;
import com.alibaba.fastjson.JSON;
import java.util.concurrent.CompletableFuture;

public class ElasticsearchClient {

    private static final RestClient client = createClient();
    
    private static RestClient createClient() {
        return RestClient.builder(HttpHost.create(LogScannerMain.main.pluginConfig.esHost))
                .setCompressionEnabled(true)
                .build();
    }
    
    public static CompletableFuture<EsResponse> postSearchAsync(SearchRequest request, String view) {
        CompletableFuture<EsResponse> future = new CompletableFuture<>();
        
        try {
            Request esRequest = new Request("POST", "/" + view + "/_search");
            
            FastJsonProvider jsonProvider = new FastJsonProvider();
            FastJsonMapper mapper = new FastJsonMapper();
            String jsonText;
            try (FastJsonGenerator jsonGenerator = jsonProvider.createGenerator()) {
                request.serialize(jsonGenerator, mapper);
                jsonText = jsonGenerator.toString();
            }
            
            LogScannerMain.main.logger.trace("post search: " + jsonText);
            esRequest.setEntity(new NStringEntity(jsonText, ContentType.APPLICATION_JSON));
            
            client.performRequestAsync(esRequest, new ResponseListener() {
                @Override
                public void onSuccess(Response response) {
                    try {
                        String text = EntityUtils.toString(response.getEntity());
                        EsResponse esResponse = JSON.parseObject(text, EsResponse.class);
                        future.complete(esResponse);
                    } catch (Exception e) {
                        future.completeExceptionally(e);
                    }
                }
                
                @Override
                public void onFailure(Exception exception) {
                    future.completeExceptionally(exception);
                }
            });
            
        } catch (Exception e) {
            future.completeExceptionally(e);
        }
        
        return future;
    }
}
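
Until the cause is found, one defensive option is to bound how long a caller waits on a future like the one returned above, so that a request whose listener is never invoked surfaces as an error instead of holding a thread indefinitely. The sketch below uses only the JDK (CompletableFuture.orTimeout, available since Java 9). TimeoutGuard, withDeadline, and timesOut are hypothetical names, and the never-completed future is a stand-in for a search whose callback never fires. This does not fix or cancel a stuck request; it only makes the hang visible as a TimeoutException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimeoutGuard {

    // Hypothetical helper: completes the future exceptionally with a
    // TimeoutException if it is still incomplete after the given delay.
    static <T> CompletableFuture<T> withDeadline(CompletableFuture<T> future, long millis) {
        return future.orTimeout(millis, TimeUnit.MILLISECONDS);
    }

    // Returns true if waiting on the guarded future ended because of the deadline.
    static boolean timesOut(CompletableFuture<?> future, long millis) {
        try {
            withDeadline(future, millis).get();
            return false;
        } catch (ExecutionException e) {
            return e.getCause() instanceof TimeoutException;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // Stand-in for a request whose ResponseListener is never called.
        CompletableFuture<String> stuck = new CompletableFuture<>();
        System.out.println("stuck request timed out: " + timesOut(stuck, 200));

        // A future that is already complete is unaffected by the deadline.
        CompletableFuture<String> done = CompletableFuture.completedFuture("{}");
        System.out.println("completed request timed out: " + timesOut(done, 200));
    }
}
```

Applied to the code above, this would mean wrapping the result of postSearchAsync before waiting on it; the deadline value is a choice for the application, not something the client prescribes.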

@l-trotta
Contributor

Thank you! Will investigate ASAP

@l-trotta
Contributor

l-trotta commented Apr 2, 2025

Hey! I tried reproducing the issue using the provided Java code, but on my machine everything works smoothly. Threads temporarily waiting on the Selector are normally fine, so let's first check whether it could be something else: could you try executing the same code without JSON deserialization, just returning strings? I can see that FastJsonMapper is a custom mapper implemented over com.alibaba.fastjson, which we have never tested, so that is worth checking.
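
The check suggested here amounts to completing the future with the raw response body and treating any JSON parsing as a separate, later stage. The sketch below shows that split using only JDK types. RawFirst, fetchRaw, and parseOn are hypothetical names, and fetchRaw only simulates a transport callback; it does not call Elasticsearch. In the real code the raw future would be completed from ResponseListener.onSuccess with the body text.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

final class RawFirst {

    // Stand-in for the transport layer: completes with the body as plain text
    // from a separate thread, the way an async callback would.
    static CompletableFuture<String> fetchRaw(String body) {
        CompletableFuture<String> future = new CompletableFuture<>();
        Thread io = new Thread(() -> future.complete(body), "fake-io-thread");
        io.setDaemon(true);
        io.start();
        return future;
    }

    // Runs the parser on the given executor rather than on the thread
    // that completed the raw future.
    static <T> CompletableFuture<T> parseOn(CompletableFuture<String> raw,
                                            Function<String, T> parser,
                                            ExecutorService executor) {
        return raw.thenApplyAsync(parser, executor);
    }

    public static void main(String[] args) {
        ExecutorService parsers = Executors.newFixedThreadPool(2);
        try {
            // Step 1: wait for the raw text only, with no deserialization involved.
            CompletableFuture<String> raw = fetchRaw("{\"took\":3}");
            System.out.println("raw body: " + raw.join());

            // Step 2: add parsing back as its own stage (String::length stands in for a JSON parser).
            CompletableFuture<Integer> parsed = parseOn(raw, String::length, parsers);
            System.out.println("parsed value: " + parsed.join());
        } finally {
            parsers.shutdown();
        }
    }
}
```

If the hang disappears when only the raw future is awaited and returns once the parsing stage is added back, the custom mapper becomes the main suspect; if it persists with raw strings, the transport layer is the more likely place to look.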


2 participants