Skip to content

Commit 6a6caf9

Browse files
committed
stub: ensure BlockingClientCall tasks run after cancellation
After canceling a `BlockingClientCall`, the caller may not interact with it further. That means the call executor, a `ThreadSafeThreadlessExecutor`, will not execute any more tasks. There may still be tasks submitted to the call executor, though, until the underlying call completes. Some of these tasks (e.g., server messages available) may leak native resources unless executed. So, we convert the executor to a "direct" executor during cancellation in order to ensure all call tasks run. Fixes #12355.
1 parent 9cddcb4 commit 6a6caf9

File tree

2 files changed

+33
-2
lines changed

2 files changed

+33
-2
lines changed

stub/src/main/java/io/grpc/stub/BlockingClientCall.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,13 @@ void sendSingleRequest(ReqT request) {
249249
public void cancel(String message, Throwable cause) {
250250
writeClosed = true;
251251
call.cancel(message, cause);
252+
// After canceling a BlockingClientCall, the caller may not interact with it further. That means
253+
// the call executor, a ThreadSafeThreadlessExecutor, will not execute any more tasks. There may
254+
// still be tasks submitted to the call executor, though, until the underlying call completes.
255+
// Some of these tasks (e.g., server messages available) may leak native resources unless
256+
// executed. So, we convert the executor to a "direct" executor in order to ensure all call
257+
// tasks run.
258+
executor.becomeDirect();
252259
}
253260

254261
/**

stub/src/main/java/io/grpc/stub/ClientCalls.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -871,6 +871,7 @@ static final class ThreadSafeThreadlessExecutor extends ConcurrentLinkedQueue<Ru
871871

872872
private final Lock waiterLock = new ReentrantLock();
873873
private final Condition waiterCondition = waiterLock.newCondition();
874+
private boolean direct;
874875

875876
// Non private to avoid synthetic class
876877
ThreadSafeThreadlessExecutor() {}
@@ -950,6 +951,20 @@ void drain() {
950951
}
951952
}
952953

954+
/**
955+
* Turn the executor into a "direct" executor. Further calls to {@link #execute(Runnable)} will
956+
* executable the runnable immediately in the calling thread.
957+
*/
958+
void becomeDirect() {
959+
waiterLock.lock();
960+
try {
961+
direct = true;
962+
} finally {
963+
waiterLock.unlock();
964+
}
965+
drain();
966+
}
967+
953968
private void signalAll() {
954969
waiterLock.lock();
955970
try {
@@ -975,13 +990,22 @@ private static void throwIfInterrupted() throws InterruptedException {
975990

976991
@Override
977992
public void execute(Runnable runnable) {
993+
boolean added = false;
978994
waiterLock.lock();
979995
try {
980-
add(runnable);
981-
waiterCondition.signalAll(); // If anything is waiting let it wake up and process this task
996+
if (!direct) {
997+
added = true;
998+
add(runnable);
999+
// If anything is waiting, let it wake up and process this task.
1000+
waiterCondition.signalAll();
1001+
}
9821002
} finally {
9831003
waiterLock.unlock();
9841004
}
1005+
if (!added) {
1006+
runQuietly(runnable);
1007+
signalAll();
1008+
}
9851009
}
9861010
}
9871011

0 commit comments

Comments
 (0)