Skip to content

Commit df3f751

Browse files
committed
stub: ensure BlockingClientCall tasks to run even after cancellation
After canceling a `BlockingClientCall`, the caller may not interact with it further. That means the call executor, a `ThreadSafeThreadlessExecutor`, will not execute any more tasks. There may still be tasks submitted to the call executor, though, until the underlying call completes. Some of these tasks (e.g., server messages available) may leak native resources unless executed. So, we convert the executor to a "direct" executor during cancellation in order to ensure all call tasks run. Fixes #12355.
1 parent 9cddcb4 commit df3f751

File tree

2 files changed

+30
-2
lines changed

2 files changed

+30
-2
lines changed

stub/src/main/java/io/grpc/stub/BlockingClientCall.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,13 @@ void sendSingleRequest(ReqT request) {
249249
public void cancel(String message, Throwable cause) {
250250
writeClosed = true;
251251
call.cancel(message, cause);
252+
// After canceling a BlockingClientCall, the caller may not interact with it further. That means
253+
// the call executor, a ThreadSafeThreadlessExecutor, will not execute any more tasks. There may
254+
// still be tasks submitted to the call executor, though, until the underlying call completes.
255+
// Some of these tasks (e.g., server messages available) may leak native resources unless
256+
// executed. So, we convert the executor to a "direct" executor in order to ensure all call
257+
// tasks run.
258+
executor.becomeDirect();
252259
}
253260

254261
/**

stub/src/main/java/io/grpc/stub/ClientCalls.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -871,6 +871,7 @@ static final class ThreadSafeThreadlessExecutor extends ConcurrentLinkedQueue<Ru
871871

872872
private final Lock waiterLock = new ReentrantLock();
873873
private final Condition waiterCondition = waiterLock.newCondition();
874+
private boolean direct;
874875

875876
// Non private to avoid synthetic class
876877
ThreadSafeThreadlessExecutor() {}
@@ -950,6 +951,17 @@ void drain() {
950951
}
951952
}
952953

954+
/**
955+
* Turn the executor into a "direct" executor. Further calls to {@link #execute(Runnable)} will
956+
* executable the runnable immediately in the calling thread.
957+
*/
958+
void becomeDirect() {
959+
waiterLock.lock();
960+
direct = true;
961+
waiterLock.unlock();
962+
drain();
963+
}
964+
953965
private void signalAll() {
954966
waiterLock.lock();
955967
try {
@@ -975,13 +987,22 @@ private static void throwIfInterrupted() throws InterruptedException {
975987

976988
@Override
977989
public void execute(Runnable runnable) {
990+
boolean added = false;
978991
waiterLock.lock();
979992
try {
980-
add(runnable);
981-
waiterCondition.signalAll(); // If anything is waiting let it wake up and process this task
993+
if (!direct) {
994+
added = true;
995+
add(runnable);
996+
// If anything is waiting, let it wake up and process this task.
997+
waiterCondition.signalAll();
998+
}
982999
} finally {
9831000
waiterLock.unlock();
9841001
}
1002+
if (!added) {
1003+
runQuietly(runnable);
1004+
signalAll();
1005+
}
9851006
}
9861007
}
9871008

0 commit comments

Comments
 (0)