From 8a7f1cc505678256a1dc52b414adf25630bac381 Mon Sep 17 00:00:00 2001 From: becomeStar Date: Fri, 31 Oct 2025 13:24:05 +0900 Subject: [PATCH 1/2] binder: fix race between newStream() and unregisterInbound() by synchronizing in-use updates Previously, concurrent calls to newStream() and unregisterInbound() could both update numInUseStreams and invoke transportInUse() in conflicting order, leading to inconsistent listener state. This change synchronizes updates and only notifies the listener on transitions between 0 and >0. Fixes #10917 --- .../internal/BinderClientTransport.java | 43 ++++++++++++++----- 1 file changed, 33 insertions(+), 10 deletions(-) diff --git a/binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java b/binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java index 54d65936fab..a3b09bdb7cb 100644 --- a/binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java +++ b/binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java @@ -57,7 +57,7 @@ import io.grpc.internal.StatsTraceContext; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.atomic.AtomicInteger; + import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -73,7 +73,12 @@ public final class BinderClientTransport extends BinderTransport private final Bindable serviceBinding; /** Number of ongoing calls which keep this transport "in-use". */ - private final AtomicInteger numInUseStreams; + @GuardedBy("this") + private int numInUseStreams; + + /** Last in-use state that was reported to the listener */ + @GuardedBy("this") + private boolean listenerInUse; private final long readyTimeoutMillis; private final PingTracker pingTracker; @@ -114,7 +119,8 @@ public BinderClientTransport( Boolean preAuthServerOverride = options.getEagAttributes().get(PRE_AUTH_SERVER_OVERRIDE); this.preAuthorizeServer = preAuthServerOverride != null ? preAuthServerOverride : factory.preAuthorizeServers; - numInUseStreams = new AtomicInteger(); + numInUseStreams = 0; + listenerInUse = false; pingTracker = new PingTracker(Ticker.systemTicker(), (id) -> sendPing(id)); serviceBinding = @@ -259,9 +265,7 @@ public synchronized ClientStream newStream( return newFailingClientStream(failure, attributes, headers, tracers); } - if (inbound.countsForInUse() && numInUseStreams.getAndIncrement() == 0) { - clientTransportListener.transportInUse(true); - } + updateInUseStreamsIfNeed(inbound.countsForInUse(), 1); Outbound.ClientOutbound outbound = new Outbound.ClientOutbound(this, callId, method, headers, statsTraceContext); if (method.getType().clientSendsOneMessage()) { @@ -273,9 +277,7 @@ public synchronized ClientStream newStream( @Override protected void unregisterInbound(Inbound inbound) { - if (inbound.countsForInUse() && numInUseStreams.decrementAndGet() == 0) { - clientTransportListener.transportInUse(false); - } + updateInUseStreamsIfNeed(inbound.countsForInUse(), -1); super.unregisterInbound(inbound); } @@ -305,7 +307,9 @@ void notifyShutdown(Status status) { @Override @GuardedBy("this") void notifyTerminated() { - if (numInUseStreams.getAndSet(0) > 0) { + if(numInUseStreams > 0) { + numInUseStreams = 0; + listenerInUse = false; clientTransportListener.transportInUse(false); } if (readyTimeoutFuture != null) { @@ -391,6 +395,25 @@ private synchronized void handleAuthResult(Throwable t) { Status.INTERNAL.withDescription("Could not evaluate SecurityPolicy").withCause(t), true); } + /** Updates in-use-stream count and notifies listener only on transitions between 0 and >0 */ + private synchronized void updateInUseStreamsIfNeed(boolean countsForInUse, int delta) { + if(!countsForInUse) { + return; + } + + numInUseStreams += delta; + if(numInUseStreams < 0) { + // Defensive: prevent negative due to unexpected double-decrement + numInUseStreams = 0; + } + + boolean nowInUseStream = numInUseStreams > 0; + if(nowInUseStream != listenerInUse) { + listenerInUse = nowInUseStream; + clientTransportListener.transportInUse(nowInUseStream); + } + } + @GuardedBy("this") @Override protected void handlePingResponse(Parcel parcel) { From e32c264b2a8afd0f8496dbf6db4ae78e644e21c8 Mon Sep 17 00:00:00 2001 From: becomeStar Date: Thu, 6 Nov 2025 00:27:53 +0900 Subject: [PATCH 2/2] fix: avoid potential deadlock by scheduling listener notifications outside transport lock --- .../internal/BinderClientTransport.java | 88 ++++++++++++++----- 1 file changed, 66 insertions(+), 22 deletions(-) diff --git a/binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java b/binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java index a3b09bdb7cb..fae22848f7c 100644 --- a/binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java +++ b/binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java @@ -25,6 +25,8 @@ import android.os.IBinder; import android.os.Parcel; import android.os.Process; + +import com.google.common.base.Preconditions; import com.google.common.base.Ticker; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -57,6 +59,8 @@ import io.grpc.internal.StatsTraceContext; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -73,12 +77,13 @@ public final class BinderClientTransport extends BinderTransport private final Bindable serviceBinding; /** Number of ongoing calls which keep this transport "in-use". */ - @GuardedBy("this") - private int numInUseStreams; + private final AtomicInteger numInUseStreams; /** Last in-use state that was reported to the listener */ - @GuardedBy("this") - private boolean listenerInUse; + private final AtomicBoolean listenerInUse; + + /** Synchronizes transport listener callbacks */ + private final Object listenerNotifyLock; private final long readyTimeoutMillis; private final PingTracker pingTracker; @@ -119,8 +124,10 @@ public BinderClientTransport( Boolean preAuthServerOverride = options.getEagAttributes().get(PRE_AUTH_SERVER_OVERRIDE); this.preAuthorizeServer = preAuthServerOverride != null ? preAuthServerOverride : factory.preAuthorizeServers; - numInUseStreams = 0; - listenerInUse = false; + this.numInUseStreams = new AtomicInteger(); + this.listenerInUse = new AtomicBoolean(); + this.listenerNotifyLock = new Object(); + pingTracker = new PingTracker(Ticker.systemTicker(), (id) -> sendPing(id)); serviceBinding = @@ -265,7 +272,7 @@ public synchronized ClientStream newStream( return newFailingClientStream(failure, attributes, headers, tracers); } - updateInUseStreamsIfNeed(inbound.countsForInUse(), 1); + updateInUseStreamsCountIfNeeded(inbound.countsForInUse(), 1); Outbound.ClientOutbound outbound = new Outbound.ClientOutbound(this, callId, method, headers, statsTraceContext); if (method.getType().clientSendsOneMessage()) { @@ -277,7 +284,7 @@ public synchronized ClientStream newStream( @Override protected void unregisterInbound(Inbound inbound) { - updateInUseStreamsIfNeed(inbound.countsForInUse(), -1); + updateInUseStreamsCountIfNeeded(inbound.countsForInUse(), -1); super.unregisterInbound(inbound); } @@ -307,9 +314,8 @@ void notifyShutdown(Status status) { @Override @GuardedBy("this") void notifyTerminated() { - if(numInUseStreams > 0) { - numInUseStreams = 0; - listenerInUse = false; + if (numInUseStreams.getAndSet(0) > 0) { + listenerInUse.set(false); clientTransportListener.transportInUse(false); } if (readyTimeoutFuture != null) { @@ -395,25 +401,63 @@ private synchronized void handleAuthResult(Throwable t) { Status.INTERNAL.withDescription("Could not evaluate SecurityPolicy").withCause(t), true); } - /** Updates in-use-stream count and notifies listener only on transitions between 0 and >0 */ - private synchronized void updateInUseStreamsIfNeed(boolean countsForInUse, int delta) { - if(!countsForInUse) { + /** + * Updates in-use stream count and notifies listener only on transitions between 0 and >0, without + * acquiring the transport lock. + */ + private void updateInUseStreamsCountIfNeeded(boolean countsForInUse, int delta) { + Preconditions.checkArgument(delta == -1 || delta == 1, "stream count delta must be -1 or +1"); + if (!countsForInUse) { return; } + int prev, next; - numInUseStreams += delta; - if(numInUseStreams < 0) { - // Defensive: prevent negative due to unexpected double-decrement - numInUseStreams = 0; + if (delta > 0) { + next = numInUseStreams.incrementAndGet(); + prev = next - 1; + } else { + prev = numInUseStreams.get(); + int updated; + + while (true) { + int current = prev; + int newValue = current > 0 ? current - 1 : 0; + if (numInUseStreams.compareAndSet(current, newValue)) { + updated = newValue; + break; + } + prev = numInUseStreams.get(); + } + next = updated; } - boolean nowInUseStream = numInUseStreams > 0; - if(nowInUseStream != listenerInUse) { - listenerInUse = nowInUseStream; - clientTransportListener.transportInUse(nowInUseStream); + boolean prevInUse = prev > 0; + boolean nextInUse = next > 0; + + if (prevInUse != nextInUse) { + if (listenerInUse.compareAndSet(prevInUse, nextInUse)) { + scheduleTransportInUseNotification(nextInUse); + } } } + private void scheduleTransportInUseNotification(final boolean inUse) { + getScheduledExecutorService() + .execute( + new Runnable() { + @Override + public void run() { + // Provide external synchronization as required by Listener contract, + // without taking the transport lock to avoid potential deadlocks. + synchronized (listenerNotifyLock) { + if (listenerInUse.get() == inUse) { + clientTransportListener.transportInUse(inUse); + } + } + } + }); + } + @GuardedBy("this") @Override protected void handlePingResponse(Parcel parcel) {